1use actix_web::web::ThinData;
8use eyre::Result;
9use tracing::{debug, info, instrument, warn};
10
11use crate::{
12 constants::{get_max_consecutive_status_failures, get_max_total_status_failures},
13 domain::{get_relayer_transaction, get_transaction_by_id, is_final_state, Transaction},
14 jobs::{Job, StatusCheckContext, TransactionStatusCheck},
15 models::{
16 ApiError, DefaultAppState, TransactionMetadata, TransactionRepoModel,
17 TransactionUpdateRequest,
18 },
19 observability::request_id::set_request_id,
20 queues::{HandlerError, WorkerContext},
21 repositories::TransactionRepository,
22};
23
24#[instrument(
25 level = "debug",
26 skip(job, state, ctx),
27 fields(
28 request_id = ?job.request_id,
29 job_id = %job.message_id,
30 job_type = %job.job_type.to_string(),
31 attempt = %ctx.attempt,
32 tx_id = %job.data.transaction_id,
33 relayer_id = %job.data.relayer_id,
34 task_id = %ctx.task_id,
35 )
36)]
37pub async fn transaction_status_handler(
38 job: Job<TransactionStatusCheck>,
39 state: ThinData<DefaultAppState>,
40 ctx: WorkerContext,
41) -> Result<(), HandlerError> {
42 if let Some(request_id) = job.request_id.clone() {
43 set_request_id(request_id);
44 }
45
46 let tx_repo = state.transaction_repository();
47
48 let req_result = handle_request(&job.data, &state, ctx.attempt, &ctx.task_id).await;
50
51 let tx_id = &job.data.transaction_id;
52
53 handle_result(
55 req_result.result,
56 &*tx_repo,
57 tx_id,
58 req_result.consecutive_failures,
59 req_result.total_failures,
60 req_result.should_retry_on_error,
61 )
62 .await
63}
64
65async fn handle_result<TR>(
76 result: Result<TransactionRepoModel>,
77 tx_repo: &TR,
78 tx_id: &str,
79 consecutive_failures: Option<u32>,
80 total_failures: Option<u32>,
81 should_retry_on_error: bool,
82) -> Result<(), HandlerError>
83where
84 TR: TransactionRepository + Send + Sync,
85{
86 match result {
87 Ok(tx) if is_final_state(&tx.status) => {
88 debug!(
91 tx_id = %tx.id,
92 relayer_id = %tx.relayer_id,
93 status = ?tx.status,
94 consecutive_failures = ?consecutive_failures,
95 total_failures = ?total_failures,
96 "transaction in final state, status check complete"
97 );
98
99 Ok(())
100 }
101 Ok(tx) => {
102 debug!(
104 tx_id = %tx.id,
105 relayer_id = %tx.relayer_id,
106 status = ?tx.status,
107 "transaction not in final state"
108 );
109
110 match (consecutive_failures, total_failures) {
113 (Some(consecutive), Some(total)) if consecutive > 0 || total > 0 => {
114 let update = TransactionUpdateRequest {
115 metadata: Some(TransactionMetadata {
116 consecutive_failures: 0,
117 total_failures: total,
118 }),
119 ..Default::default()
120 };
121 if let Err(e) = tx_repo.partial_update(tx_id.to_string(), update).await {
122 warn!(error = %e, tx_id = %tx_id, relayer_id = %tx.relayer_id, "failed to reset consecutive counter");
123 }
124 }
125 _ => {
126 }
128 }
129
130 Err(HandlerError::Retry(format!(
132 "transaction status: {:?} - not in final state, retrying",
133 tx.status
134 )))
135 }
136 Err(e) => {
137 if !should_retry_on_error {
139 info!(
140 error = %e,
141 tx_id = %tx_id,
142 "status check failed with permanent error, completing job without retry"
143 );
144 return Ok(());
145 }
146
147 match (consecutive_failures, total_failures) {
149 (Some(consecutive), Some(total)) => {
150 let new_consecutive = consecutive.saturating_add(1);
151 let new_total = total.saturating_add(1);
152
153 warn!(
154 error = %e,
155 tx_id = %tx_id,
156 consecutive_failures = new_consecutive,
157 total_failures = new_total,
158 "status check failed, incrementing failure counters"
159 );
160
161 let update = TransactionUpdateRequest {
163 metadata: Some(TransactionMetadata {
164 consecutive_failures: new_consecutive,
165 total_failures: new_total,
166 }),
167 ..Default::default()
168 };
169 if let Err(update_err) = tx_repo.partial_update(tx_id.to_string(), update).await
170 {
171 warn!(error = %update_err, tx_id = %tx_id, "failed to update counters");
172 }
173 }
174 _ => {
175 warn!(
177 error = %e,
178 tx_id = %tx_id,
179 "status check failed early, counters not available"
180 );
181 }
182 }
183
184 Err(HandlerError::Retry(format!("{e}")))
186 }
187 }
188}
189
190struct HandleRequestResult {
192 result: Result<TransactionRepoModel>,
193 consecutive_failures: Option<u32>,
194 total_failures: Option<u32>,
195 should_retry_on_error: bool,
197}
198
199async fn handle_request(
203 status_request: &TransactionStatusCheck,
204 state: &ThinData<DefaultAppState>,
205 attempt: usize,
206 task_id: &str,
207) -> HandleRequestResult {
208 let tx_id = &status_request.transaction_id;
209 debug!(
210 tx_id = %tx_id,
211 relayer_id = %status_request.relayer_id,
212 "handling transaction status check"
213 );
214
215 let transaction = match get_transaction_by_id(tx_id.clone(), state).await {
217 Ok(tx) => tx,
218 Err(ApiError::NotFound(msg)) => {
219 warn!(tx_id = %tx_id, "transaction not found, completing job without retry: {}", msg);
221 return HandleRequestResult {
222 result: Err(eyre::eyre!("Transaction not found: {}", msg)),
223 consecutive_failures: None,
224 total_failures: None,
225 should_retry_on_error: false,
226 };
227 }
228 Err(e) => {
229 return HandleRequestResult {
231 result: Err(e.into()),
232 consecutive_failures: None,
233 total_failures: None,
234 should_retry_on_error: true,
235 };
236 }
237 };
238
239 let (consecutive_failures, total_failures) = match &transaction.metadata {
241 Some(meta) => (meta.consecutive_failures, meta.total_failures),
242 None => (0, 0),
243 };
244
245 let network_type = transaction.network_type;
247 let max_consecutive = get_max_consecutive_status_failures(network_type);
248 let max_total = get_max_total_status_failures(network_type);
249
250 debug!(
251 tx_id = %tx_id,
252 consecutive_failures,
253 total_failures,
254 max_consecutive,
255 max_total,
256 attempt,
257 task_id = %task_id,
258 "handling transaction status check"
259 );
260
261 let context = StatusCheckContext::new(
263 consecutive_failures,
264 total_failures,
265 attempt as u32,
266 max_consecutive,
267 max_total,
268 network_type,
269 );
270
271 let relayer_transaction =
273 match get_relayer_transaction(status_request.relayer_id.clone(), state).await {
274 Ok(rt) => rt,
275 Err(ApiError::NotFound(msg)) => {
276 warn!(
278 tx_id = %tx_id,
279 relayer_id = %status_request.relayer_id,
280 "relayer or signer not found, completing job without retry: {}", msg
281 );
282 return HandleRequestResult {
283 result: Err(eyre::eyre!("Relayer or signer not found: {}", msg)),
284 consecutive_failures: Some(consecutive_failures),
285 total_failures: Some(total_failures),
286 should_retry_on_error: false,
287 };
288 }
289 Err(e) => {
290 return HandleRequestResult {
292 result: Err(e.into()),
293 consecutive_failures: Some(consecutive_failures),
294 total_failures: Some(total_failures),
295 should_retry_on_error: true,
296 };
297 }
298 };
299
300 let result = relayer_transaction
302 .handle_transaction_status(transaction, Some(context))
303 .await
304 .map_err(|e| e.into());
305
306 if let Ok(tx) = result.as_ref() {
307 debug!(
308 tx_id = %tx.id,
309 status = ?tx.status,
310 "status check handled successfully"
311 );
312 }
313
314 HandleRequestResult {
315 result,
316 consecutive_failures: Some(consecutive_failures),
317 total_failures: Some(total_failures),
318 should_retry_on_error: true,
319 }
320}
321
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{NetworkType, TransactionStatus};
    use std::collections::HashMap;

    /// A freshly built job exposes the ids it was created with and no metadata.
    #[tokio::test]
    async fn test_status_check_job_validation() {
        let payload = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        let job = Job::new(crate::jobs::JobType::TransactionStatusCheck, payload);

        let data = &job.data;
        assert_eq!(data.transaction_id, "tx123");
        assert_eq!(data.relayer_id, "relayer-1");
        assert!(data.metadata.is_none());
    }

    /// Metadata attached through `with_metadata` is readable from the payload.
    #[tokio::test]
    async fn test_status_check_with_metadata() {
        let mut extra = HashMap::new();
        extra.insert("retry_count".to_string(), "2".to_string());
        extra.insert("last_status".to_string(), "pending".to_string());

        let payload = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm)
            .with_metadata(extra.clone());

        assert!(payload.metadata.is_some());
        let stored = payload.metadata.unwrap();
        assert_eq!(stored.get("retry_count").unwrap(), "2");
        assert_eq!(stored.get("last_status").unwrap(), "pending");
    }

    /// The network type handed to `new` is recorded on the payload.
    #[test]
    fn test_status_check_network_type_required() {
        let evm = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        assert!(evm.network_type.is_some());

        let solana = TransactionStatusCheck::new("tx456", "relayer-2", NetworkType::Solana);
        assert_eq!(solana.network_type, Some(NetworkType::Solana));

        let stellar = TransactionStatusCheck::new("tx789", "relayer-3", NetworkType::Stellar);
        assert_eq!(stellar.network_type, Some(NetworkType::Stellar));
    }

    mod context_tests {
        use super::*;

        /// Both counters below their limits: no forced finalization.
        #[test]
        fn test_context_should_force_finalize_below_threshold() {
            let context = StatusCheckContext::new(5, 10, 15, 25, 75, NetworkType::Evm);
            assert!(!context.should_force_finalize());
        }

        /// Consecutive counter equal to its limit forces finalization.
        #[test]
        fn test_context_should_force_finalize_consecutive_at_threshold() {
            let context = StatusCheckContext::new(25, 30, 35, 25, 75, NetworkType::Evm);
            assert!(context.should_force_finalize());
        }

        /// Total counter equal to its limit forces finalization.
        #[test]
        fn test_context_should_force_finalize_total_at_threshold() {
            let context = StatusCheckContext::new(10, 75, 80, 25, 75, NetworkType::Evm);
            assert!(context.should_force_finalize());
        }
    }

    mod final_state_tests {
        use super::*;

        /// Asserts that `status` is reported as final.
        fn expect_final(status: TransactionStatus) {
            assert!(is_final_state(&status));
        }

        /// Asserts that `status` is reported as not final.
        fn expect_not_final(status: TransactionStatus) {
            assert!(!is_final_state(&status));
        }

        #[test]
        fn test_confirmed_is_final() {
            expect_final(TransactionStatus::Confirmed);
        }

        #[test]
        fn test_failed_is_final() {
            expect_final(TransactionStatus::Failed);
        }

        #[test]
        fn test_canceled_is_final() {
            expect_final(TransactionStatus::Canceled);
        }

        #[test]
        fn test_expired_is_final() {
            expect_final(TransactionStatus::Expired);
        }

        #[test]
        fn test_pending_is_not_final() {
            expect_not_final(TransactionStatus::Pending);
        }

        #[test]
        fn test_sent_is_not_final() {
            expect_not_final(TransactionStatus::Sent);
        }

        #[test]
        fn test_submitted_is_not_final() {
            expect_not_final(TransactionStatus::Submitted);
        }

        #[test]
        fn test_mined_is_not_final() {
            expect_not_final(TransactionStatus::Mined);
        }
    }

    mod handle_result_tests {
        use super::*;

        /// Incrementing a counter that is already at `u32::MAX` stays at `u32::MAX`.
        #[test]
        fn test_counter_increment_saturating() {
            let (consecutive, total) = (u32::MAX, u32::MAX);

            let bumped_consecutive = consecutive.saturating_add(1);
            let bumped_total = total.saturating_add(1);

            assert_eq!(bumped_consecutive, u32::MAX);
            assert_eq!(bumped_total, u32::MAX);
        }

        /// Incrementing ordinary counter values adds exactly one.
        #[test]
        fn test_counter_increment_normal() {
            let (consecutive, total): (u32, u32) = (5, 10);

            let bumped_consecutive = consecutive.saturating_add(1);
            let bumped_total = total.saturating_add(1);

            assert_eq!(bumped_consecutive, 6);
            assert_eq!(bumped_total, 11);
        }

        /// A reset zeroes the consecutive counter and leaves the total untouched.
        #[test]
        fn test_consecutive_reset_on_success() {
            let total: u32 = 20;

            let (reset_consecutive, kept_total) = (0, total);

            assert_eq!(reset_consecutive, 0);
            assert_eq!(kept_total, 20);
        }

        /// Every status listed here must be reported as final.
        #[test]
        fn test_final_state_triggers_cleanup() {
            for status in [
                TransactionStatus::Confirmed,
                TransactionStatus::Failed,
                TransactionStatus::Canceled,
                TransactionStatus::Expired,
            ] {
                assert!(
                    is_final_state(&status),
                    "Expected {status:?} to be a final state"
                );
            }
        }

        /// Every status listed here must be reported as not final.
        #[test]
        fn test_non_final_state_triggers_retry() {
            for status in [
                TransactionStatus::Pending,
                TransactionStatus::Sent,
                TransactionStatus::Submitted,
                TransactionStatus::Mined,
            ] {
                assert!(
                    !is_final_state(&status),
                    "Expected {status:?} to NOT be a final state"
                );
            }
        }
    }

    mod handle_request_result_tests {
        use super::*;

        /// A successful outcome keeps the counters and retry flag it was built with.
        #[test]
        fn test_handle_request_result_with_counters() {
            let outcome = HandleRequestResult {
                result: Ok(TransactionRepoModel::default()),
                consecutive_failures: Some(5),
                total_failures: Some(10),
                should_retry_on_error: true,
            };

            assert!(outcome.result.is_ok());
            assert_eq!(outcome.consecutive_failures, Some(5));
            assert_eq!(outcome.total_failures, Some(10));
            assert!(outcome.should_retry_on_error);
        }

        /// An early failure carries no counters and is marked as not retryable.
        #[test]
        fn test_handle_request_result_without_counters() {
            let outcome = HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found")),
                consecutive_failures: None,
                total_failures: None,
                should_retry_on_error: false,
            };

            assert!(outcome.result.is_err());
            assert_eq!(outcome.consecutive_failures, None);
            assert_eq!(outcome.total_failures, None);
            assert!(!outcome.should_retry_on_error);
        }

        /// A permanent error is flagged so that it is not retried.
        #[test]
        fn test_permanent_error_should_not_retry() {
            let outcome = HandleRequestResult {
                result: Err(eyre::eyre!("Transaction not found")),
                consecutive_failures: None,
                total_failures: None,
                should_retry_on_error: false,
            };

            assert!(!outcome.should_retry_on_error);
        }

        /// A transient error is flagged so that it is retried.
        #[test]
        fn test_transient_error_should_retry() {
            let outcome = HandleRequestResult {
                result: Err(eyre::eyre!("Connection timeout")),
                consecutive_failures: Some(3),
                total_failures: Some(7),
                should_retry_on_error: true,
            };

            assert!(outcome.should_retry_on_error);
        }
    }
}