openzeppelin_relayer/queues/sqs/
backend.rs

1//! AWS SQS backend implementation.
2//!
3//! This module provides an AWS SQS-backed implementation of the QueueBackend trait.
4//! Supports both Standard and FIFO queues. By default (`SQS_QUEUE_TYPE=auto`),
5//! the queue type is auto-detected at startup by probing a reference queue.
6//! Can also be set explicitly to `standard` or `fifo`.
7
8use async_trait::async_trait;
9use aws_sdk_sqs::types::MessageAttributeValue;
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::SystemTime;
13use tokio::sync::watch;
14use tracing::{debug, error, info, warn};
15
16use crate::{
17    config::ServerConfig,
18    jobs::{
19        Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
20        TransactionSend, TransactionStatusCheck,
21    },
22    models::{DefaultAppState, NetworkType},
23    queues::QueueBackendType,
24};
25use actix_web::web::ThinData;
26
27use super::{QueueBackend, QueueBackendError, QueueHealth, QueueType, WorkerHandle};
28
/// SQS maximum message body size (256 KB).
///
/// `send_message_to_sqs` compares the serialized body length (in bytes) against
/// this value and returns an error for larger bodies before any request is sent.
// NOTE(review): only the body length is compared; message attributes are not
// counted here. Confirm the limit against the current AWS SQS quotas.
const SQS_MAX_MESSAGE_SIZE_BYTES: usize = 256 * 1024;
31
32/// Chooses the FIFO message group ID based on network type.
33///
34/// EVM requires per-relayer ordering (nonce management), so uses `relayer_id`.
35/// Non-EVM networks (Stellar, Solana) can safely parallelize per transaction,
36/// so uses `transaction_id` for better throughput.
37/// Falls back to `relayer_id` when network type is unknown (conservative/safe).
38fn transaction_message_group_id(
39    network_type: Option<&NetworkType>,
40    _relayer_id: &str,
41    transaction_id: &str,
42) -> String {
43    match network_type {
44        Some(_) | None => transaction_id.to_string(),
45    }
46}
47
48/// Selects the status-check queue for a given network type.
49///
50/// EVM and Stellar use dedicated queues; all other/unknown network types use
51/// the generic status-check queue.
52fn status_check_queue_type(network_type: Option<&NetworkType>) -> QueueType {
53    match network_type {
54        Some(NetworkType::Evm) => QueueType::StatusCheckEvm,
55        Some(NetworkType::Stellar) => QueueType::StatusCheckStellar,
56        _ => QueueType::StatusCheck,
57    }
58}
59
/// AWS SQS backend for job queue operations.
///
/// Supports both Standard and FIFO queues (auto-detected at startup, or set via `SQS_QUEUE_TYPE`).
/// FIFO mode provides message ordering and exactly-once delivery;
/// Standard mode offers higher throughput and native per-message delays.
///
/// Instances are built by `SqsBackend::new`, which resolves the queue URLs,
/// verifies that each queue is reachable and caches DLQ URLs.
#[derive(Clone)]
pub struct SqsBackend {
    /// AWS SQS client for all operations (send, delete, poll, change visibility)
    sqs_client: aws_sdk_sqs::Client,
    /// Mapping of queue types to SQS queue URLs.
    /// Each URL is `<prefix><queue-name><suffix>`, where the suffix is `.fifo`
    /// for FIFO queues and empty for standard queues.
    queue_urls: HashMap<QueueType, String>,
    /// Cached DLQ URLs resolved once at startup, keyed by the source queue type.
    /// Avoids repeated `get_queue_url` calls on every health check.
    /// A queue type has no entry when its DLQ URL could not be resolved.
    dlq_urls: HashMap<QueueType, String>,
    /// AWS region
    region: String,
    /// Shutdown signal sender — sending `true` tells all workers and cron tasks to stop
    shutdown_tx: Arc<watch::Sender<bool>>,
}
79
80impl std::fmt::Debug for SqsBackend {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        f.debug_struct("SqsBackend")
83            .field("backend_type", &"sqs")
84            .field("region", &self.region)
85            .field("queue_count", &self.queue_urls.len())
86            .finish()
87    }
88}
89
90/// Resolves the queue type from the configured `SQS_QUEUE_TYPE` value and
91/// probe results.
92///
93/// - `"standard"` / `"fifo"` → returns immediately (probes ignored).
94/// - `"auto"` → decides based on which probe succeeded.
95/// - anything else → error.
96///
97/// `probe_results` is `Option<(bool, bool)>`: `Some((standard_ok, fifo_ok))`
98/// when `sqs_queue_type == "auto"`, `None` otherwise.
99fn resolve_queue_type(
100    sqs_queue_type: &str,
101    probe_results: Option<(bool, bool)>,
102    ref_standard_url: &str,
103    ref_fifo_url: &str,
104) -> Result<bool, QueueBackendError> {
105    match sqs_queue_type {
106        "standard" => {
107            info!("Using explicit SQS queue type: standard");
108            Ok(false)
109        }
110        "fifo" => {
111            info!("Using explicit SQS queue type: fifo");
112            Ok(true)
113        }
114        "auto" => {
115            let (standard_exists, fifo_exists) = probe_results.unwrap_or((false, false));
116            match (standard_exists, fifo_exists) {
117                (true, false) => {
118                    info!("Detected SQS queue type: standard");
119                    Ok(false)
120                }
121                (false, true) => {
122                    info!("Detected SQS queue type: fifo");
123                    Ok(true)
124                }
125                (true, true) => Err(QueueBackendError::ConfigError(
126                    "Ambiguous SQS queue type: both standard and FIFO \
127                     'transaction-request' queues exist. Remove one set or set \
128                     SQS_QUEUE_TYPE explicitly."
129                        .to_string(),
130                )),
131                (false, false) => Err(QueueBackendError::ConfigError(format!(
132                    "No SQS queues found. Neither '{ref_standard_url}' nor \
133                     '{ref_fifo_url}' is accessible. Create queues before starting \
134                     the relayer, or set SQS_QUEUE_TYPE explicitly."
135                ))),
136            }
137        }
138        other => Err(QueueBackendError::ConfigError(format!(
139            "Unsupported SQS_QUEUE_TYPE: '{other}'. Must be 'auto', 'standard', or 'fifo'."
140        ))),
141    }
142}
143
144impl SqsBackend {
145    fn is_fifo_queue_url(queue_url: &str) -> bool {
146        queue_url.ends_with(".fifo")
147    }
148
    /// Creates a new SQS backend.
    ///
    /// Loads AWS configuration from environment and builds queue URLs.
    /// Queue type is determined by `SQS_QUEUE_TYPE`:
    /// - `auto` (default): probes a reference queue at startup to detect the type
    /// - `standard` / `fifo`: uses the specified type directly, skipping probing
    ///
    /// Once the type is known, every expected queue is probed concurrently.
    /// Initialization fails if any of them cannot be reached. For queues that
    /// answer, the DLQ URL is resolved from the redrive policy and cached.
    ///
    /// # Environment Variables
    /// - `AWS_REGION` - AWS region (required)
    /// - `SQS_QUEUE_URL_PREFIX` - Optional custom prefix
    /// - `AWS_ACCOUNT_ID` - Required only when `SQS_QUEUE_URL_PREFIX` is not set
    /// - `SQS_QUEUE_TYPE` - Queue type: `auto` (default), `standard`, or `fifo`
    ///
    /// # Errors
    /// Returns ConfigError if required environment variables are missing or
    /// if queue type cannot be determined (no queues found, or both types exist).
    /// Also returns ConfigError listing every queue whose startup probe failed.
    pub async fn new() -> Result<Self, QueueBackendError> {
        info!("Initializing SQS queue backend");

        // Load AWS config from environment
        let config = aws_config::load_from_env().await;
        let sqs_client = aws_sdk_sqs::Client::new(&config);
        // The region is only known after the config is loaded; a missing
        // region is reported as a configuration error.
        let region = config
            .region()
            .ok_or_else(|| {
                QueueBackendError::ConfigError(
                    "AWS_REGION not set. Required for SQS backend.".to_string(),
                )
            })?
            .to_string();

        // Build queue URL prefix.
        // If an explicit prefix is provided, avoid forcing AWS_ACCOUNT_ID.
        // Any error reading the variable (not just "unset") falls back to the
        // default `https://sqs.<region>.amazonaws.com/<account>/relayer-` form.
        let prefix = match std::env::var("SQS_QUEUE_URL_PREFIX") {
            Ok(prefix) => prefix,
            Err(_) => {
                let account_id =
                    ServerConfig::get_aws_account_id().map_err(QueueBackendError::ConfigError)?;
                format!("https://sqs.{region}.amazonaws.com/{account_id}/relayer-")
            }
        };
        info!(
            region = %region,
            queue_url_prefix = %prefix,
            "Resolved SQS queue URL prefix"
        );

        // Determine queue type: explicit override or auto-detect by probing.
        // The configured value is lowercased, so matching is case-insensitive.
        let sqs_queue_type = ServerConfig::get_sqs_queue_type().to_lowercase();
        // The `transaction-request` queue is the reference used for detection.
        let ref_standard_url = format!("{prefix}transaction-request");
        let ref_fifo_url = format!("{prefix}transaction-request.fifo");

        // Only probe when auto-detecting; explicit values skip the network call.
        let probe_results = if sqs_queue_type == "auto" {
            // Both reference queues are probed concurrently. Each future owns
            // its own client handle and URL copy because it is `async move`.
            let (standard_probe, fifo_probe) = {
                let client_s = sqs_client.clone();
                let client_f = sqs_client.clone();
                let url_s = ref_standard_url.clone();
                let url_f = ref_fifo_url.clone();
                tokio::join!(
                    async move {
                        client_s
                            .get_queue_attributes()
                            .queue_url(&url_s)
                            .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
                            .send()
                            .await
                    },
                    async move {
                        client_f
                            .get_queue_attributes()
                            .queue_url(&url_f)
                            .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
                            .send()
                            .await
                    }
                )
            };
            // Only success/failure is kept: any probe error, whatever its
            // cause, counts as "queue not found" for detection purposes.
            Some((standard_probe.is_ok(), fifo_probe.is_ok()))
        } else {
            None
        };

        let is_fifo = resolve_queue_type(
            &sqs_queue_type,
            probe_results,
            &ref_standard_url,
            &ref_fifo_url,
        )?;
        // FIFO queue URLs carry a `.fifo` suffix; standard queue URLs have none.
        let suffix = if is_fifo { ".fifo" } else { "" };

        // Build queue URL mapping.
        // Status checks use per-network queues (EVM, Stellar, generic/Solana)
        // to match the Redis backend's separate worker setup with independent
        // concurrency pools and network-tuned polling intervals.
        let queue_urls = HashMap::from([
            (
                QueueType::TransactionRequest,
                format!("{prefix}transaction-request{suffix}"),
            ),
            (
                QueueType::TransactionSubmission,
                format!("{prefix}transaction-submission{suffix}"),
            ),
            (
                QueueType::StatusCheck,
                format!("{prefix}status-check{suffix}"),
            ),
            (
                QueueType::StatusCheckEvm,
                format!("{prefix}status-check-evm{suffix}"),
            ),
            (
                QueueType::StatusCheckStellar,
                format!("{prefix}status-check-stellar{suffix}"),
            ),
            (
                QueueType::Notification,
                format!("{prefix}notification{suffix}"),
            ),
            (
                QueueType::TokenSwapRequest,
                format!("{prefix}token-swap-request{suffix}"),
            ),
            (
                QueueType::RelayerHealthCheck,
                format!("{prefix}relayer-health-check{suffix}"),
            ),
        ]);

        // Fail fast at startup when expected queues are missing/misconfigured.
        // This avoids silent runtime polling failures and makes infra drift explicit.
        // Probe all queues concurrently to avoid slow sequential startup under
        // SQS throttling during scale-out events.
        let probe_futures: Vec<_> = queue_urls
            .iter()
            .map(|(queue_type, queue_url)| {
                let client = sqs_client.clone();
                let qt = *queue_type;
                let url = queue_url.clone();
                async move {
                    debug!(
                        queue_type = %qt,
                        queue_url = %url,
                        "Probing SQS queue accessibility at startup"
                    );
                    // The redrive policy is requested along with the ARN so the
                    // DLQ can be resolved below without another attribute call.
                    let probe = client
                        .get_queue_attributes()
                        .queue_url(&url)
                        .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
                        .attribute_names(aws_sdk_sqs::types::QueueAttributeName::RedrivePolicy)
                        .send()
                        .await;
                    (qt, url, probe)
                }
            })
            .collect();

        // Shadows the auto-detection `probe_results` above, which is no longer needed.
        let probe_results = futures::future::join_all(probe_futures).await;

        let mut missing_queues = Vec::new();
        let mut dlq_urls: HashMap<QueueType, String> = HashMap::new();

        // All failures are collected before returning, so one error message
        // reports every inaccessible queue.
        for (queue_type, queue_url, probe) in probe_results {
            match probe {
                Ok(output) => {
                    debug!(
                        queue_type = %queue_type,
                        queue_url = %queue_url,
                        is_fifo = is_fifo,
                        "SQS queue probe succeeded"
                    );

                    // Resolve and cache DLQ URL from the redrive policy while we
                    // already have the attributes, avoiding per-health-check lookups.
                    // Note: this lookup is awaited inside the loop, so DLQ URLs
                    // are resolved one queue at a time.
                    if let Some(dlq_url) =
                        Self::resolve_dlq_url_from_attrs(&sqs_client, output.attributes()).await
                    {
                        dlq_urls.insert(queue_type, dlq_url);
                    }
                }
                Err(err) => {
                    // Include debug details because Display often collapses to "service error".
                    error!(
                        queue_type = %queue_type,
                        queue_url = %queue_url,
                        error = ?err,
                        "SQS queue probe failed"
                    );
                    missing_queues.push(format!("{queue_type} ({queue_url}): {err:?}"));
                }
            }
        }

        if !missing_queues.is_empty() {
            return Err(QueueBackendError::ConfigError(format!(
                "SQS backend initialization failed. Missing/inaccessible queues: {}",
                missing_queues.join(", ")
            )));
        }

        info!(
            region = %region,
            queue_count = queue_urls.len(),
            "SQS backend initialized"
        );

        // Only the sender is kept; the initial receiver is dropped here.
        let (shutdown_tx, _) = watch::channel(false);

        Ok(Self {
            sqs_client,
            queue_urls,
            dlq_urls,
            region,
            shutdown_tx: Arc::new(shutdown_tx),
        })
    }
366
367    /// Sends a message to SQS with FIFO parameters.
368    ///
369    /// # Arguments
370    /// * `queue_url` - SQS queue URL
371    /// * `body` - JSON-serialized job
372    /// * `message_group_id` - FIFO group ID (for ordering)
373    /// * `message_deduplication_id` - Deduplication ID (prevent duplicates)
374    /// * `delay_seconds` - Optional delay (0-900 seconds). Applied only for non-FIFO queues.
375    ///
376    /// # Returns
377    /// SQS message ID on success
378    async fn send_message_to_sqs(
379        &self,
380        queue_url: &str,
381        body: String,
382        message_group_id: String,
383        message_deduplication_id: String,
384        delay_seconds: Option<i32>,
385        target_scheduled_on: Option<i64>,
386    ) -> Result<String, QueueBackendError> {
387        if body.len() > SQS_MAX_MESSAGE_SIZE_BYTES {
388            return Err(QueueBackendError::SqsError(format!(
389                "Message body size ({} bytes) exceeds SQS limit ({} bytes)",
390                body.len(),
391                SQS_MAX_MESSAGE_SIZE_BYTES
392            )));
393        }
394
395        let mut request = self
396            .sqs_client
397            .send_message()
398            .queue_url(queue_url)
399            .message_body(body);
400
401        // FIFO queues require MessageGroupId and MessageDeduplicationId;
402        // standard queues reject these parameters.
403        if Self::is_fifo_queue_url(queue_url) {
404            request = request
405                .message_group_id(message_group_id)
406                .message_deduplication_id(message_deduplication_id);
407        }
408
409        if let Some(timestamp) = target_scheduled_on {
410            request = request.message_attributes(
411                "target_scheduled_on",
412                MessageAttributeValue::builder()
413                    .data_type("Number")
414                    .string_value(timestamp.to_string())
415                    .build()
416                    .map_err(|e| {
417                        QueueBackendError::SqsError(format!(
418                            "Failed to build scheduled-on attribute: {e}"
419                        ))
420                    })?,
421            );
422        }
423
424        // Add delay if specified (max 900 seconds = 15 minutes).
425        // FIFO queues do not support per-message DelaySeconds.
426        if let Some(delay) = delay_seconds {
427            let clamped_delay = delay.clamp(0, 900);
428            if Self::is_fifo_queue_url(queue_url) {
429                debug!(
430                    queue_url = %queue_url,
431                    requested_delay_seconds = delay,
432                    "Skipping per-message DelaySeconds for FIFO queue; worker-side scheduling will enforce target_scheduled_on"
433                );
434            } else {
435                request = request.delay_seconds(clamped_delay);
436                if delay != clamped_delay {
437                    warn!(
438                        requested = delay,
439                        clamped = clamped_delay,
440                        "Delay seconds clamped to SQS limit (0-900)"
441                    );
442                }
443            }
444        }
445
446        let response = request.send().await.map_err(|e| {
447            error!(error = %e, queue_url = %queue_url, "Failed to send message to SQS");
448            QueueBackendError::SqsError(format!("SendMessage failed: {e}"))
449        })?;
450
451        let message_id = response
452            .message_id()
453            .ok_or_else(|| QueueBackendError::SqsError("No message_id returned".to_string()))?
454            .to_string();
455
456        debug!(
457            message_id = %message_id,
458            queue_url = %queue_url,
459            "Message sent to SQS"
460        );
461
462        Ok(message_id)
463    }
464
465    /// Calculates delay in seconds from Unix timestamp.
466    ///
467    /// Returns None if scheduled_on is in the past or None.
468    fn calculate_delay_seconds(scheduled_on: Option<i64>) -> Option<i32> {
469        scheduled_on.and_then(|timestamp| {
470            let now = SystemTime::now()
471                .duration_since(SystemTime::UNIX_EPOCH)
472                .ok()?
473                .as_secs() as i64;
474
475            let delay = timestamp - now;
476            if delay > 0 {
477                Some(delay.min(900) as i32) // SQS max delay: 900 seconds
478            } else {
479                None // Already past scheduled time
480            }
481        })
482    }
483
484    /// Extracts the DLQ ARN from a redrive policy and resolves its queue URL.
485    ///
486    /// Called once at startup so the URL can be cached in `dlq_urls`.
487    async fn resolve_dlq_url_from_attrs(
488        sqs_client: &aws_sdk_sqs::Client,
489        attrs: Option<&HashMap<aws_sdk_sqs::types::QueueAttributeName, String>>,
490    ) -> Option<String> {
491        let redrive_policy =
492            attrs.and_then(|a| a.get(&aws_sdk_sqs::types::QueueAttributeName::RedrivePolicy))?;
493
494        let dlq_arn = serde_json::from_str::<serde_json::Value>(redrive_policy)
495            .ok()
496            .and_then(|v| v.get("deadLetterTargetArn").cloned())
497            .and_then(|v| v.as_str().map(|s| s.to_string()));
498
499        let dlq_name = dlq_arn.as_deref()?.rsplit(':').next()?;
500
501        match sqs_client.get_queue_url().queue_name(dlq_name).send().await {
502            Ok(output) => output.queue_url().map(str::to_string),
503            Err(err) => {
504                warn!(error = %err, dlq_name = %dlq_name, "Failed to resolve DLQ URL at startup");
505                None
506            }
507        }
508    }
509
510    /// Returns the approximate message count for a cached DLQ URL.
511    ///
512    /// Uses URLs resolved and cached at startup, requiring only a single
513    /// `get_queue_attributes` call per health check (no URL resolution).
514    async fn get_dlq_message_count(&self, queue_type: &QueueType) -> u64 {
515        let Some(dlq_url) = self.dlq_urls.get(queue_type) else {
516            return 0;
517        };
518
519        match self
520            .sqs_client
521            .get_queue_attributes()
522            .queue_url(dlq_url)
523            .attribute_names(aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
524            .send()
525            .await
526        {
527            Ok(output) => output
528                .attributes()
529                .and_then(|attrs| {
530                    attrs.get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
531                })
532                .and_then(|value| value.parse::<u64>().ok())
533                .unwrap_or(0),
534            Err(err) => {
535                warn!(error = %err, dlq_url = %dlq_url, "Failed to fetch DLQ depth");
536                0
537            }
538        }
539    }
540}
541
542#[async_trait]
543impl QueueBackend for SqsBackend {
544    async fn produce_transaction_request(
545        &self,
546        job: Job<TransactionRequest>,
547        scheduled_on: Option<i64>,
548    ) -> Result<String, QueueBackendError> {
549        let queue_url = self
550            .queue_urls
551            .get(&QueueType::TransactionRequest)
552            .ok_or_else(|| QueueBackendError::QueueNotFound("TransactionRequest".to_string()))?;
553
554        let body = serde_json::to_string(&job).map_err(|e| {
555            error!(error = %e, "Failed to serialize TransactionRequest job");
556            QueueBackendError::SerializationError(e.to_string())
557        })?;
558
559        let message_group_id = transaction_message_group_id(
560            job.data.network_type.as_ref(),
561            &job.data.relayer_id,
562            &job.data.transaction_id,
563        );
564        let message_deduplication_id = job.message_id.clone();
565        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
566
567        self.send_message_to_sqs(
568            queue_url,
569            body,
570            message_group_id,
571            message_deduplication_id,
572            delay_seconds,
573            scheduled_on,
574        )
575        .await
576    }
577
578    async fn produce_transaction_submission(
579        &self,
580        job: Job<TransactionSend>,
581        scheduled_on: Option<i64>,
582    ) -> Result<String, QueueBackendError> {
583        let queue_url = self
584            .queue_urls
585            .get(&QueueType::TransactionSubmission)
586            .ok_or_else(|| QueueBackendError::QueueNotFound("TransactionSubmission".to_string()))?;
587
588        let body = serde_json::to_string(&job).map_err(|e| {
589            error!(error = %e, "Failed to serialize TransactionSend job");
590            QueueBackendError::SerializationError(e.to_string())
591        })?;
592
593        let message_group_id = transaction_message_group_id(
594            job.data.network_type.as_ref(),
595            &job.data.relayer_id,
596            &job.data.transaction_id,
597        );
598        let message_deduplication_id = job.message_id.clone();
599        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
600
601        self.send_message_to_sqs(
602            queue_url,
603            body,
604            message_group_id,
605            message_deduplication_id,
606            delay_seconds,
607            scheduled_on,
608        )
609        .await
610    }
611
612    async fn produce_transaction_status_check(
613        &self,
614        job: Job<TransactionStatusCheck>,
615        scheduled_on: Option<i64>,
616    ) -> Result<String, QueueBackendError> {
617        // Route to network-specific queue based on network type.
618        // EVM and Stellar get dedicated queues with tuned concurrency/polling;
619        // Solana and unknown network types use the generic StatusCheck queue.
620        let queue_type = status_check_queue_type(job.data.network_type.as_ref());
621        let queue_url = self
622            .queue_urls
623            .get(&queue_type)
624            .ok_or_else(|| QueueBackendError::QueueNotFound(format!("{queue_type}")))?;
625
626        let body = serde_json::to_string(&job).map_err(|e| {
627            error!(error = %e, "Failed to serialize TransactionStatusCheck job");
628            QueueBackendError::SerializationError(e.to_string())
629        })?;
630
631        let message_group_id = job.data.transaction_id.clone();
632        let message_deduplication_id = job.message_id.clone();
633        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
634
635        self.send_message_to_sqs(
636            queue_url,
637            body,
638            message_group_id,
639            message_deduplication_id,
640            delay_seconds,
641            scheduled_on,
642        )
643        .await
644    }
645
646    async fn produce_notification(
647        &self,
648        job: Job<NotificationSend>,
649        scheduled_on: Option<i64>,
650    ) -> Result<String, QueueBackendError> {
651        let queue_url = self
652            .queue_urls
653            .get(&QueueType::Notification)
654            .ok_or_else(|| QueueBackendError::QueueNotFound("Notification".to_string()))?;
655
656        let body = serde_json::to_string(&job).map_err(|e| {
657            error!(error = %e, "Failed to serialize NotificationSend job");
658            QueueBackendError::SerializationError(e.to_string())
659        })?;
660
661        // Notifications use notification_id as the group ID
662        let message_group_id = job.data.notification_id.clone();
663        let message_deduplication_id = job.message_id.clone();
664        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
665
666        self.send_message_to_sqs(
667            queue_url,
668            body,
669            message_group_id,
670            message_deduplication_id,
671            delay_seconds,
672            scheduled_on,
673        )
674        .await
675    }
676
677    async fn produce_token_swap_request(
678        &self,
679        job: Job<TokenSwapRequest>,
680        scheduled_on: Option<i64>,
681    ) -> Result<String, QueueBackendError> {
682        let queue_url = self
683            .queue_urls
684            .get(&QueueType::TokenSwapRequest)
685            .ok_or_else(|| QueueBackendError::QueueNotFound("TokenSwapRequest".to_string()))?;
686
687        let body = serde_json::to_string(&job).map_err(|e| {
688            error!(error = %e, "Failed to serialize TokenSwapRequest job");
689            QueueBackendError::SerializationError(e.to_string())
690        })?;
691
692        let message_group_id = job.data.relayer_id.clone();
693        let message_deduplication_id = job.message_id.clone();
694        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
695
696        self.send_message_to_sqs(
697            queue_url,
698            body,
699            message_group_id,
700            message_deduplication_id,
701            delay_seconds,
702            scheduled_on,
703        )
704        .await
705    }
706
707    async fn produce_relayer_health_check(
708        &self,
709        job: Job<RelayerHealthCheck>,
710        scheduled_on: Option<i64>,
711    ) -> Result<String, QueueBackendError> {
712        let queue_url = self
713            .queue_urls
714            .get(&QueueType::RelayerHealthCheck)
715            .ok_or_else(|| QueueBackendError::QueueNotFound("RelayerHealthCheck".to_string()))?;
716
717        let body = serde_json::to_string(&job).map_err(|e| {
718            error!(error = %e, "Failed to serialize RelayerHealthCheck job");
719            QueueBackendError::SerializationError(e.to_string())
720        })?;
721
722        let message_group_id = job.data.relayer_id.clone();
723        let message_deduplication_id = job.message_id.clone();
724        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);
725
726        self.send_message_to_sqs(
727            queue_url,
728            body,
729            message_group_id,
730            message_deduplication_id,
731            delay_seconds,
732            scheduled_on,
733        )
734        .await
735    }
736
737    async fn initialize_workers(
738        &self,
739        app_state: Arc<ThinData<DefaultAppState>>,
740    ) -> Result<Vec<WorkerHandle>, QueueBackendError> {
741        info!(
742            "Initializing SQS workers for {} queues",
743            self.queue_urls.len()
744        );
745
746        let mut handles = Vec::new();
747
748        // Spawn a worker for each queue type
749        for (queue_type, queue_url) in &self.queue_urls {
750            let handle = super::sqs_worker::spawn_worker_for_queue(
751                self.sqs_client.clone(),
752                *queue_type,
753                queue_url.clone(),
754                app_state.clone(),
755                self.shutdown_tx.subscribe(),
756            )
757            .await?;
758
759            handles.push(handle);
760        }
761
762        // Start cron scheduler for periodic tasks (cleanup, token swaps)
763        let cron_scheduler =
764            super::sqs_cron::SqsCronScheduler::new(app_state.clone(), self.shutdown_tx.subscribe());
765        let cron_handles = cron_scheduler.start().await?;
766        handles.extend(cron_handles);
767
768        // Internal shutdown signal handler — listens for SIGINT/SIGTERM and
769        // broadcasts shutdown to all SQS workers and cron tasks.
770        // (Redis/Apalis workers handle signals via their own Monitor.)
771        {
772            let shutdown_tx = self.shutdown_tx.clone();
773            let handle = tokio::spawn(async move {
774                let mut sigint =
775                    tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
776                        .expect("Failed to create SIGINT handler");
777                let mut sigterm =
778                    tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
779                        .expect("Failed to create SIGTERM handler");
780
781                tokio::select! {
782                    _ = sigint.recv() => info!("SQS backend: received SIGINT, shutting down workers"),
783                    _ = sigterm.recv() => info!("SQS backend: received SIGTERM, shutting down workers"),
784                }
785
786                let _ = shutdown_tx.send(true);
787            });
788            handles.push(WorkerHandle::Tokio(handle));
789        }
790
791        info!(
792            "Successfully spawned {} SQS workers and cron tasks",
793            handles.len()
794        );
795        Ok(handles)
796    }
797
798    async fn health_check(&self) -> Result<Vec<QueueHealth>, QueueBackendError> {
799        let mut health_statuses = Vec::new();
800
801        for (queue_type, queue_url) in &self.queue_urls {
802            // Get queue attributes to check health
803            let result = self
804                .sqs_client
805                .get_queue_attributes()
806                .queue_url(queue_url)
807                .attribute_names(
808                    aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages,
809                )
810                .attribute_names(
811                    aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessagesNotVisible,
812                )
813                .send()
814                .await;
815
816            let (messages_visible, messages_in_flight, messages_dlq, is_healthy) = match result {
817                Ok(output) => {
818                    let attrs = output.attributes();
819                    let visible = attrs
820                        .and_then(|a| {
821                            a.get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
822                        })
823                        .and_then(|v| v.parse::<u64>().ok())
824                        .unwrap_or(0);
825                    let in_flight = attrs
826                        .and_then(|a| {
827                            a.get(
828                                &aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessagesNotVisible,
829                            )
830                        })
831                        .and_then(|v| v.parse::<u64>().ok())
832                        .unwrap_or(0);
833                    let dlq_count = self.get_dlq_message_count(queue_type).await;
834                    (visible, in_flight, dlq_count, true)
835                }
836                Err(e) => {
837                    error!(
838                        error = %e,
839                        queue_type = ?queue_type,
840                        "Failed to get queue attributes"
841                    );
842                    (0, 0, 0, false)
843                }
844            };
845
846            health_statuses.push(QueueHealth {
847                queue_type: *queue_type,
848                messages_visible,
849                messages_in_flight,
850                messages_dlq,
851                backend: "sqs".to_string(),
852                is_healthy,
853            });
854        }
855
856        Ok(health_statuses)
857    }
858
859    fn backend_type(&self) -> QueueBackendType {
860        QueueBackendType::Sqs
861    }
862
863    fn shutdown(&self) {
864        info!("SQS backend: broadcasting shutdown signal to all workers");
865        let _ = self.shutdown_tx.send(true);
866    }
867}
868
#[cfg(test)]
mod tests {
    // Unit tests for the pure helpers of the SQS backend: delay calculation,
    // FIFO URL detection, message-group / status-queue selection and queue-type
    // resolution. One `#[ignore]`d smoke test at the end talks to real SQS.
    //
    // Time-based assertions tolerate a one-second drift, because the wall clock
    // may roll over to the next second between the sample taken in the test and
    // the one taken by the function under test.
    use super::*;
    use crate::jobs::{Job, JobType, TransactionStatusCheck};
    use crate::models::NetworkType;

