openzeppelin_relayer/jobs/handlers/transaction_status_handler.rs

//! Transaction status monitoring handler.
//!
//! Monitors the status of submitted transactions by:
//! - Checking transaction status on the network
//! - Updating transaction status in storage
//! - Tracking failure counts for circuit breaker decisions (stored in transaction metadata)
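//!
//! # Example
//!
//! A minimal sketch of building the job payload this handler consumes
//! (how the job is enqueued depends on the queue setup and is omitted here):
//!
//! ```rust,ignore
//! use crate::jobs::{Job, JobType, TransactionStatusCheck};
//! use crate::models::NetworkType;
//!
//! let check = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
//! let job = Job::new(JobType::TransactionStatusCheck, check);
//! ```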
use actix_web::web::ThinData;
use eyre::Result;
use tracing::{debug, info, instrument, warn};

use crate::{
    constants::{get_max_consecutive_status_failures, get_max_total_status_failures},
    domain::{get_relayer_transaction, get_transaction_by_id, is_final_state, Transaction},
    jobs::{Job, StatusCheckContext, TransactionStatusCheck},
    models::{
        ApiError, DefaultAppState, TransactionMetadata, TransactionRepoModel,
        TransactionUpdateRequest,
    },
    observability::request_id::set_request_id,
    queues::{HandlerError, WorkerContext},
    repositories::TransactionRepository,
};

#[instrument(
    level = "debug",
    skip(job, state, ctx),
    fields(
        request_id = ?job.request_id,
        job_id = %job.message_id,
        job_type = %job.job_type.to_string(),
        attempt = %ctx.attempt,
        tx_id = %job.data.transaction_id,
        relayer_id = %job.data.relayer_id,
        task_id = %ctx.task_id,
    )
)]
pub async fn transaction_status_handler(
    job: Job<TransactionStatusCheck>,
    state: ThinData<DefaultAppState>,
    ctx: WorkerContext,
) -> Result<(), HandlerError> {
    if let Some(request_id) = job.request_id.clone() {
        set_request_id(request_id);
    }

    let tx_repo = state.transaction_repository();

    // Execute the status check; every outcome is routed through handle_result below
    let req_result = handle_request(&job.data, &state, ctx.attempt, &ctx.task_id).await;

    let tx_id = &job.data.transaction_id;

    // Handle result and update counters via transaction repository
    handle_result(
        req_result.result,
        &*tx_repo,
        tx_id,
        req_result.consecutive_failures,
        req_result.total_failures,
        req_result.should_retry_on_error,
    )
    .await
}

/// Handles status check results with circuit breaker tracking.
///
/// # Strategy
/// - If transaction is in final state → return Ok (job completes, metadata cleaned up via `delete_at`)
/// - If success but not final → reset consecutive to 0, return Err (retries)
/// - If error with should_retry=true → increment counters, return Err (retries)
/// - If error with should_retry=false → return Ok (job completes, e.g., transaction not found)
/// - If counters are None (early failure) → skip counter updates
///
/// Counters are stored in transaction metadata, persisted via `partial_update`.
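///
/// For example, tracing the logic below from counters `(consecutive = 3, total = 7)`:
/// a transient error persists `(4, 8)` and retries; a successful check on a
/// non-final transaction persists `(0, 7)` and retries; a final state completes
/// the job without a write.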
async fn handle_result<TR>(
    result: Result<TransactionRepoModel>,
    tx_repo: &TR,
    tx_id: &str,
    consecutive_failures: Option<u32>,
    total_failures: Option<u32>,
    should_retry_on_error: bool,
) -> Result<(), HandlerError>
where
    TR: TransactionRepository + Send + Sync,
{
    match result {
        Ok(tx) if is_final_state(&tx.status) => {
            // Transaction reached final state - job complete
            // No need to clean up counters - tx will be deleted via delete_at
            debug!(
                tx_id = %tx.id,
                relayer_id = %tx.relayer_id,
                status = ?tx.status,
                consecutive_failures = ?consecutive_failures,
                total_failures = ?total_failures,
                "transaction in final state, status check complete"
            );

            Ok(())
        }
        Ok(tx) => {
            // Success but not final - RESET consecutive counter, keep total unchanged
            debug!(
                tx_id = %tx.id,
                relayer_id = %tx.relayer_id,
                status = ?tx.status,
                "transaction not in final state"
            );

            // Reset consecutive counter only if there were previous failures
            // This avoids unnecessary writes for transactions that never failed
            match (consecutive_failures, total_failures) {
                (Some(consecutive), Some(total)) if consecutive > 0 || total > 0 => {
                    let update = TransactionUpdateRequest {
                        metadata: Some(TransactionMetadata {
                            consecutive_failures: 0,
                            total_failures: total,
                        }),
                        ..Default::default()
                    };
                    if let Err(e) = tx_repo.partial_update(tx_id.to_string(), update).await {
                        warn!(error = %e, tx_id = %tx_id, relayer_id = %tx.relayer_id, "failed to reset consecutive counter");
                    }
                }
                _ => {
                    // No previous failures or counters not available - nothing to reset
                }
            }

            // Return error to trigger retry
            Err(HandlerError::Retry(format!(
                "transaction status: {:?} - not in final state, retrying",
                tx.status
            )))
        }
        Err(e) => {
            // Check if this is a permanent failure that shouldn't retry
            if !should_retry_on_error {
                info!(
                    error = %e,
                    tx_id = %tx_id,
                    "status check failed with permanent error, completing job without retry"
                );
                return Ok(());
            }

            // Transient error - INCREMENT both counters (only if we have values)
            match (consecutive_failures, total_failures) {
                (Some(consecutive), Some(total)) => {
                    let new_consecutive = consecutive.saturating_add(1);
                    let new_total = total.saturating_add(1);

                    warn!(
                        error = %e,
                        tx_id = %tx_id,
                        consecutive_failures = new_consecutive,
                        total_failures = new_total,
                        "status check failed, incrementing failure counters"
                    );

                    // Update counters via transaction repository
                    let update = TransactionUpdateRequest {
                        metadata: Some(TransactionMetadata {
                            consecutive_failures: new_consecutive,
                            total_failures: new_total,
                        }),
                        ..Default::default()
                    };
                    if let Err(update_err) = tx_repo.partial_update(tx_id.to_string(), update).await
                    {
                        warn!(error = %update_err, tx_id = %tx_id, "failed to update counters");
                    }
                }
                _ => {
                    // Early failure before counters were read - skip counter update
                    warn!(
                        error = %e,
                        tx_id = %tx_id,
                        "status check failed early, counters not available"
                    );
                }
            }

            // Return error to trigger retry
            Err(HandlerError::Retry(format!("{e}")))
        }
    }
}

/// Result of `handle_request` including whether to retry on error.
struct HandleRequestResult {
    result: Result<TransactionRepoModel>,
    consecutive_failures: Option<u32>,
    total_failures: Option<u32>,
    /// If false, errors should not trigger retry (e.g., transaction not found)
    should_retry_on_error: bool,
}

/// Executes the status check logic and returns the result with counter values.
/// Returns `None` for counters if they couldn't be read (e.g., transaction fetch failed early).
/// Sets `should_retry_on_error = false` for permanent failures like transaction not found.
async fn handle_request(
    status_request: &TransactionStatusCheck,
    state: &ThinData<DefaultAppState>,
    attempt: usize,
    task_id: &str,
) -> HandleRequestResult {
    let tx_id = &status_request.transaction_id;
    debug!(
        tx_id = %tx_id,
        relayer_id = %status_request.relayer_id,
        "handling transaction status check"
    );

    // Fetch transaction - if this fails, we can't read counters yet
    let transaction = match get_transaction_by_id(tx_id.clone(), state).await {
        Ok(tx) => tx,
        Err(ApiError::NotFound(msg)) => {
            // Transaction not found - permanent failure, don't retry
            warn!(tx_id = %tx_id, "transaction not found, completing job without retry: {}", msg);
            return HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found: {}", msg)),
                consecutive_failures: None,
                total_failures: None,
                should_retry_on_error: false,
            };
        }
        Err(e) => {
            // Other errors - should retry
            return HandleRequestResult {
                result: Err(e.into()),
                consecutive_failures: None,
                total_failures: None,
                should_retry_on_error: true,
            };
        }
    };

    // Read failure counters from transaction metadata
    let (consecutive_failures, total_failures) = match &transaction.metadata {
        Some(meta) => (meta.consecutive_failures, meta.total_failures),
        None => (0, 0),
    };

    // Get network type from transaction (authoritative source)
    let network_type = transaction.network_type;
    let max_consecutive = get_max_consecutive_status_failures(network_type);
    let max_total = get_max_total_status_failures(network_type);

    debug!(
        tx_id = %tx_id,
        consecutive_failures,
        total_failures,
        max_consecutive,
        max_total,
        attempt,
        task_id = %task_id,
        "loaded failure counters and thresholds for status check"
    );

    // Build circuit breaker context
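    // The downstream status handler can consult this context (e.g. via
    // should_force_finalize()) to force-finalize a transaction once either
    // failure counter crosses its network-specific threshold.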
    let context = StatusCheckContext::new(
        consecutive_failures,
        total_failures,
        attempt as u32,
        max_consecutive,
        max_total,
        network_type,
    );

    // Get relayer transaction handler
    let relayer_transaction =
        match get_relayer_transaction(status_request.relayer_id.clone(), state).await {
            Ok(rt) => rt,
            Err(ApiError::NotFound(msg)) => {
                // Relayer or signer not found - permanent failure, don't retry
                warn!(
                    tx_id = %tx_id,
                    relayer_id = %status_request.relayer_id,
                    "relayer or signer not found, completing job without retry: {}", msg
                );
                return HandleRequestResult {
                    result: Err(eyre::eyre!("Relayer or signer not found: {}", msg)),
                    consecutive_failures: Some(consecutive_failures),
                    total_failures: Some(total_failures),
                    should_retry_on_error: false,
                };
            }
            Err(e) => {
                // Other errors - should retry
                return HandleRequestResult {
                    result: Err(e.into()),
                    consecutive_failures: Some(consecutive_failures),
                    total_failures: Some(total_failures),
                    should_retry_on_error: true,
                };
            }
        };

    // Execute status check
    let result = relayer_transaction
        .handle_transaction_status(transaction, Some(context))
        .await
        .map_err(|e| e.into());

    if let Ok(tx) = result.as_ref() {
        debug!(
            tx_id = %tx.id,
            status = ?tx.status,
            "status check handled successfully"
        );
    }

    HandleRequestResult {
        result,
        consecutive_failures: Some(consecutive_failures),
        total_failures: Some(total_failures),
        should_retry_on_error: true,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{NetworkType, TransactionStatus};
    use std::collections::HashMap;

    #[tokio::test]
    async fn test_status_check_job_validation() {
        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        let job = Job::new(crate::jobs::JobType::TransactionStatusCheck, check_job);

        assert_eq!(job.data.transaction_id, "tx123");
        assert_eq!(job.data.relayer_id, "relayer-1");
        assert!(job.data.metadata.is_none());
    }

    #[tokio::test]
    async fn test_status_check_with_metadata() {
        let mut metadata = HashMap::new();
        metadata.insert("retry_count".to_string(), "2".to_string());
        metadata.insert("last_status".to_string(), "pending".to_string());

        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm)
            .with_metadata(metadata.clone());

        assert!(check_job.metadata.is_some());
        let job_metadata = check_job.metadata.unwrap();
        assert_eq!(job_metadata.get("retry_count").unwrap(), "2");
        assert_eq!(job_metadata.get("last_status").unwrap(), "pending");
    }

    #[test]
    fn test_status_check_network_type_required() {
        // Jobs should always have network_type set
        let check_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        assert!(check_job.network_type.is_some());

        // Verify different network types are preserved
        let solana_job = TransactionStatusCheck::new("tx456", "relayer-2", NetworkType::Solana);
        assert_eq!(solana_job.network_type, Some(NetworkType::Solana));

        let stellar_job = TransactionStatusCheck::new("tx789", "relayer-3", NetworkType::Stellar);
        assert_eq!(stellar_job.network_type, Some(NetworkType::Stellar));
    }

    mod context_tests {
        use super::*;

        #[test]
        fn test_context_should_force_finalize_below_threshold() {
            let ctx = StatusCheckContext::new(5, 10, 15, 25, 75, NetworkType::Evm);
            assert!(!ctx.should_force_finalize());
        }

        #[test]
        fn test_context_should_force_finalize_consecutive_at_threshold() {
            let ctx = StatusCheckContext::new(25, 30, 35, 25, 75, NetworkType::Evm);
            assert!(ctx.should_force_finalize());
        }

        #[test]
        fn test_context_should_force_finalize_total_at_threshold() {
            let ctx = StatusCheckContext::new(10, 75, 80, 25, 75, NetworkType::Evm);
            assert!(ctx.should_force_finalize());
        }
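
        /// The thresholds behave as inclusive lower bounds (see the
        /// at-threshold tests above), so values beyond either limit should
        /// also force finalization.
        #[test]
        fn test_context_should_force_finalize_above_thresholds() {
            let ctx = StatusCheckContext::new(30, 80, 85, 25, 75, NetworkType::Evm);
            assert!(ctx.should_force_finalize());
        }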
    }

    mod final_state_tests {
        use super::*;

        fn verify_final_state(status: TransactionStatus) {
            assert!(is_final_state(&status));
        }

        fn verify_not_final_state(status: TransactionStatus) {
            assert!(!is_final_state(&status));
        }

        #[test]
        fn test_confirmed_is_final() {
            verify_final_state(TransactionStatus::Confirmed);
        }

        #[test]
        fn test_failed_is_final() {
            verify_final_state(TransactionStatus::Failed);
        }

        #[test]
        fn test_canceled_is_final() {
            verify_final_state(TransactionStatus::Canceled);
        }

        #[test]
        fn test_expired_is_final() {
            verify_final_state(TransactionStatus::Expired);
        }

        #[test]
        fn test_pending_is_not_final() {
            verify_not_final_state(TransactionStatus::Pending);
        }

        #[test]
        fn test_sent_is_not_final() {
            verify_not_final_state(TransactionStatus::Sent);
        }

        #[test]
        fn test_submitted_is_not_final() {
            verify_not_final_state(TransactionStatus::Submitted);
        }

        #[test]
        fn test_mined_is_not_final() {
            verify_not_final_state(TransactionStatus::Mined);
        }
    }

    mod handle_result_tests {
        use super::*;

        /// Tests that counter increment uses saturating_add to prevent overflow
        #[test]
        fn test_counter_increment_saturating() {
            let consecutive: u32 = u32::MAX;
            let total: u32 = u32::MAX;

            let new_consecutive = consecutive.saturating_add(1);
            let new_total = total.saturating_add(1);

            // Should not overflow, stays at MAX
            assert_eq!(new_consecutive, u32::MAX);
            assert_eq!(new_total, u32::MAX);
        }

        /// Tests normal counter increment
        #[test]
        fn test_counter_increment_normal() {
            let consecutive: u32 = 5;
            let total: u32 = 10;

            let new_consecutive = consecutive.saturating_add(1);
            let new_total = total.saturating_add(1);

            assert_eq!(new_consecutive, 6);
            assert_eq!(new_total, 11);
        }

        /// Tests that consecutive counter resets to 0 on success (non-final)
        #[test]
        fn test_consecutive_reset_on_success() {
            // When status check succeeds but tx is not final,
            // consecutive should reset to 0, total stays unchanged
            let total: u32 = 20;

            // On success, consecutive resets
            let new_consecutive = 0;
            let new_total = total; // unchanged

            assert_eq!(new_consecutive, 0);
            assert_eq!(new_total, 20);
        }

        /// Tests that final states are correctly identified for cleanup
        #[test]
        fn test_final_state_triggers_cleanup() {
            let final_states = vec![
                TransactionStatus::Confirmed,
                TransactionStatus::Failed,
                TransactionStatus::Canceled,
                TransactionStatus::Expired,
            ];

            for status in final_states {
                assert!(
                    is_final_state(&status),
                    "Expected {status:?} to be a final state"
                );
            }
        }

        /// Tests that non-final states trigger retry
        #[test]
        fn test_non_final_state_triggers_retry() {
            let non_final_states = vec![
                TransactionStatus::Pending,
                TransactionStatus::Sent,
                TransactionStatus::Submitted,
                TransactionStatus::Mined,
            ];

            for status in non_final_states {
                assert!(
                    !is_final_state(&status),
                    "Expected {status:?} to NOT be a final state"
                );
            }
        }
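
        /// Mirrors the guard in `handle_result`: on a successful non-final
        /// check, the metadata write that zeroes `consecutive` is skipped
        /// only when both counters are already zero, avoiding a redundant
        /// repository call.
        #[test]
        fn test_reset_write_skipped_when_counters_zero() {
            let needs_reset = |consecutive: u32, total: u32| consecutive > 0 || total > 0;

            // Never failed: nothing to persist.
            assert!(!needs_reset(0, 0));
            // Any prior failure triggers the reset write.
            assert!(needs_reset(3, 0));
            assert!(needs_reset(0, 7));
        }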
    }

    mod handle_request_result_tests {
        use super::*;

        #[test]
        fn test_handle_request_result_with_counters() {
            let result = HandleRequestResult {
                result: Ok(TransactionRepoModel::default()),
                consecutive_failures: Some(5),
                total_failures: Some(10),
                should_retry_on_error: true,
            };

            assert!(result.result.is_ok());
            assert_eq!(result.consecutive_failures, Some(5));
            assert_eq!(result.total_failures, Some(10));
            assert!(result.should_retry_on_error);
        }

        #[test]
        fn test_handle_request_result_without_counters() {
            // Early failure before counters could be read
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found")),
                consecutive_failures: None,
                total_failures: None,
                should_retry_on_error: false,
            };

            assert!(result.result.is_err());
            assert_eq!(result.consecutive_failures, None);
            assert_eq!(result.total_failures, None);
            assert!(!result.should_retry_on_error);
        }

        #[test]
        fn test_permanent_error_should_not_retry() {
            // NotFound errors are permanent - should not retry
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found")),
                consecutive_failures: None,
                total_failures: None,
                should_retry_on_error: false,
            };

            // Permanent errors have should_retry_on_error = false
            assert!(!result.should_retry_on_error);
        }

        #[test]
        fn test_transient_error_should_retry() {
            // Network/connection errors are transient - should retry
            let result = HandleRequestResult {
                result: Err(eyre::eyre!("Connection timeout")),
                consecutive_failures: Some(3),
                total_failures: Some(7),
                should_retry_on_error: true,
            };

            // Transient errors have should_retry_on_error = true
            assert!(result.should_retry_on_error);
        }
    }
}