// openzeppelin_relayer/queues/sqs/worker.rs

//! SQS worker implementation for polling and processing messages.
//!
//! This module provides worker tasks that poll SQS queues and process jobs
//! using the existing handler functions.

use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::time::Duration;

use actix_web::web::ThinData;
use aws_sdk_sqs::error::{ProvideErrorMetadata, SdkError};
use aws_sdk_sqs::types::{
    DeleteMessageBatchRequestEntry, Message, MessageAttributeValue, MessageSystemAttributeName,
};
use futures::FutureExt;
use serde::de::DeserializeOwned;
use tokio::sync::watch;
use tokio::task::{JoinHandle, JoinSet};
use tracing::{debug, error, info, warn};

use crate::queues::{backoff_config_for_queue, retry_delay_secs};
use crate::{
    config::ServerConfig,
    jobs::{
        notification_handler, relayer_health_check_handler, token_swap_request_handler,
        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
        Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
        TransactionSend, TransactionStatusCheck,
    },
};

use super::{HandlerError, WorkerContext};
use super::{QueueBackendError, QueueType, WorkerHandle};

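/// Classification of a handler failure: `Retryable` errors are rescheduled
/// (re-enqueued or left for the visibility timeout), while `Permanent` errors
/// cause the message to be deleted without further attempts.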
#[derive(Debug)]
enum ProcessingError {
    Retryable(String),
    Permanent(String),
}

/// Outcome of processing a single SQS message, used to decide whether the
/// message should be batch-deleted or left in the queue.
#[derive(Debug)]
enum MessageOutcome {
    /// Message processed successfully — should be deleted from queue.
    Delete { receipt_handle: String },
    /// Message should remain in queue (e.g. status-check retry via visibility
    /// change, or retryable error awaiting visibility timeout).
    Retain,
}

/// Spawns a worker task for a specific SQS queue.
///
/// The worker continuously polls the queue, processes messages, and handles
/// retries via SQS visibility timeout.
///
/// # Arguments
/// * `sqs_client` - AWS SQS client for all operations (poll, send, delete, change visibility)
/// * `queue_type` - Type of queue (determines handler and concurrency)
/// * `queue_url` - SQS queue URL
/// * `app_state` - Application state with repositories and services
///
/// # Returns
/// JoinHandle to the spawned worker task
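///
/// # Example
///
/// A minimal wiring sketch (not taken from this crate's callers); it assumes an
/// `sqs_client`, an `app_state`, and a `queue_url` are already constructed.
/// ```ignore
/// let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
/// let handle = spawn_worker_for_queue(
///     sqs_client.clone(),
///     QueueType::TransactionRequest,
///     queue_url.clone(),
///     app_state.clone(),
///     shutdown_rx,
/// )
/// .await?;
/// ```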
pub async fn spawn_worker_for_queue(
    sqs_client: aws_sdk_sqs::Client,
    queue_type: QueueType,
    queue_url: String,
    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
    mut shutdown_rx: watch::Receiver<bool>,
) -> Result<WorkerHandle, QueueBackendError> {
    let concurrency = get_concurrency_for_queue(queue_type);
    let max_retries = queue_type.max_retries();
    let polling_interval = queue_type.polling_interval_secs();
    let visibility_timeout = queue_type.visibility_timeout_secs();
    let handler_timeout_secs = handler_timeout_secs(queue_type);
    let handler_timeout = Duration::from_secs(handler_timeout_secs);

    info!(
        queue_type = ?queue_type,
        queue_url = %queue_url,
        concurrency = concurrency,
        max_retries = max_retries,
        polling_interval_secs = polling_interval,
        visibility_timeout_secs = visibility_timeout,
        handler_timeout_secs = handler_timeout_secs,
        "Spawning SQS worker"
    );

    let handle: JoinHandle<()> = tokio::spawn(async move {
        let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));
        let mut inflight: JoinSet<Option<String>> = JoinSet::new();
        let mut consecutive_poll_errors: u32 = 0;
        let mut pending_deletes: Vec<String> = Vec::new();

        loop {
            // Reap completed tasks and collect receipt handles for batch delete
            while let Some(result) = inflight.try_join_next() {
                match result {
                    Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                    Ok(None) => {} // Retained message, no delete needed
                    Err(e) => {
                        warn!(
                            queue_type = ?queue_type,
                            error = %e,
                            "In-flight task failed"
                        );
                    }
                }
            }

            // Flush any accumulated deletes as a batch
            if !pending_deletes.is_empty() {
                flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
                pending_deletes.clear();
            }

            // Check shutdown before each iteration
            if *shutdown_rx.borrow() {
                info!(queue_type = ?queue_type, "Shutdown signal received, stopping SQS worker");
                break;
            }

            // Do not poll for more messages than we can process; otherwise
            // messages sit in-flight waiting for permits.
            let available_permits = semaphore.available_permits();
            if available_permits == 0 {
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_millis(50)) => continue,
                    _ = shutdown_rx.changed() => {
                        info!(queue_type = ?queue_type, "Shutdown signal received, stopping SQS worker");
                        break;
                    }
                }
            }

            let batch_size = available_permits.min(10) as i32;

            // Poll SQS for messages, racing with shutdown signal
            let messages_result = tokio::select! {
                result = sqs_client
                    .receive_message()
                    .queue_url(&queue_url)
                    .max_number_of_messages(batch_size) // SQS max is 10
                    .wait_time_seconds(polling_interval as i32)
                    .visibility_timeout(visibility_timeout as i32)
                    .message_system_attribute_names(MessageSystemAttributeName::ApproximateReceiveCount)
                    .message_system_attribute_names(MessageSystemAttributeName::MessageGroupId)
                    .message_attribute_names("target_scheduled_on")
                    .message_attribute_names("retry_attempt")
                    .send() => result,
                _ = shutdown_rx.changed() => {
                    info!(queue_type = ?queue_type, "Shutdown signal received during SQS poll, stopping worker");
                    break;
                }
            };

            match messages_result {
                Ok(output) => {
                    if consecutive_poll_errors > 0 {
                        info!(
                            queue_type = ?queue_type,
                            previous_errors = consecutive_poll_errors,
                            "SQS polling recovered after consecutive errors"
                        );
                    }
                    consecutive_poll_errors = 0;

                    if let Some(messages) = output.messages {
                        if !messages.is_empty() {
                            debug!(
                                queue_type = ?queue_type,
                                message_count = messages.len(),
                                "Received messages from SQS"
                            );

                            // Process messages concurrently (up to semaphore limit)
                            for message in messages {
                                let permit = match semaphore.clone().acquire_owned().await {
                                    Ok(permit) => permit,
                                    Err(err) => {
                                        error!(
                                            queue_type = ?queue_type,
                                            error = %err,
                                            "Semaphore closed, stopping SQS worker loop"
                                        );
                                        return;
                                    }
                                };
                                let client = sqs_client.clone();
                                let url = queue_url.clone();
                                let state = app_state.clone();

                                inflight.spawn(async move {
                                    let _permit = permit; // always dropped, even on panic

                                    let result = tokio::time::timeout(
                                        handler_timeout,
                                        AssertUnwindSafe(process_message(
                                            client.clone(),
                                            message,
                                            queue_type,
                                            &url,
                                            state,
                                            max_retries,
                                        ))
                                        .catch_unwind(),
                                    )
                                    .await;

                                    match result {
                                        Ok(Ok(Ok(MessageOutcome::Delete { receipt_handle }))) => {
                                            Some(receipt_handle)
                                        }
                                        Ok(Ok(Ok(MessageOutcome::Retain))) => None,
                                        Ok(Ok(Err(e))) => {
                                            error!(
                                                queue_type = ?queue_type,
                                                error = %e,
                                                "Failed to process message"
                                            );
                                            None
                                        }
                                        Ok(Err(panic_info)) => {
                                            let msg = panic_info
                                                .downcast_ref::<String>()
                                                .map(|s| s.as_str())
                                                .or_else(|| {
                                                    panic_info.downcast_ref::<&str>().copied()
                                                })
                                                .unwrap_or("unknown panic");
                                            error!(
                                                queue_type = ?queue_type,
                                                panic = %msg,
                                                "Message handler panicked"
                                            );
                                            None
                                        }
                                        Err(_) => {
                                            error!(
                                                queue_type = ?queue_type,
                                                timeout_secs = handler_timeout.as_secs(),
                                                "Message handler timed out; message will be retried after visibility timeout"
                                            );
                                            None
                                        }
                                    }
                                });
                            }
                        }
                    }
                }
                Err(e) => {
                    consecutive_poll_errors = consecutive_poll_errors.saturating_add(1);
                    let backoff_secs = poll_error_backoff_secs(consecutive_poll_errors);
                    let (error_kind, error_code, error_message) = match &e {
                        SdkError::ServiceError(ctx) => {
                            ("service", ctx.err().code(), ctx.err().message())
                        }
                        SdkError::DispatchFailure(_) => ("dispatch", None, None),
                        SdkError::ResponseError(_) => ("response", None, None),
                        SdkError::TimeoutError(_) => ("timeout", None, None),
                        _ => ("other", None, None),
                    };
                    error!(
                        queue_type = ?queue_type,
                        error_kind = error_kind,
                        error_code = error_code.unwrap_or("unknown"),
                        error_message = error_message.unwrap_or("n/a"),
                        error = %e,
                        error_debug = ?e,
                        consecutive_errors = consecutive_poll_errors,
                        backoff_secs = backoff_secs,
                        "Failed to receive messages from SQS, backing off"
                    );
                    tokio::select! {
                        _ = tokio::time::sleep(Duration::from_secs(backoff_secs)) => {}
                        _ = shutdown_rx.changed() => {
                            info!(queue_type = ?queue_type, "Shutdown signal received during backoff, stopping worker");
                            break;
                        }
                    }
                }
            }
        }

        // Drain in-flight tasks before shutdown, collecting final deletes
        if !inflight.is_empty() {
            info!(
                queue_type = ?queue_type,
                count = inflight.len(),
                "Draining in-flight tasks before shutdown"
            );
            match tokio::time::timeout(Duration::from_secs(30), async {
                while let Some(result) = inflight.join_next().await {
                    match result {
                        Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                        Ok(None) => {}
                        Err(e) => {
                            warn!(
                                queue_type = ?queue_type,
                                error = %e,
                                "In-flight task failed during drain"
                            );
                        }
                    }
                }
            })
            .await
            {
                Ok(()) => info!(queue_type = ?queue_type, "All in-flight tasks drained"),
                Err(_) => {
                    warn!(
                        queue_type = ?queue_type,
                        remaining = inflight.len(),
                        "Drain timeout, abandoning remaining tasks"
                    );
                    inflight.abort_all();
                }
            }
        }

        // Flush any remaining deletes accumulated during drain
        if !pending_deletes.is_empty() {
            flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
        }

        info!(queue_type = ?queue_type, "SQS worker stopped");
    });

    Ok(WorkerHandle::Tokio(handle))
}

/// Processes a single SQS message.
///
/// Routes the message to the appropriate handler based on queue type,
/// handles success/failure, and manages message deletion/retry.
async fn process_message(
    sqs_client: aws_sdk_sqs::Client,
    message: Message,
    queue_type: QueueType,
    queue_url: &str,
    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
    max_retries: usize,
) -> Result<MessageOutcome, QueueBackendError> {
    let body = message
        .body()
        .ok_or_else(|| QueueBackendError::QueueError("Empty message body".to_string()))?;

    let receipt_handle = message
        .receipt_handle()
        .ok_or_else(|| QueueBackendError::QueueError("Missing receipt handle".to_string()))?;

    // For jobs scheduled beyond the SQS 15-minute maximum delay, keep deferring in hops.
    if let Some(target_scheduled_on) = parse_target_scheduled_on(&message) {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .map_err(|e| QueueBackendError::QueueError(format!("System clock error: {e}")))?
            .as_secs() as i64;
        let remaining = target_scheduled_on - now;
        if remaining > 0 {
            let should_delete_original = defer_message(
                &sqs_client,
                queue_url,
                body.to_string(),
                &message,
                target_scheduled_on,
                remaining.min(900) as i32,
            )
            .await?;

            debug!(
                queue_type = ?queue_type,
                remaining_seconds = remaining,
                "Deferred scheduled SQS message for next delay hop"
            );
            return if should_delete_original {
                Ok(MessageOutcome::Delete {
                    receipt_handle: receipt_handle.to_string(),
                })
            } else {
                Ok(MessageOutcome::Retain)
            };
        }
    }

    // Get retry attempt count from message attributes
    let receive_count = message
        .attributes()
        .and_then(|attrs| attrs.get(&MessageSystemAttributeName::ApproximateReceiveCount))
        .and_then(|count| count.parse::<usize>().ok())
        .unwrap_or(1);
    // SQS receive count starts at 1; Apalis Attempt starts at 0.
    let attempt_number = receive_count.saturating_sub(1);
    // Persisted retry attempt for re-enqueued retries (e.g. self-reenqueued status checks).
    // Falls back to the receive_count-based attempt when the attribute is missing.
    let logical_retry_attempt = parse_retry_attempt(&message).unwrap_or(attempt_number);

    // Use SQS MessageId as the worker task_id for log correlation.
    let sqs_message_id = message.message_id().unwrap_or("unknown").to_string();

    debug!(
        queue_type = ?queue_type,
        message_id = %sqs_message_id,
        attempt = attempt_number,
        receive_count = receive_count,
        max_retries = max_retries,
        "Processing message"
    );

    // Route to appropriate handler
    let result = match queue_type {
        QueueType::TransactionRequest => {
            process_job::<TransactionRequest, _, _>(
                body,
                app_state,
                attempt_number,
                sqs_message_id,
                "TransactionRequest",
                transaction_request_handler,
            )
            .await
        }
        QueueType::TransactionSubmission => {
            process_job::<TransactionSend, _, _>(
                body,
                app_state,
                attempt_number,
                sqs_message_id,
                "TransactionSend",
                transaction_submission_handler,
            )
            .await
        }
        QueueType::StatusCheck | QueueType::StatusCheckEvm | QueueType::StatusCheckStellar => {
            process_job::<TransactionStatusCheck, _, _>(
                body,
                app_state,
                attempt_number,
                sqs_message_id,
                "TransactionStatusCheck",
                transaction_status_handler,
            )
            .await
        }
        QueueType::Notification => {
            process_job::<NotificationSend, _, _>(
                body,
                app_state,
                attempt_number,
                sqs_message_id,
                "NotificationSend",
                notification_handler,
            )
            .await
        }
        QueueType::TokenSwapRequest => {
            process_job::<TokenSwapRequest, _, _>(
                body,
                app_state,
                attempt_number,
                sqs_message_id,
                "TokenSwapRequest",
                token_swap_request_handler,
            )
            .await
        }
        QueueType::RelayerHealthCheck => {
            process_job::<RelayerHealthCheck, _, _>(
                body,
                app_state,
                attempt_number,
                sqs_message_id,
                "RelayerHealthCheck",
                relayer_health_check_handler,
            )
            .await
        }
    };

    match result {
        Ok(()) => {
            debug!(
                queue_type = ?queue_type,
                attempt = attempt_number,
                "Message processed successfully"
            );

            Ok(MessageOutcome::Delete {
                receipt_handle: receipt_handle.to_string(),
            })
        }
        Err(ProcessingError::Permanent(e)) => {
            error!(
                queue_type = ?queue_type,
                attempt = attempt_number,
                error = %e,
                "Permanent handler failure, message will be deleted"
            );

            Ok(MessageOutcome::Delete {
                receipt_handle: receipt_handle.to_string(),
            })
        }
        Err(ProcessingError::Retryable(e)) => {
            // Check max retries for non-infinite queues (status checks use usize::MAX)
            if max_retries != usize::MAX && receive_count > max_retries {
                error!(
                    queue_type = ?queue_type,
                    attempt = attempt_number,
                    receive_count = receive_count,
                    max_retries = max_retries,
                    error = %e,
                    "Max retries exceeded; message will be automatically moved to DLQ by SQS redrive policy"
                );
                return Ok(MessageOutcome::Retain);
            }

            // Compute retry delay based on queue type:
            // - Status checks use network-type-aware backoff from the message body
            // - All other queues use their configured backoff profile from retry_config
            let delay = if queue_type.is_status_check() {
                compute_status_retry_delay(body, logical_retry_attempt)
            } else {
                retry_delay_secs(backoff_config_for_queue(queue_type), logical_retry_attempt)
            };

            // FIFO queues do not support per-message DelaySeconds. Use visibility
            // timeout on the in-flight message to schedule the retry.
            if is_fifo_queue_url(queue_url) {
                if let Err(err) = sqs_client
                    .change_message_visibility()
                    .queue_url(queue_url)
                    .receipt_handle(receipt_handle)
                    .visibility_timeout(delay.clamp(1, 900))
                    .send()
                    .await
                {
                    error!(
                        queue_type = ?queue_type,
                        error = %err,
                        "Failed to set visibility timeout for retry; falling back to existing visibility timeout"
                    );
                    return Ok(MessageOutcome::Retain);
                }

                debug!(
                    queue_type = ?queue_type,
                    attempt = logical_retry_attempt,
                    delay_seconds = delay,
                    error = %e,
                    "Retry scheduled via visibility timeout"
                );

                return Ok(MessageOutcome::Retain);
            }

            let next_retry_attempt = logical_retry_attempt.saturating_add(1);

            // Standard queues: re-enqueue with native DelaySeconds,
            // no group_id or dedup_id needed. Duplicate deliveries are
            // harmless because handlers are idempotent.
            if let Err(send_err) = sqs_client
                .send_message()
                .queue_url(queue_url)
                .message_body(body.to_string())
                .delay_seconds(delay)
                .message_attributes(
                    "retry_attempt",
                    MessageAttributeValue::builder()
                        .data_type("Number")
                        .string_value(next_retry_attempt.to_string())
                        .build()
                        .map_err(|err| {
                            QueueBackendError::SqsError(format!(
                                "Failed to build retry_attempt attribute: {err}"
                            ))
                        })?,
                )
                .send()
                .await
            {
                error!(
                    queue_type = ?queue_type,
                    error = %send_err,
                    "Failed to re-enqueue message; leaving original for visibility timeout retry"
                );
                // Fall through — original message will retry after visibility timeout
                return Ok(MessageOutcome::Retain);
            }

            debug!(
                queue_type = ?queue_type,
                attempt = logical_retry_attempt,
                delay_seconds = delay,
                error = %e,
                "Message re-enqueued with backoff delay"
            );

            // Delete the original message now that the re-enqueue succeeded
            Ok(MessageOutcome::Delete {
                receipt_handle: receipt_handle.to_string(),
            })
        }
    }
}

/// Generic job processor — deserializes `Job<T>`, creates a `WorkerContext`,
/// and delegates to the provided handler function.
async fn process_job<T, F, Fut>(
    body: &str,
    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
    attempt: usize,
    task_id: String,
    type_name: &str,
    handler: F,
) -> Result<(), ProcessingError>
where
    T: DeserializeOwned,
    F: FnOnce(Job<T>, ThinData<crate::models::DefaultAppState>, WorkerContext) -> Fut,
    Fut: Future<Output = Result<(), HandlerError>>,
{
    let job: Job<T> = serde_json::from_str(body).map_err(|e| {
        error!(error = %e, "Failed to deserialize {} job", type_name);
        // Malformed payload is not recoverable by retrying the same message body.
        ProcessingError::Permanent(format!("Failed to deserialize {type_name} job: {e}"))
    })?;

    let ctx = WorkerContext::new(attempt, task_id);
    handler(job, (*app_state).clone(), ctx)
        .await
        .map_err(map_handler_error)
}

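/// Maps handler-level errors onto worker processing outcomes: `Abort` is treated
/// as permanent, `Retry` as retryable.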
fn map_handler_error(error: HandlerError) -> ProcessingError {
    match error {
        HandlerError::Abort(msg) => ProcessingError::Permanent(msg),
        HandlerError::Retry(msg) => ProcessingError::Retryable(msg),
    }
}

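/// Reads the optional `target_scheduled_on` message attribute (a Unix timestamp in
/// seconds) used to defer jobs scheduled beyond the SQS 15-minute delay limit.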
fn parse_target_scheduled_on(message: &Message) -> Option<i64> {
    message
        .message_attributes()
        .and_then(|attrs| attrs.get("target_scheduled_on"))
        .and_then(|value| value.string_value())
        .and_then(|value| value.parse::<i64>().ok())
}

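/// Reads the optional `retry_attempt` message attribute persisted when a message is
/// re-enqueued for retry on a standard queue.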
fn parse_retry_attempt(message: &Message) -> Option<usize> {
    message
        .message_attributes()
        .and_then(|attrs| attrs.get("retry_attempt"))
        .and_then(|value| value.string_value())
        .and_then(|value| value.parse::<usize>().ok())
}

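/// Returns true when the queue URL ends with the `.fifo` suffix, which SQS requires
/// for FIFO queue names.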
fn is_fifo_queue_url(queue_url: &str) -> bool {
    queue_url.ends_with(".fifo")
}

async fn defer_message(
    sqs_client: &aws_sdk_sqs::Client,
    queue_url: &str,
    body: String,
    message: &Message,
    target_scheduled_on: i64,
    delay_seconds: i32,
) -> Result<bool, QueueBackendError> {
    if is_fifo_queue_url(queue_url) {
        let receipt_handle = message.receipt_handle().ok_or_else(|| {
            QueueBackendError::QueueError(
                "Cannot defer FIFO message: missing receipt handle".to_string(),
            )
        })?;

        sqs_client
            .change_message_visibility()
            .queue_url(queue_url)
            .receipt_handle(receipt_handle)
            .visibility_timeout(delay_seconds.clamp(1, 900))
            .send()
            .await
            .map_err(|e| {
                QueueBackendError::SqsError(format!(
                    "Failed to defer FIFO message via visibility timeout: {e}"
                ))
            })?;

        return Ok(false);
    }

    // Standard queues support native per-message DelaySeconds — no need for
    // group_id or dedup_id. Just re-send with the delay and scheduling attribute.
    let request = sqs_client
        .send_message()
        .queue_url(queue_url)
        .message_body(body)
        .delay_seconds(delay_seconds.clamp(1, 900))
        .message_attributes(
            "target_scheduled_on",
            MessageAttributeValue::builder()
                .data_type("Number")
                .string_value(target_scheduled_on.to_string())
                .build()
                .map_err(|e| {
                    QueueBackendError::SqsError(format!(
                        "Failed to build deferred scheduled attribute: {e}"
                    ))
                })?,
        );

    request.send().await.map_err(|e| {
        QueueBackendError::SqsError(format!("Failed to defer scheduled message: {e}"))
    })?;

    Ok(true)
}

/// Partial struct for deserializing only the `network_type` field from a status check job.
///
/// Used to avoid deserializing the entire `Job<TransactionStatusCheck>` when we only
/// need the network type to determine retry delay.
#[derive(serde::Deserialize)]
struct StatusCheckData {
    network_type: Option<crate::models::NetworkType>,
}

/// Partial struct matching `Job<TransactionStatusCheck>` structure.
///
/// Used for efficient partial deserialization to extract only the `network_type`
/// field without parsing the entire job payload.
#[derive(serde::Deserialize)]
struct PartialStatusCheckJob {
    data: StatusCheckData,
}

/// Extracts `network_type` from a status check payload and computes retry delay.
///
/// This uses hardcoded network-specific backoff windows aligned with Redis/Apalis:
/// - EVM: 8s -> 12s cap
/// - Stellar: 2s -> 3s cap
/// - Solana/default: 5s -> 8s cap
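///
/// Only `data.network_type` is read from the payload. For example (mirroring the
/// unit tests below):
/// ```ignore
/// assert_eq!(compute_status_retry_delay(r#"{"data":{"network_type":"evm"}}"#, 0), 8);
/// assert_eq!(compute_status_retry_delay(r#"{"data":{}}"#, 0), 5); // default/Solana window
/// ```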
fn compute_status_retry_delay(body: &str, attempt: usize) -> i32 {
    let network_type = serde_json::from_str::<PartialStatusCheckJob>(body)
        .ok()
        .and_then(|j| j.data.network_type);

    crate::queues::retry_config::status_check_retry_delay_secs(network_type, attempt)
}

/// Gets the concurrency limit for a queue type from environment.
fn get_concurrency_for_queue(queue_type: QueueType) -> usize {
    let configured = ServerConfig::get_worker_concurrency(
        queue_type.concurrency_env_key(),
        queue_type.default_concurrency(),
    );
    if configured == 0 {
        warn!(
            queue_type = ?queue_type,
            "Configured concurrency is 0; clamping to 1"
        );
        1
    } else {
        configured
    }
}

/// Maximum allowed wall-clock processing time per message before the handler task is canceled.
///
/// Keep this bounded so permits cannot be held forever by hung handlers.
fn handler_timeout_secs(queue_type: QueueType) -> u64 {
    u64::from(queue_type.visibility_timeout_secs().max(1))
}

/// Maximum backoff duration for poll errors (1 minute).
const MAX_POLL_BACKOFF_SECS: u64 = 60;

/// Number of consecutive errors between recovery probes at the backoff ceiling.
/// Once the backoff reaches `MAX_POLL_BACKOFF_SECS`, every Nth error cycle uses
/// the base interval (5s) to quickly detect when the SQS endpoint recovers.
const RECOVERY_PROBE_EVERY: u32 = 4;

/// Computes exponential backoff for consecutive poll errors with recovery probes.
///
/// Returns: 5, 10, 20, 40, 60, 60, 60, **5** (probe), 60, 60, 60, **5**, ...
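///
/// For example (values follow directly from the table above):
/// ```ignore
/// assert_eq!(poll_error_backoff_secs(3), 20);
/// assert_eq!(poll_error_backoff_secs(8), 5); // recovery probe at the ceiling
/// ```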
fn poll_error_backoff_secs(consecutive_errors: u32) -> u64 {
    let base: u64 = 5;

    // Once well past the ceiling, periodically try the base interval
    // to quickly detect when the SQS endpoint recovers.
    if consecutive_errors >= 7 && consecutive_errors % RECOVERY_PROBE_EVERY == 0 {
        return base;
    }

    let exponent = consecutive_errors.saturating_sub(1).min(16);
    base.saturating_mul(2_u64.saturating_pow(exponent))
        .min(MAX_POLL_BACKOFF_SECS)
}

/// Deletes messages from SQS in batches of up to 10 (the SQS maximum per call).
///
/// Returns the total number of successfully deleted messages. Any per-entry
/// failures are logged as warnings — SQS will redeliver those messages after
/// the visibility timeout expires.
async fn flush_delete_batch(
    sqs_client: &aws_sdk_sqs::Client,
    queue_url: &str,
    batch: &[String],
    queue_type: QueueType,
) -> usize {
    if batch.is_empty() {
        return 0;
    }

    let mut deleted = 0;

    for chunk in batch.chunks(10) {
        let entries: Vec<DeleteMessageBatchRequestEntry> = chunk
            .iter()
            .enumerate()
            .map(|(i, handle)| {
                DeleteMessageBatchRequestEntry::builder()
                    .id(i.to_string())
                    .receipt_handle(handle)
                    .build()
                    .expect("id and receipt_handle are always set")
            })
            .collect();

        match sqs_client
            .delete_message_batch()
            .queue_url(queue_url)
            .set_entries(Some(entries))
            .send()
            .await
        {
            Ok(output) => {
                deleted += output.successful().len();

                for f in output.failed() {
                    warn!(
                        queue_type = ?queue_type,
                        id = %f.id(),
                        code = %f.code(),
                        message = f.message().unwrap_or("unknown"),
                        "Batch delete entry failed (message will be redelivered)"
                    );
                }
            }
            Err(e) => {
                error!(
                    queue_type = ?queue_type,
                    error = %e,
                    batch_size = chunk.len(),
                    "Batch delete API call failed (messages will be redelivered)"
                );
            }
        }
    }

    deleted
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_get_concurrency_for_queue() {
        // Test that concurrency is retrieved (exact value depends on env)
        let concurrency = get_concurrency_for_queue(QueueType::TransactionRequest);
        assert!(concurrency > 0);

        let concurrency = get_concurrency_for_queue(QueueType::StatusCheck);
        assert!(concurrency > 0);
    }

    #[test]
    fn test_handler_timeout_secs_is_positive() {
        let all = [
            QueueType::TransactionRequest,
            QueueType::TransactionSubmission,
            QueueType::StatusCheck,
            QueueType::StatusCheckEvm,
            QueueType::StatusCheckStellar,
            QueueType::Notification,
            QueueType::TokenSwapRequest,
            QueueType::RelayerHealthCheck,
        ];
        for queue_type in all {
            assert!(handler_timeout_secs(queue_type) > 0);
        }
    }

    #[test]
    fn test_handler_timeout_secs_uses_visibility_timeout() {
        assert_eq!(
            handler_timeout_secs(QueueType::StatusCheckEvm),
            QueueType::StatusCheckEvm.visibility_timeout_secs() as u64
        );
        assert_eq!(
            handler_timeout_secs(QueueType::Notification),
            QueueType::Notification.visibility_timeout_secs() as u64
        );
    }

    #[test]
    fn test_parse_target_scheduled_on() {
        // Test parsing target_scheduled_on from message attributes
        let message = Message::builder().build();

        // Message without attribute should return None
        assert_eq!(parse_target_scheduled_on(&message), None);

        // Message with valid attribute
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("1234567890")
                    .build()
                    .unwrap(),
            )
            .build();

        assert_eq!(parse_target_scheduled_on(&message), Some(1234567890));
    }

    #[test]
    fn test_parse_retry_attempt() {
        let message = Message::builder().build();
        assert_eq!(parse_retry_attempt(&message), None);

        let message = Message::builder()
            .message_attributes(
                "retry_attempt",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("7")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_retry_attempt(&message), Some(7));
    }

    #[test]
    fn test_map_handler_error() {
        // Test Abort maps to Permanent
        let error = HandlerError::Abort("Validation failed".to_string());
        let result = map_handler_error(error);
        assert!(matches!(result, ProcessingError::Permanent(_)));

        // Test Retry maps to Retryable
        let error = HandlerError::Retry("Network timeout".to_string());
        let result = map_handler_error(error);
        assert!(matches!(result, ProcessingError::Retryable(_)));
    }

    #[test]
    fn test_is_fifo_queue_url() {
        assert!(is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.fifo"
        ));
        assert!(!is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue"
        ));
    }

    #[test]
    fn test_compute_status_retry_delay_evm() {
        // NetworkType uses #[serde(rename_all = "lowercase")]
        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
        assert_eq!(compute_status_retry_delay(body, 0), 8);
        assert_eq!(compute_status_retry_delay(body, 1), 12);
        assert_eq!(compute_status_retry_delay(body, 8), 12);
    }

    #[test]
    fn test_compute_status_retry_delay_stellar() {
        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"stellar"}}"#;
        assert_eq!(compute_status_retry_delay(body, 0), 2);
        assert_eq!(compute_status_retry_delay(body, 1), 3);
        assert_eq!(compute_status_retry_delay(body, 8), 3);
    }

    #[test]
    fn test_compute_status_retry_delay_solana() {
        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"solana"}}"#;
        assert_eq!(compute_status_retry_delay(body, 0), 5);
        assert_eq!(compute_status_retry_delay(body, 1), 8);
        assert_eq!(compute_status_retry_delay(body, 8), 8);
    }

    #[test]
    fn test_compute_status_retry_delay_missing_network() {
        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1"}}"#;
        assert_eq!(compute_status_retry_delay(body, 0), 5);
        assert_eq!(compute_status_retry_delay(body, 1), 8);
        assert_eq!(compute_status_retry_delay(body, 8), 8);
    }

    #[test]
    fn test_compute_status_retry_delay_invalid_body() {
        assert_eq!(compute_status_retry_delay("not json", 0), 5);
        assert_eq!(compute_status_retry_delay("not json", 1), 8);
        assert_eq!(compute_status_retry_delay("not json", 8), 8);
    }

    #[tokio::test]
    async fn test_semaphore_released_on_panic() {
        let sem = Arc::new(tokio::sync::Semaphore::new(1));
        let permit = sem.clone().acquire_owned().await.unwrap();

        let handle = tokio::spawn(async move {
            let _permit = permit; // dropped on scope exit, even after panic
            let _ = AssertUnwindSafe(async { panic!("test panic") })
                .catch_unwind()
                .await;
        });

        handle.await.unwrap();
        // Would hang forever if permit leaked
        let _p = tokio::time::timeout(Duration::from_millis(100), sem.acquire())
            .await
            .expect("permit should be available after panic");
    }

    #[test]
    fn test_poll_error_backoff_secs() {
        // First error: 5s
        assert_eq!(poll_error_backoff_secs(1), 5);
        // Second: 10s
        assert_eq!(poll_error_backoff_secs(2), 10);
        // Third: 20s
        assert_eq!(poll_error_backoff_secs(3), 20);
        // Fourth: 40s
        assert_eq!(poll_error_backoff_secs(4), 40);
        // Capped at MAX_POLL_BACKOFF_SECS (60)
        assert_eq!(poll_error_backoff_secs(5), 60);
        assert_eq!(poll_error_backoff_secs(6), 60);
        assert_eq!(poll_error_backoff_secs(7), 60);
        // Recovery probe: base interval at multiples of RECOVERY_PROBE_EVERY (>= 7)
        assert_eq!(poll_error_backoff_secs(8), 5);
        assert_eq!(poll_error_backoff_secs(9), 60);
        assert_eq!(poll_error_backoff_secs(12), 5); // next probe
    }

    #[test]
    fn test_poll_error_backoff_zero_errors() {
        // Zero consecutive errors should still produce a reasonable value
        assert_eq!(poll_error_backoff_secs(0), 5);
    }

    #[test]
    fn test_poll_error_backoff_recovery_probes() {
        // Verify probes repeat at regular intervals once past threshold
        for i in (8..=100).step_by(RECOVERY_PROBE_EVERY as usize) {
            assert_eq!(
                poll_error_backoff_secs(i as u32),
                5,
                "Expected recovery probe at error {i}"
            );
        }
    }

    #[test]
    fn test_message_outcome_delete_carries_receipt_handle() {
        let handle = "test-receipt-handle-123".to_string();
        let outcome = MessageOutcome::Delete {
            receipt_handle: handle.clone(),
        };
        match outcome {
            MessageOutcome::Delete { receipt_handle } => {
                assert_eq!(receipt_handle, handle);
            }
            MessageOutcome::Retain => panic!("Expected Delete variant"),
        }
    }

    #[test]
    fn test_message_outcome_retain() {
        let outcome = MessageOutcome::Retain;
        assert!(matches!(outcome, MessageOutcome::Retain));
    }

    #[test]
    fn test_batch_delete_entry_builder() {
        // Verify DeleteMessageBatchRequestEntry builds correctly with sequential IDs,
        // matching the pattern used in flush_delete_batch.
        let handles = vec![
            "receipt-0".to_string(),
            "receipt-1".to_string(),
            "receipt-2".to_string(),
        ];
        let entries: Vec<DeleteMessageBatchRequestEntry> = handles
            .iter()
            .enumerate()
            .map(|(i, handle)| {
                DeleteMessageBatchRequestEntry::builder()
                    .id(i.to_string())
                    .receipt_handle(handle)
                    .build()
                    .expect("id and receipt_handle are set")
            })
            .collect();

        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].id(), "0");
        assert_eq!(entries[0].receipt_handle(), "receipt-0");
        assert_eq!(entries[2].id(), "2");
        assert_eq!(entries[2].receipt_handle(), "receipt-2");
    }

    #[test]
    fn test_batch_chunking_logic() {
        // Verify that chunks(10) correctly splits receipt handles,
        // matching the pattern used in flush_delete_batch.
        let handles: Vec<String> = (0..25).map(|i| format!("receipt-{i}")).collect();
        let chunks: Vec<&[String]> = handles.chunks(10).collect();

        assert_eq!(chunks.len(), 3);
        assert_eq!(chunks[0].len(), 10);
        assert_eq!(chunks[1].len(), 10);
        assert_eq!(chunks[2].len(), 5);
    }

    #[test]
    fn test_outcome_collection_pattern() {
        // Verify the pattern used in the main loop to collect receipt handles
        // from a mix of Delete and Retain outcomes.
        let outcomes = vec![
            Some("receipt-1".to_string()), // Delete
            None,                          // Retain
            Some("receipt-2".to_string()), // Delete
            None,                          // Retain
            Some("receipt-3".to_string()), // Delete
        ];

        let pending_deletes: Vec<String> = outcomes.into_iter().flatten().collect();

        assert_eq!(pending_deletes.len(), 3);
        assert_eq!(pending_deletes[0], "receipt-1");
        assert_eq!(pending_deletes[1], "receipt-2");
        assert_eq!(pending_deletes[2], "receipt-3");
    }

    // ── parse_target_scheduled_on: edge cases ─────────────────────────

    #[test]
    fn test_parse_target_scheduled_on_non_numeric_string() {
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("String")
                    .string_value("not-a-number")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_target_scheduled_on(&message), None);
    }

    #[test]
    fn test_parse_target_scheduled_on_empty_string() {
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_target_scheduled_on(&message), None);
    }

    #[test]
    fn test_parse_target_scheduled_on_negative_value() {
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("-1000")
                    .build()
                    .unwrap(),
            )
            .build();
        // Negative values parse fine as i64
        assert_eq!(parse_target_scheduled_on(&message), Some(-1000));
    }

    #[test]
    fn test_parse_target_scheduled_on_float_string() {
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("1234567890.5")
                    .build()
                    .unwrap(),
            )
            .build();
        // Floats can't parse as i64
        assert_eq!(parse_target_scheduled_on(&message), None);
    }

    #[test]
    fn test_parse_target_scheduled_on_zero() {
        let message = Message::builder()
            .message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("0")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_target_scheduled_on(&message), Some(0));
    }

    #[test]
    fn test_parse_target_scheduled_on_wrong_attribute_name() {
        // Attribute exists but under a different key
        let message = Message::builder()
            .message_attributes(
                "wrong_key",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("1234567890")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_target_scheduled_on(&message), None);
    }

    // ── parse_retry_attempt: edge cases ───────────────────────────────

    #[test]
    fn test_parse_retry_attempt_non_numeric_string() {
        let message = Message::builder()
            .message_attributes(
                "retry_attempt",
                MessageAttributeValue::builder()
                    .data_type("String")
                    .string_value("abc")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_retry_attempt(&message), None);
    }

    #[test]
    fn test_parse_retry_attempt_negative_value() {
        let message = Message::builder()
            .message_attributes(
                "retry_attempt",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("-1")
                    .build()
                    .unwrap(),
            )
            .build();
        // Negative values can't parse as usize
        assert_eq!(parse_retry_attempt(&message), None);
    }

    #[test]
    fn test_parse_retry_attempt_zero() {
        let message = Message::builder()
            .message_attributes(
                "retry_attempt",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("0")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_retry_attempt(&message), Some(0));
    }

    #[test]
    fn test_parse_retry_attempt_large_value() {
        let message = Message::builder()
            .message_attributes(
                "retry_attempt",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value("999999")
                    .build()
                    .unwrap(),
            )
            .build();
        assert_eq!(parse_retry_attempt(&message), Some(999999));
    }

    // ── is_fifo_queue_url: comprehensive cases ────────────────────────

    #[test]
    fn test_is_fifo_queue_url_empty_string() {
        assert!(!is_fifo_queue_url(""));
    }

    #[test]
    fn test_is_fifo_queue_url_just_fifo_suffix() {
        assert!(is_fifo_queue_url("my-queue.fifo"));
    }

    #[test]
    fn test_is_fifo_queue_url_fifo_in_middle() {
        // .fifo appearing in the path but not as suffix
        assert!(!is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/.fifo/queue"
        ));
    }

    #[test]
    fn test_is_fifo_queue_url_case_sensitive() {
        assert!(!is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.FIFO"
        ));
        assert!(!is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.Fifo"
        ));
    }

    #[test]
    fn test_is_fifo_queue_url_standard_queue_variations() {
        assert!(!is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123456789/my-queue"
        ));
        assert!(!is_fifo_queue_url(
            "https://sqs.eu-west-1.amazonaws.com/123456789/relayer-tx-request"
        ));
        assert!(!is_fifo_queue_url(
            "http://localhost:4566/000000000000/test-queue"
        ));
    }

    #[test]
    fn test_is_fifo_queue_url_localstack() {
        // LocalStack FIFO queue URL format
        assert!(is_fifo_queue_url(
            "http://localhost:4566/000000000000/test-queue.fifo"
        ));
    }

    // ── map_handler_error: message preservation ───────────────────────

    #[test]
    fn test_map_handler_error_preserves_abort_message() {
        let msg = "Validation failed: invalid nonce";
        let error = HandlerError::Abort(msg.to_string());
        match map_handler_error(error) {
            ProcessingError::Permanent(s) => assert_eq!(s, msg),
            ProcessingError::Retryable(_) => panic!("Expected Permanent"),
        }
    }

    #[test]
    fn test_map_handler_error_preserves_retry_message() {
        let msg = "RPC timeout after 30s";
        let error = HandlerError::Retry(msg.to_string());
        match map_handler_error(error) {
            ProcessingError::Retryable(s) => assert_eq!(s, msg),
            ProcessingError::Permanent(_) => panic!("Expected Retryable"),
        }
    }

    #[test]
    fn test_map_handler_error_empty_message() {
        let error = HandlerError::Abort(String::new());
        match map_handler_error(error) {
            ProcessingError::Permanent(s) => assert!(s.is_empty()),
            ProcessingError::Retryable(_) => panic!("Expected Permanent"),
        }
    }

    // ── handler_timeout_secs: all queue types ─────────────────────────

    #[test]
    fn test_handler_timeout_secs_matches_visibility_timeout_for_all_queues() {
        let all = [
            QueueType::TransactionRequest,
            QueueType::TransactionSubmission,
            QueueType::StatusCheck,
            QueueType::StatusCheckEvm,
            QueueType::StatusCheckStellar,
            QueueType::Notification,
            QueueType::TokenSwapRequest,
            QueueType::RelayerHealthCheck,
        ];
        for qt in all {
            assert_eq!(
                handler_timeout_secs(qt),
                qt.visibility_timeout_secs().max(1) as u64,
                "{qt:?}: handler timeout should equal max(visibility_timeout, 1)"
            );
        }
    }

    // ── get_concurrency_for_queue: all queue types ────────────────────

    #[test]
    fn test_get_concurrency_for_queue_all_types_positive() {
        let all = [
            QueueType::TransactionRequest,
            QueueType::TransactionSubmission,
            QueueType::StatusCheck,
            QueueType::StatusCheckEvm,
            QueueType::StatusCheckStellar,
            QueueType::Notification,
            QueueType::TokenSwapRequest,
            QueueType::RelayerHealthCheck,
        ];
        for qt in all {
            assert!(
                get_concurrency_for_queue(qt) > 0,
                "{qt:?}: concurrency must be positive (clamped to at least 1)"
            );
        }
    }

    // ── poll_error_backoff_secs: overflow and invariants ───────────────

    #[test]
    fn test_poll_error_backoff_never_exceeds_max() {
        for i in 0..200 {
            let backoff = poll_error_backoff_secs(i);
            assert!(
                backoff <= MAX_POLL_BACKOFF_SECS,
                "Error count {i}: backoff {backoff}s exceeds MAX {MAX_POLL_BACKOFF_SECS}s"
            );
        }
    }

    #[test]
    fn test_poll_error_backoff_u32_max_does_not_overflow() {
        let backoff = poll_error_backoff_secs(u32::MAX);
        assert!(backoff <= MAX_POLL_BACKOFF_SECS);
        assert!(backoff > 0);
    }

    #[test]
    fn test_poll_error_backoff_always_positive() {
        for i in 0..200 {
            assert!(
                poll_error_backoff_secs(i) > 0,
                "Error count {i}: backoff must be positive"
            );
        }
    }

    #[test]
    fn test_poll_error_backoff_monotonic_before_cap() {
        // Before hitting the cap, backoff should be non-decreasing
        let mut prev = poll_error_backoff_secs(0);
        for i in 1..=4 {
            let curr = poll_error_backoff_secs(i);
            assert!(
                curr >= prev,
                "Backoff should be non-decreasing before cap: {prev} -> {curr} at error {i}"
            );
            prev = curr;
        }
    }

    // ── Constants validation ──────────────────────────────────────────

    #[test]
    fn test_max_poll_backoff_is_reasonable() {
        assert!(
            MAX_POLL_BACKOFF_SECS >= 10,
            "Max backoff should be at least 10s to avoid tight error loops"
        );
        assert!(
            MAX_POLL_BACKOFF_SECS <= 300,
            "Max backoff should be at most 5 minutes to detect recovery promptly"
        );
    }

    #[test]
    fn test_recovery_probe_every_is_valid() {
        assert!(
            RECOVERY_PROBE_EVERY >= 2,
            "Recovery probe interval must be at least 2 to avoid probing every attempt"
        );
        assert!(
            RECOVERY_PROBE_EVERY <= 10,
            "Recovery probe interval should not be too large or recovery detection is slow"
        );
    }

    // ── compute_status_retry_delay: edge cases ────────────────────────

    #[test]
    fn test_compute_status_retry_delay_very_high_attempt() {
        let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
        // Very high attempts should stay capped at the max (12s for EVM)
        assert_eq!(compute_status_retry_delay(body, 1000), 12);
        assert_eq!(compute_status_retry_delay(body, usize::MAX), 12);
    }

    #[test]
    fn test_compute_status_retry_delay_empty_body() {
        // Empty JSON body should fall back to generic/Solana defaults
        assert_eq!(compute_status_retry_delay("", 0), 5);
        assert_eq!(compute_status_retry_delay("{}", 0), 5);
    }

    #[test]
    fn test_compute_status_retry_delay_partial_json() {
        // JSON with missing inner structure
        assert_eq!(compute_status_retry_delay(r#"{"data":{}}"#, 0), 5);
        assert_eq!(
            compute_status_retry_delay(r#"{"data":{"network_type":"evm"}}"#, 0),
            8
        );
    }

    // ── PartialStatusCheckJob deserialization ──────────────────────────

    #[test]
    fn test_partial_status_check_job_deserializes_network_type() {
        let body = r#"{"data":{"network_type":"evm","extra_field":"ignored"}}"#;
        let parsed: PartialStatusCheckJob = serde_json::from_str(body).unwrap();
        assert_eq!(
            parsed.data.network_type,
            Some(crate::models::NetworkType::Evm)
        );
    }

    #[test]
    fn test_partial_status_check_job_handles_missing_network_type() {
        let body = r#"{"data":{"transaction_id":"tx1"}}"#;
        let parsed: PartialStatusCheckJob = serde_json::from_str(body).unwrap();
        assert_eq!(parsed.data.network_type, None);
    }

    #[test]
    fn test_partial_status_check_job_rejects_missing_data() {
        let body = r#"{"not_data":{}}"#;
        let result = serde_json::from_str::<PartialStatusCheckJob>(body);
        assert!(result.is_err());
    }

    // ── is_fifo_queue_url used consistently ───────────────────────────

    #[test]
    fn test_fifo_detection_consistent_with_defer_and_retry_logic() {
        // Both defer_message and the retry path in process_message use
        // is_fifo_queue_url to decide between visibility-timeout vs re-enqueue.
        // Verify our standard and FIFO URLs are classified identically by both
        // call sites (they both call the same function).
        let standard = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check";
        let fifo = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check.fifo";

        assert!(!is_fifo_queue_url(standard));
        assert!(is_fifo_queue_url(fifo));
    }
}