    /// Current wall-clock time in whole seconds since the Unix epoch.
    ///
    /// Shared by all time-based tests so they sample the clock the same way.
    fn unix_now_secs() -> i64 {
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("system time before unix epoch")
            .as_secs() as i64
    }

    #[test]
    fn test_calculate_delay_seconds() {
        // No scheduled time
        assert_eq!(SqsBackend::calculate_delay_seconds(None), None);

        // Past time
        let past = unix_now_secs() - 10;
        assert_eq!(SqsBackend::calculate_delay_seconds(Some(past)), None);

        // Future time within SQS limit (< 900s). 4 is accepted as well as 5 in
        // case a second boundary is crossed between the two clock samples.
        let future_5s = unix_now_secs() + 5;
        let delay = SqsBackend::calculate_delay_seconds(Some(future_5s));
        assert!(
            matches!(delay, Some(4..=5)),
            "Expected a delay of ~5s, got {delay:?}"
        );

        // Future time beyond SQS limit (> 900s) - should clamp to 900
        let future_1000s = unix_now_secs() + 1000;
        assert_eq!(
            SqsBackend::calculate_delay_seconds(Some(future_1000s)),
            Some(900)
        );
    }

    #[test]
    fn test_calculate_delay_seconds_edge_cases() {
        // Exactly at current time (should return None)
        let now = unix_now_secs();
        assert_eq!(SqsBackend::calculate_delay_seconds(Some(now)), None);

        // Exactly at SQS limit (900s). 899 is accepted too, for the same
        // second-boundary reason as above.
        let future_900s = now + 900;
        let at_limit = SqsBackend::calculate_delay_seconds(Some(future_900s));
        assert!(
            matches!(at_limit, Some(899..=900)),
            "Expected a delay of ~900s, got {at_limit:?}"
        );

        // Just over SQS limit (901s) - should clamp to 900
        let future_901s = now + 901;
        assert_eq!(
            SqsBackend::calculate_delay_seconds(Some(future_901s)),
            Some(900)
        );
    }

    #[test]
    fn test_sqs_backend_type_value() {
        assert_eq!(QueueBackendType::Sqs.as_str(), "sqs");
        assert_eq!(QueueBackendType::Sqs.to_string(), "sqs");
    }

    #[test]
    fn test_queue_url_construction() {
        // Test that queue URLs are correctly constructed.
        // NOTE(review): the URLs are assembled locally with `format!`; this test
        // does not call into `SqsBackend`, so it only pins the expected naming.
        let mut queue_urls = HashMap::new();
        let prefix = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";

        queue_urls.insert(
            QueueType::TransactionRequest,
            format!("{prefix}transaction-request.fifo"),
        );
        queue_urls.insert(
            QueueType::TransactionSubmission,
            format!("{prefix}transaction-submission.fifo"),
        );
        queue_urls.insert(QueueType::StatusCheck, format!("{prefix}status-check.fifo"));
        queue_urls.insert(
            QueueType::Notification,
            format!("{prefix}notification.fifo"),
        );
        queue_urls.insert(
            QueueType::TokenSwapRequest,
            format!("{prefix}token-swap-request.fifo"),
        );
        queue_urls.insert(
            QueueType::RelayerHealthCheck,
            format!("{prefix}relayer-health-check.fifo"),
        );
        queue_urls.insert(
            QueueType::StatusCheckEvm,
            format!("{prefix}status-check-evm.fifo"),
        );
        queue_urls.insert(
            QueueType::StatusCheckStellar,
            format!("{prefix}status-check-stellar.fifo"),
        );

        // Verify all queue types have URLs
        assert_eq!(queue_urls.len(), 8);
        assert!(queue_urls
            .get(&QueueType::TransactionRequest)
            .unwrap()
            .ends_with(".fifo"));
        assert!(queue_urls
            .get(&QueueType::TransactionSubmission)
            .unwrap()
            .contains("transaction-submission"));
        assert!(queue_urls
            .get(&QueueType::StatusCheckEvm)
            .unwrap()
            .contains("status-check-evm"));
        assert!(queue_urls
            .get(&QueueType::StatusCheckStellar)
            .unwrap()
            .contains("status-check-stellar"));
    }

    #[test]
    fn test_queue_url_construction_standard() {
        // Test that standard queue URLs do not have .fifo suffix
        let mut queue_urls = HashMap::new();
        let prefix = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";

        queue_urls.insert(
            QueueType::TransactionRequest,
            format!("{prefix}transaction-request"),
        );
        queue_urls.insert(
            QueueType::TransactionSubmission,
            format!("{prefix}transaction-submission"),
        );
        queue_urls.insert(QueueType::StatusCheck, format!("{prefix}status-check"));
        queue_urls.insert(QueueType::Notification, format!("{prefix}notification"));
        queue_urls.insert(
            QueueType::TokenSwapRequest,
            format!("{prefix}token-swap-request"),
        );
        queue_urls.insert(
            QueueType::RelayerHealthCheck,
            format!("{prefix}relayer-health-check"),
        );
        queue_urls.insert(
            QueueType::StatusCheckEvm,
            format!("{prefix}status-check-evm"),
        );
        queue_urls.insert(
            QueueType::StatusCheckStellar,
            format!("{prefix}status-check-stellar"),
        );

        assert_eq!(queue_urls.len(), 8);
        // Standard queue URLs should NOT end with .fifo
        for url in queue_urls.values() {
            assert!(
                !url.ends_with(".fifo"),
                "Standard queue URL should not end with .fifo: {url}"
            );
        }
        assert!(queue_urls
            .get(&QueueType::TransactionRequest)
            .unwrap()
            .contains("transaction-request"));
    }

    #[test]
    fn test_is_fifo_queue_url_standard() {
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/relayer-transaction-request"
        ));
        assert!(!SqsBackend::is_fifo_queue_url(
            "http://localstack:4566/000000000000/relayer-status-check"
        ));
    }

    // NOTE(review): the tests below pin `transaction_id` as the message group for
    // every network, EVM included. The rustdoc on `transaction_message_group_id`
    // describes per-relayer grouping for EVM and unknown networks — confirm which
    // of the two is intended.

    #[test]
    fn test_transaction_message_group_id_evm_uses_transaction() {
        let group = transaction_message_group_id(Some(&NetworkType::Evm), "relayer-1", "tx-123");
        assert_eq!(group, "tx-123");
    }

    #[test]
    fn test_transaction_message_group_id_stellar_uses_transaction() {
        let group =
            transaction_message_group_id(Some(&NetworkType::Stellar), "relayer-1", "tx-123");
        assert_eq!(group, "tx-123");
    }

    #[test]
    fn test_transaction_message_group_id_solana_uses_transaction() {
        let group = transaction_message_group_id(Some(&NetworkType::Solana), "relayer-1", "tx-123");
        assert_eq!(group, "tx-123");
    }

    #[test]
    fn test_transaction_message_group_id_none_defaults_to_transaction() {
        let group = transaction_message_group_id(None, "relayer-1", "tx-123");
        assert_eq!(
            group, "tx-123",
            "Unknown network should default to transaction id"
        );
    }

    #[test]
    fn test_status_check_queue_type_evm() {
        assert_eq!(
            status_check_queue_type(Some(&NetworkType::Evm)),
            QueueType::StatusCheckEvm
        );
    }

    #[test]
    fn test_status_check_queue_type_stellar() {
        assert_eq!(
            status_check_queue_type(Some(&NetworkType::Stellar)),
            QueueType::StatusCheckStellar
        );
    }

    #[test]
    fn test_status_check_queue_type_solana_defaults_to_generic() {
        assert_eq!(
            status_check_queue_type(Some(&NetworkType::Solana)),
            QueueType::StatusCheck
        );
    }

    #[test]
    fn test_status_check_queue_type_none_defaults_to_generic() {
        assert_eq!(status_check_queue_type(None), QueueType::StatusCheck);
    }

    #[test]
    fn test_sqs_max_message_size_constant() {
        assert_eq!(SQS_MAX_MESSAGE_SIZE_BYTES, 256 * 1024);
    }

    #[test]
    fn test_is_fifo_queue_url() {
        assert!(SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.fifo"
        ));
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue"
        ));
    }

    // --- resolve_queue_type tests ---
    //
    // `resolve_queue_type(setting, probes, std_url, fifo_url)` yields `Ok(true)`
    // for FIFO and `Ok(false)` for standard; `probes` is
    // `Some((standard_exists, fifo_exists))`.

    const REF_STD: &str = "http://localhost:4566/000000000000/relayer-transaction-request";
    const REF_FIFO: &str = "http://localhost:4566/000000000000/relayer-transaction-request.fifo";

    #[test]
    fn test_resolve_queue_type_explicit_standard() {
        let result = resolve_queue_type("standard", None, REF_STD, REF_FIFO);
        assert!(!result.unwrap());
    }

    #[test]
    fn test_resolve_queue_type_explicit_fifo() {
        let result = resolve_queue_type("fifo", None, REF_STD, REF_FIFO);
        assert!(result.unwrap());
    }

    #[test]
    fn test_resolve_queue_type_explicit_ignores_probes() {
        // Even if probes say FIFO exists, explicit "standard" wins
        let result = resolve_queue_type("standard", Some((false, true)), REF_STD, REF_FIFO);
        assert!(!result.unwrap());
    }

    #[test]
    fn test_resolve_queue_type_auto_standard_only() {
        let result = resolve_queue_type("auto", Some((true, false)), REF_STD, REF_FIFO);
        assert!(!result.unwrap());
    }

    #[test]
    fn test_resolve_queue_type_auto_fifo_only() {
        let result = resolve_queue_type("auto", Some((false, true)), REF_STD, REF_FIFO);
        assert!(result.unwrap());
    }

    #[test]
    fn test_resolve_queue_type_auto_both_exist_errors() {
        let result = resolve_queue_type("auto", Some((true, true)), REF_STD, REF_FIFO);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("Ambiguous"),
            "Expected 'Ambiguous' error, got: {err}"
        );
    }

    #[test]
    fn test_resolve_queue_type_auto_neither_exists_errors() {
        let result = resolve_queue_type("auto", Some((false, false)), REF_STD, REF_FIFO);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("No SQS queues found"),
            "Expected 'No SQS queues found' error, got: {err}"
        );
    }

    #[test]
    fn test_resolve_queue_type_auto_no_probes_defaults_to_neither() {
        // None probe results (shouldn't happen in practice) treated as (false, false)
        let result = resolve_queue_type("auto", None, REF_STD, REF_FIFO);
        assert!(result.is_err());
    }

    #[test]
    fn test_resolve_queue_type_unknown_value_errors() {
        let result = resolve_queue_type("invalid", None, REF_STD, REF_FIFO);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("Unsupported SQS_QUEUE_TYPE"),
            "Expected unsupported error, got: {err}"
        );
    }

    // ── resolve_queue_type: error variant and message checks ──────────

    #[test]
    fn test_resolve_queue_type_auto_neither_error_includes_urls() {
        let std_url = "https://sqs.us-east-1.amazonaws.com/123/relayer-transaction-request";
        let fifo_url = "https://sqs.us-east-1.amazonaws.com/123/relayer-transaction-request.fifo";
        let result = resolve_queue_type("auto", Some((false, false)), std_url, fifo_url);
        let err = result.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains(std_url),
            "Error should include standard URL: {msg}"
        );
        assert!(
            msg.contains(fifo_url),
            "Error should include FIFO URL: {msg}"
        );
    }

    #[test]
    fn test_resolve_queue_type_returns_config_error_variant() {
        let result = resolve_queue_type("invalid", None, REF_STD, REF_FIFO);
        assert!(
            matches!(result, Err(QueueBackendError::ConfigError(_))),
            "Expected ConfigError variant"
        );

        let result = resolve_queue_type("auto", Some((true, true)), REF_STD, REF_FIFO);
        assert!(
            matches!(result, Err(QueueBackendError::ConfigError(_))),
            "Ambiguous case should be ConfigError"
        );

        let result = resolve_queue_type("auto", Some((false, false)), REF_STD, REF_FIFO);
        assert!(
            matches!(result, Err(QueueBackendError::ConfigError(_))),
            "No queues case should be ConfigError"
        );
    }

    #[test]
    fn test_resolve_queue_type_unknown_includes_value_in_error() {
        let result = resolve_queue_type("redis", None, REF_STD, REF_FIFO);
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("redis"),
            "Error should echo the invalid value: {msg}"
        );

        let result = resolve_queue_type("", None, REF_STD, REF_FIFO);
        assert!(result.is_err(), "Empty string should be rejected");
    }

    #[test]
    fn test_resolve_queue_type_case_sensitive() {
        // The function matches exact lowercase strings; mixed case is unsupported
        assert!(resolve_queue_type("Standard", None, REF_STD, REF_FIFO).is_err());
        assert!(resolve_queue_type("FIFO", None, REF_STD, REF_FIFO).is_err());
        assert!(resolve_queue_type("Auto", None, REF_STD, REF_FIFO).is_err());
    }

    // ── calculate_delay_seconds: additional edge cases ────────────────

    #[test]
    fn test_calculate_delay_seconds_one_second_future() {
        let future_1s = unix_now_secs() + 2; // +2 to avoid race with timing
        let result = SqsBackend::calculate_delay_seconds(Some(future_1s));
        assert!(result.is_some(), "1-2s in future should yield Some");
        assert!(result.unwrap() > 0, "Delay should be positive");
        assert!(result.unwrap() <= 2, "Delay should be at most 2s");
    }

    #[test]
    fn test_calculate_delay_seconds_far_past() {
        // Unix epoch itself
        assert_eq!(SqsBackend::calculate_delay_seconds(Some(0)), None);
        // Negative timestamp (before epoch)
        assert_eq!(SqsBackend::calculate_delay_seconds(Some(-1000)), None);
    }

    #[test]
    fn test_calculate_delay_seconds_very_far_future() {
        // Year ~2100 — should still clamp to 900
        let far_future = 4_102_444_800_i64; // 2100-01-01T00:00:00Z
        assert_eq!(
            SqsBackend::calculate_delay_seconds(Some(far_future)),
            Some(900)
        );
    }

    #[test]
    fn test_calculate_delay_seconds_exactly_900_boundary() {
        let now = unix_now_secs();

        // 899s future → should return 899 (under the cap)
        let result = SqsBackend::calculate_delay_seconds(Some(now + 899));
        assert!(result.is_some());
        // Allow ±1 for timing
        let val = result.unwrap();
        assert!((898..=899).contains(&val), "Expected ~899, got {val}");
    }

    // ── is_fifo_queue_url: edge cases ─────────────────────────────────

    #[test]
    fn test_is_fifo_queue_url_empty() {
        assert!(!SqsBackend::is_fifo_queue_url(""));
    }

    #[test]
    fn test_is_fifo_queue_url_case_sensitive() {
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.FIFO"
        ));
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.Fifo"
        ));
    }

    #[test]
    fn test_is_fifo_queue_url_fifo_in_middle() {
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/.fifo/queue"
        ));
    }

    #[test]
    fn test_is_fifo_queue_url_just_suffix() {
        assert!(SqsBackend::is_fifo_queue_url(".fifo"));
    }

    #[test]
    fn test_is_fifo_queue_url_localstack() {
        assert!(SqsBackend::is_fifo_queue_url(
            "http://localhost:4566/000000000000/relayer-tx.fifo"
        ));
    }

    // ── transaction_message_group_id: consistency ─────────────────────

    #[test]
    fn test_transaction_message_group_id_always_returns_transaction_id() {
        // Current implementation returns transaction_id for all network types.
        // This test documents that behavior and catches accidental changes.
        let networks: &[Option<NetworkType>] = &[
            Some(NetworkType::Evm),
            Some(NetworkType::Stellar),
            Some(NetworkType::Solana),
            None,
        ];

        for network in networks {
            let group = transaction_message_group_id(network.as_ref(), "relayer-99", "tx-abc");
            assert_eq!(
                group, "tx-abc",
                "Expected transaction_id for network {network:?}"
            );
        }
    }

    // ── status_check_queue_type: returned queue names ──────────────────

    #[test]
    fn test_status_check_queue_type_returns_distinct_queue_names() {
        let evm = status_check_queue_type(Some(&NetworkType::Evm));
        let stellar = status_check_queue_type(Some(&NetworkType::Stellar));
        let generic = status_check_queue_type(None);

        assert_ne!(evm.queue_name(), stellar.queue_name());
        assert_ne!(evm.queue_name(), generic.queue_name());
        assert_ne!(stellar.queue_name(), generic.queue_name());
    }

    #[test]
    fn test_status_check_queue_type_all_are_status_checks() {
        let networks: &[Option<&NetworkType>] = &[
            Some(&NetworkType::Evm),
            Some(&NetworkType::Stellar),
            Some(&NetworkType::Solana),
            None,
        ];

        for network in networks {
            let qt = status_check_queue_type(*network);
            assert!(
                qt.is_status_check(),
                "{qt:?} should be a status check variant"
            );
        }
    }

    // ── Queue URL construction algorithm ──────────────────────────────

    #[test]
    fn test_queue_url_construction_algorithm_fifo() {
        // Replicate the algorithm from SqsBackend::new()
        let prefix = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";
        let suffix = ".fifo";

        let expected_urls = [
            (
                QueueType::TransactionRequest,
                format!("{prefix}transaction-request{suffix}"),
            ),
            (
                QueueType::TransactionSubmission,
                format!("{prefix}transaction-submission{suffix}"),
            ),
            (
                QueueType::StatusCheck,
                format!("{prefix}status-check{suffix}"),
            ),
            (
                QueueType::StatusCheckEvm,
                format!("{prefix}status-check-evm{suffix}"),
            ),
            (
                QueueType::StatusCheckStellar,
                format!("{prefix}status-check-stellar{suffix}"),
            ),
            (
                QueueType::Notification,
                format!("{prefix}notification{suffix}"),
            ),
            (
                QueueType::TokenSwapRequest,
                format!("{prefix}token-swap-request{suffix}"),
            ),
            (
                QueueType::RelayerHealthCheck,
                format!("{prefix}relayer-health-check{suffix}"),
            ),
        ];

        for (qt, expected) in &expected_urls {
            assert!(
                SqsBackend::is_fifo_queue_url(expected),
                "{qt:?}: URL should be FIFO: {expected}"
            );
        }
    }

    #[test]
    fn test_queue_url_construction_algorithm_standard() {
        let prefix = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";
        let suffix = "";

        let urls = [
            format!("{prefix}transaction-request{suffix}"),
            format!("{prefix}transaction-submission{suffix}"),
            format!("{prefix}status-check{suffix}"),
            format!("{prefix}notification{suffix}"),
        ];

        for url in &urls {
            assert!(
                !SqsBackend::is_fifo_queue_url(url),
                "Standard URL should not be FIFO: {url}"
            );
        }
    }

    // ── SQS_MAX_MESSAGE_SIZE_BYTES ────────────────────────────────────

    #[test]
    fn test_sqs_max_message_size_matches_aws_limit() {
        // AWS SQS maximum message body size is exactly 256 KiB
        assert_eq!(SQS_MAX_MESSAGE_SIZE_BYTES, 262_144);
    }

    // ── resolve_queue_type: all branches produce expected is_fifo ─────

    #[test]
    fn test_resolve_queue_type_suffix_logic() {
        // Verify the suffix derived from resolve_queue_type produces correct URLs
        let test_cases = [
            ("standard", None, false),
            ("fifo", None, true),
            ("auto", Some((true, false)), false),
            ("auto", Some((false, true)), true),
        ];

        for (sqs_type, probes, expected_fifo) in test_cases {
            let is_fifo = resolve_queue_type(sqs_type, probes, REF_STD, REF_FIFO).unwrap();
            assert_eq!(is_fifo, expected_fifo, "sqs_type={sqs_type}");

            let suffix = if is_fifo { ".fifo" } else { "" };
            let url = format!("https://sqs.us-east-1.amazonaws.com/123/relayer-tx{suffix}");
            assert_eq!(
                SqsBackend::is_fifo_queue_url(&url),
                expected_fifo,
                "URL FIFO detection mismatch for sqs_type={sqs_type}"
            );
        }
    }

    #[tokio::test]
    #[ignore = "requires real AWS credentials and SQS queue env config"]
    async fn smoke_push_status_check_to_sqs() {
        // Requires real AWS credentials and queue env config.
        // Expected env:
        // - AWS_REGION
        // - SQS_QUEUE_URL_PREFIX
        // - optional AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY/AWS_SESSION_TOKEN
        let backend = SqsBackend::new()
            .await
            .expect("SQS backend initialization failed");
        let job = Job::new(
            JobType::TransactionStatusCheck,
            TransactionStatusCheck::new("smoke-tx-id", "smoke-relayer", NetworkType::Stellar),
        );
        // Schedule slightly in the future so the delayed-send path is exercised.
        let scheduled_on = Some(unix_now_secs() + 2);
        let result = backend
            .produce_transaction_status_check(job, scheduled_on)
            .await;
        assert!(
            result.is_ok(),
            "Expected SendMessage via SQS backend to succeed, got: {result:?}"
        );
    }
}