use async_trait::async_trait;
use aws_sdk_sqs::types::MessageAttributeValue;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::watch;
use tracing::{debug, error, info, warn};

use crate::{
    config::ServerConfig,
    jobs::{
        Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
        TransactionSend, TransactionStatusCheck,
    },
    models::{DefaultAppState, NetworkType},
    queues::QueueBackendType,
};
use actix_web::web::ThinData;

use super::{QueueBackend, QueueBackendError, QueueHealth, QueueType, WorkerHandle};

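/// Hard limit imposed by SQS on a single message body (256 KiB).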
const SQS_MAX_MESSAGE_SIZE_BYTES: usize = 256 * 1024;

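/// Builds the SQS FIFO `MessageGroupId` for transaction jobs.
///
/// Grouping is per transaction id for every network type, so distinct
/// transactions can be processed in parallel while messages for the same
/// transaction stay ordered. The relayer id is currently unused.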
fn transaction_message_group_id(
    network_type: Option<&NetworkType>,
    _relayer_id: &str,
    transaction_id: &str,
) -> String {
    match network_type {
        Some(_) | None => transaction_id.to_string(),
    }
}

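/// Maps a job's network type to the status-check queue that serves it.
///
/// EVM and Stellar have dedicated status-check queues; every other network
/// (or an unspecified one) falls back to the generic status-check queue.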
fn status_check_queue_type(network_type: Option<&NetworkType>) -> QueueType {
    match network_type {
        Some(NetworkType::Evm) => QueueType::StatusCheckEvm,
        Some(NetworkType::Stellar) => QueueType::StatusCheckStellar,
        _ => QueueType::StatusCheck,
    }
}

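/// Queue backend backed by Amazon SQS.
///
/// Holds one resolved queue URL per [`QueueType`] (standard or FIFO, decided at
/// startup), the dead-letter-queue URLs discovered from each queue's redrive
/// policy, and a watch channel used to broadcast shutdown to spawned workers.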
#[derive(Clone)]
pub struct SqsBackend {
    sqs_client: aws_sdk_sqs::Client,
    queue_urls: HashMap<QueueType, String>,
    dlq_urls: HashMap<QueueType, String>,
    region: String,
    shutdown_tx: Arc<watch::Sender<bool>>,
}

impl std::fmt::Debug for SqsBackend {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SqsBackend")
            .field("backend_type", &"sqs")
            .field("region", &self.region)
            .field("queue_count", &self.queue_urls.len())
            .finish()
    }
}

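/// Decides whether the backend should use FIFO queues (`Ok(true)`) or standard
/// queues (`Ok(false)`), based on the `SQS_QUEUE_TYPE` setting.
///
/// For `"auto"`, `probe_results` carries the outcome of probing the reference
/// `transaction-request` queue in both flavors: exactly one must exist,
/// otherwise the configuration is rejected.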
fn resolve_queue_type(
    sqs_queue_type: &str,
    probe_results: Option<(bool, bool)>,
    ref_standard_url: &str,
    ref_fifo_url: &str,
) -> Result<bool, QueueBackendError> {
    match sqs_queue_type {
        "standard" => {
            info!("Using explicit SQS queue type: standard");
            Ok(false)
        }
        "fifo" => {
            info!("Using explicit SQS queue type: fifo");
            Ok(true)
        }
        "auto" => {
            let (standard_exists, fifo_exists) = probe_results.unwrap_or((false, false));
            match (standard_exists, fifo_exists) {
                (true, false) => {
                    info!("Detected SQS queue type: standard");
                    Ok(false)
                }
                (false, true) => {
                    info!("Detected SQS queue type: fifo");
                    Ok(true)
                }
                (true, true) => Err(QueueBackendError::ConfigError(
                    "Ambiguous SQS queue type: both standard and FIFO \
                     'transaction-request' queues exist. Remove one set or set \
                     SQS_QUEUE_TYPE explicitly."
                        .to_string(),
                )),
                (false, false) => Err(QueueBackendError::ConfigError(format!(
                    "No SQS queues found. Neither '{ref_standard_url}' nor \
                     '{ref_fifo_url}' is accessible. Create queues before starting \
                     the relayer, or set SQS_QUEUE_TYPE explicitly."
                ))),
            }
        }
        other => Err(QueueBackendError::ConfigError(format!(
            "Unsupported SQS_QUEUE_TYPE: '{other}'. Must be 'auto', 'standard', or 'fifo'."
        ))),
    }
}

impl SqsBackend {
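    /// Returns `true` when the queue URL refers to a FIFO queue (`.fifo` suffix).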
    fn is_fifo_queue_url(queue_url: &str) -> bool {
        queue_url.ends_with(".fifo")
    }

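    /// Creates the SQS backend from environment configuration.
    ///
    /// Resolves the AWS region and queue URL prefix (either `SQS_QUEUE_URL_PREFIX`
    /// or a prefix derived from the account id), decides between standard and FIFO
    /// queues via `SQS_QUEUE_TYPE`, then probes every expected queue. Initialization
    /// fails if any queue is missing or inaccessible; DLQ URLs found in redrive
    /// policies are cached for health reporting.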
    pub async fn new() -> Result<Self, QueueBackendError> {
        info!("Initializing SQS queue backend");

        let config = aws_config::load_from_env().await;
        let sqs_client = aws_sdk_sqs::Client::new(&config);
        let region = config
            .region()
            .ok_or_else(|| {
                QueueBackendError::ConfigError(
                    "AWS_REGION not set. Required for SQS backend.".to_string(),
                )
            })?
            .to_string();

        let prefix = match std::env::var("SQS_QUEUE_URL_PREFIX") {
            Ok(prefix) => prefix,
            Err(_) => {
                let account_id =
                    ServerConfig::get_aws_account_id().map_err(QueueBackendError::ConfigError)?;
                format!("https://sqs.{region}.amazonaws.com/{account_id}/relayer-")
            }
        };
        info!(
            region = %region,
            queue_url_prefix = %prefix,
            "Resolved SQS queue URL prefix"
        );

        let sqs_queue_type = ServerConfig::get_sqs_queue_type().to_lowercase();
        let ref_standard_url = format!("{prefix}transaction-request");
        let ref_fifo_url = format!("{prefix}transaction-request.fifo");

        let probe_results = if sqs_queue_type == "auto" {
            let (standard_probe, fifo_probe) = {
                let client_s = sqs_client.clone();
                let client_f = sqs_client.clone();
                let url_s = ref_standard_url.clone();
                let url_f = ref_fifo_url.clone();
                tokio::join!(
                    async move {
                        client_s
                            .get_queue_attributes()
                            .queue_url(&url_s)
                            .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
                            .send()
                            .await
                    },
                    async move {
                        client_f
                            .get_queue_attributes()
                            .queue_url(&url_f)
                            .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
                            .send()
                            .await
                    }
                )
            };
            Some((standard_probe.is_ok(), fifo_probe.is_ok()))
        } else {
            None
        };

        let is_fifo = resolve_queue_type(
            &sqs_queue_type,
            probe_results,
            &ref_standard_url,
            &ref_fifo_url,
        )?;
        let suffix = if is_fifo { ".fifo" } else { "" };

        let queue_urls = HashMap::from([
            (
                QueueType::TransactionRequest,
                format!("{prefix}transaction-request{suffix}"),
            ),
            (
                QueueType::TransactionSubmission,
                format!("{prefix}transaction-submission{suffix}"),
            ),
            (
                QueueType::StatusCheck,
                format!("{prefix}status-check{suffix}"),
            ),
            (
                QueueType::StatusCheckEvm,
                format!("{prefix}status-check-evm{suffix}"),
            ),
            (
                QueueType::StatusCheckStellar,
                format!("{prefix}status-check-stellar{suffix}"),
            ),
            (
                QueueType::Notification,
                format!("{prefix}notification{suffix}"),
            ),
            (
                QueueType::TokenSwapRequest,
                format!("{prefix}token-swap-request{suffix}"),
            ),
            (
                QueueType::RelayerHealthCheck,
                format!("{prefix}relayer-health-check{suffix}"),
            ),
        ]);

        let probe_futures: Vec<_> = queue_urls
            .iter()
            .map(|(queue_type, queue_url)| {
                let client = sqs_client.clone();
                let qt = *queue_type;
                let url = queue_url.clone();
                async move {
                    debug!(
                        queue_type = %qt,
                        queue_url = %url,
                        "Probing SQS queue accessibility at startup"
                    );
                    let probe = client
                        .get_queue_attributes()
                        .queue_url(&url)
                        .attribute_names(aws_sdk_sqs::types::QueueAttributeName::QueueArn)
                        .attribute_names(aws_sdk_sqs::types::QueueAttributeName::RedrivePolicy)
                        .send()
                        .await;
                    (qt, url, probe)
                }
            })
            .collect();

        let probe_results = futures::future::join_all(probe_futures).await;

        let mut missing_queues = Vec::new();
        let mut dlq_urls: HashMap<QueueType, String> = HashMap::new();

        for (queue_type, queue_url, probe) in probe_results {
            match probe {
                Ok(output) => {
                    debug!(
                        queue_type = %queue_type,
                        queue_url = %queue_url,
                        is_fifo = is_fifo,
                        "SQS queue probe succeeded"
                    );

                    if let Some(dlq_url) =
                        Self::resolve_dlq_url_from_attrs(&sqs_client, output.attributes()).await
                    {
                        dlq_urls.insert(queue_type, dlq_url);
                    }
                }
                Err(err) => {
                    error!(
                        queue_type = %queue_type,
                        queue_url = %queue_url,
                        error = ?err,
                        "SQS queue probe failed"
                    );
                    missing_queues.push(format!("{queue_type} ({queue_url}): {err:?}"));
                }
            }
        }

        if !missing_queues.is_empty() {
            return Err(QueueBackendError::ConfigError(format!(
                "SQS backend initialization failed. Missing/inaccessible queues: {}",
                missing_queues.join(", ")
            )));
        }

        info!(
            region = %region,
            queue_count = queue_urls.len(),
            "SQS backend initialized"
        );

        let (shutdown_tx, _) = watch::channel(false);

        Ok(Self {
            sqs_client,
            queue_urls,
            dlq_urls,
            region,
            shutdown_tx: Arc::new(shutdown_tx),
        })
    }

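    /// Sends a single message to the given queue URL.
    ///
    /// Enforces the 256 KiB body limit, attaches FIFO group/deduplication ids only
    /// for FIFO queues, records `target_scheduled_on` as a message attribute, and
    /// applies `DelaySeconds` (clamped to 0-900) only on standard queues, since
    /// FIFO queues do not support per-message delays.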
    async fn send_message_to_sqs(
        &self,
        queue_url: &str,
        body: String,
        message_group_id: String,
        message_deduplication_id: String,
        delay_seconds: Option<i32>,
        target_scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError> {
        if body.len() > SQS_MAX_MESSAGE_SIZE_BYTES {
            return Err(QueueBackendError::SqsError(format!(
                "Message body size ({} bytes) exceeds SQS limit ({} bytes)",
                body.len(),
                SQS_MAX_MESSAGE_SIZE_BYTES
            )));
        }

        let mut request = self
            .sqs_client
            .send_message()
            .queue_url(queue_url)
            .message_body(body);

        if Self::is_fifo_queue_url(queue_url) {
            request = request
                .message_group_id(message_group_id)
                .message_deduplication_id(message_deduplication_id);
        }

        if let Some(timestamp) = target_scheduled_on {
            request = request.message_attributes(
                "target_scheduled_on",
                MessageAttributeValue::builder()
                    .data_type("Number")
                    .string_value(timestamp.to_string())
                    .build()
                    .map_err(|e| {
                        QueueBackendError::SqsError(format!(
                            "Failed to build scheduled-on attribute: {e}"
                        ))
                    })?,
            );
        }

        if let Some(delay) = delay_seconds {
            let clamped_delay = delay.clamp(0, 900);
            if Self::is_fifo_queue_url(queue_url) {
                debug!(
                    queue_url = %queue_url,
                    requested_delay_seconds = delay,
                    "Skipping per-message DelaySeconds for FIFO queue; worker-side scheduling will enforce target_scheduled_on"
                );
            } else {
                request = request.delay_seconds(clamped_delay);
                if delay != clamped_delay {
                    warn!(
                        requested = delay,
                        clamped = clamped_delay,
                        "Delay seconds clamped to SQS limit (0-900)"
                    );
                }
            }
        }

        let response = request.send().await.map_err(|e| {
            error!(error = %e, queue_url = %queue_url, "Failed to send message to SQS");
            QueueBackendError::SqsError(format!("SendMessage failed: {e}"))
        })?;

        let message_id = response
            .message_id()
            .ok_or_else(|| QueueBackendError::SqsError("No message_id returned".to_string()))?
            .to_string();

        debug!(
            message_id = %message_id,
            queue_url = %queue_url,
            "Message sent to SQS"
        );

        Ok(message_id)
    }

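    /// Converts an absolute `scheduled_on` Unix timestamp into an SQS `DelaySeconds`
    /// value, returning `None` for past or missing timestamps and capping the delay
    /// at the 900-second SQS maximum.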
    fn calculate_delay_seconds(scheduled_on: Option<i64>) -> Option<i32> {
        scheduled_on.and_then(|timestamp| {
            let now = SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .ok()?
                .as_secs() as i64;

            let delay = timestamp - now;
            if delay > 0 {
                Some(delay.min(900) as i32)
            } else {
                None
            }
        })
    }

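    /// Extracts the dead-letter queue ARN from a queue's `RedrivePolicy` attribute
    /// and resolves it to a queue URL, returning `None` if there is no redrive
    /// policy or the lookup fails.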
    async fn resolve_dlq_url_from_attrs(
        sqs_client: &aws_sdk_sqs::Client,
        attrs: Option<&HashMap<aws_sdk_sqs::types::QueueAttributeName, String>>,
    ) -> Option<String> {
        let redrive_policy =
            attrs.and_then(|a| a.get(&aws_sdk_sqs::types::QueueAttributeName::RedrivePolicy))?;

        let dlq_arn = serde_json::from_str::<serde_json::Value>(redrive_policy)
            .ok()
            .and_then(|v| v.get("deadLetterTargetArn").cloned())
            .and_then(|v| v.as_str().map(|s| s.to_string()));

        let dlq_name = dlq_arn.as_deref()?.rsplit(':').next()?;

        match sqs_client.get_queue_url().queue_name(dlq_name).send().await {
            Ok(output) => output.queue_url().map(str::to_string),
            Err(err) => {
                warn!(error = %err, dlq_name = %dlq_name, "Failed to resolve DLQ URL at startup");
                None
            }
        }
    }

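    /// Returns the approximate number of messages sitting in the DLQ associated
    /// with `queue_type`, or 0 when no DLQ is configured or the lookup fails.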
    async fn get_dlq_message_count(&self, queue_type: &QueueType) -> u64 {
        let Some(dlq_url) = self.dlq_urls.get(queue_type) else {
            return 0;
        };

        match self
            .sqs_client
            .get_queue_attributes()
            .queue_url(dlq_url)
            .attribute_names(aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
            .send()
            .await
        {
            Ok(output) => output
                .attributes()
                .and_then(|attrs| {
                    attrs.get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
                })
                .and_then(|value| value.parse::<u64>().ok())
                .unwrap_or(0),
            Err(err) => {
                warn!(error = %err, dlq_url = %dlq_url, "Failed to fetch DLQ depth");
                0
            }
        }
    }
}

#[async_trait]
impl QueueBackend for SqsBackend {
    async fn produce_transaction_request(
        &self,
        job: Job<TransactionRequest>,
        scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError> {
        let queue_url = self
            .queue_urls
            .get(&QueueType::TransactionRequest)
            .ok_or_else(|| QueueBackendError::QueueNotFound("TransactionRequest".to_string()))?;

        let body = serde_json::to_string(&job).map_err(|e| {
            error!(error = %e, "Failed to serialize TransactionRequest job");
            QueueBackendError::SerializationError(e.to_string())
        })?;

        let message_group_id = transaction_message_group_id(
            job.data.network_type.as_ref(),
            &job.data.relayer_id,
            &job.data.transaction_id,
        );
        let message_deduplication_id = job.message_id.clone();
        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);

        self.send_message_to_sqs(
            queue_url,
            body,
            message_group_id,
            message_deduplication_id,
            delay_seconds,
            scheduled_on,
        )
        .await
    }

    async fn produce_transaction_submission(
        &self,
        job: Job<TransactionSend>,
        scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError> {
        let queue_url = self
            .queue_urls
            .get(&QueueType::TransactionSubmission)
            .ok_or_else(|| QueueBackendError::QueueNotFound("TransactionSubmission".to_string()))?;

        let body = serde_json::to_string(&job).map_err(|e| {
            error!(error = %e, "Failed to serialize TransactionSend job");
            QueueBackendError::SerializationError(e.to_string())
        })?;

        let message_group_id = transaction_message_group_id(
            job.data.network_type.as_ref(),
            &job.data.relayer_id,
            &job.data.transaction_id,
        );
        let message_deduplication_id = job.message_id.clone();
        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);

        self.send_message_to_sqs(
            queue_url,
            body,
            message_group_id,
            message_deduplication_id,
            delay_seconds,
            scheduled_on,
        )
        .await
    }

    async fn produce_transaction_status_check(
        &self,
        job: Job<TransactionStatusCheck>,
        scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError> {
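        // Status checks are routed to a network-specific queue (EVM, Stellar)
        // when one exists, otherwise to the generic status-check queue.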
        let queue_type = status_check_queue_type(job.data.network_type.as_ref());
        let queue_url = self
            .queue_urls
            .get(&queue_type)
            .ok_or_else(|| QueueBackendError::QueueNotFound(format!("{queue_type}")))?;

        let body = serde_json::to_string(&job).map_err(|e| {
            error!(error = %e, "Failed to serialize TransactionStatusCheck job");
            QueueBackendError::SerializationError(e.to_string())
        })?;

        let message_group_id = job.data.transaction_id.clone();
        let message_deduplication_id = job.message_id.clone();
        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);

        self.send_message_to_sqs(
            queue_url,
            body,
            message_group_id,
            message_deduplication_id,
            delay_seconds,
            scheduled_on,
        )
        .await
    }

    async fn produce_notification(
        &self,
        job: Job<NotificationSend>,
        scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError> {
        let queue_url = self
            .queue_urls
            .get(&QueueType::Notification)
            .ok_or_else(|| QueueBackendError::QueueNotFound("Notification".to_string()))?;

        let body = serde_json::to_string(&job).map_err(|e| {
            error!(error = %e, "Failed to serialize NotificationSend job");
            QueueBackendError::SerializationError(e.to_string())
        })?;

        let message_group_id = job.data.notification_id.clone();
        let message_deduplication_id = job.message_id.clone();
        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);

        self.send_message_to_sqs(
            queue_url,
            body,
            message_group_id,
            message_deduplication_id,
            delay_seconds,
            scheduled_on,
        )
        .await
    }

    async fn produce_token_swap_request(
        &self,
        job: Job<TokenSwapRequest>,
        scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError> {
        let queue_url = self
            .queue_urls
            .get(&QueueType::TokenSwapRequest)
            .ok_or_else(|| QueueBackendError::QueueNotFound("TokenSwapRequest".to_string()))?;

        let body = serde_json::to_string(&job).map_err(|e| {
            error!(error = %e, "Failed to serialize TokenSwapRequest job");
            QueueBackendError::SerializationError(e.to_string())
        })?;

        let message_group_id = job.data.relayer_id.clone();
        let message_deduplication_id = job.message_id.clone();
        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);

        self.send_message_to_sqs(
            queue_url,
            body,
            message_group_id,
            message_deduplication_id,
            delay_seconds,
            scheduled_on,
        )
        .await
    }

    async fn produce_relayer_health_check(
        &self,
        job: Job<RelayerHealthCheck>,
        scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError> {
        let queue_url = self
            .queue_urls
            .get(&QueueType::RelayerHealthCheck)
            .ok_or_else(|| QueueBackendError::QueueNotFound("RelayerHealthCheck".to_string()))?;

        let body = serde_json::to_string(&job).map_err(|e| {
            error!(error = %e, "Failed to serialize RelayerHealthCheck job");
            QueueBackendError::SerializationError(e.to_string())
        })?;

        let message_group_id = job.data.relayer_id.clone();
        let message_deduplication_id = job.message_id.clone();
        let delay_seconds = Self::calculate_delay_seconds(scheduled_on);

        self.send_message_to_sqs(
            queue_url,
            body,
            message_group_id,
            message_deduplication_id,
            delay_seconds,
            scheduled_on,
        )
        .await
    }

    async fn initialize_workers(
        &self,
        app_state: Arc<ThinData<DefaultAppState>>,
    ) -> Result<Vec<WorkerHandle>, QueueBackendError> {
        info!(
            "Initializing SQS workers for {} queues",
            self.queue_urls.len()
        );

        let mut handles = Vec::new();

        for (queue_type, queue_url) in &self.queue_urls {
            let handle = super::sqs_worker::spawn_worker_for_queue(
                self.sqs_client.clone(),
                *queue_type,
                queue_url.clone(),
                app_state.clone(),
                self.shutdown_tx.subscribe(),
            )
            .await?;

            handles.push(handle);
        }

        let cron_scheduler =
            super::sqs_cron::SqsCronScheduler::new(app_state.clone(), self.shutdown_tx.subscribe());
        let cron_handles = cron_scheduler.start().await?;
        handles.extend(cron_handles);

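        // Listen for SIGINT/SIGTERM and broadcast shutdown to every worker and
        // cron task through the shared watch channel.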
        {
            let shutdown_tx = self.shutdown_tx.clone();
            let handle = tokio::spawn(async move {
                let mut sigint =
                    tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
                        .expect("Failed to create SIGINT handler");
                let mut sigterm =
                    tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
                        .expect("Failed to create SIGTERM handler");

                tokio::select! {
                    _ = sigint.recv() => info!("SQS backend: received SIGINT, shutting down workers"),
                    _ = sigterm.recv() => info!("SQS backend: received SIGTERM, shutting down workers"),
                }

                let _ = shutdown_tx.send(true);
            });
            handles.push(WorkerHandle::Tokio(handle));
        }

        info!(
            "Successfully spawned {} SQS workers and cron tasks",
            handles.len()
        );
        Ok(handles)
    }

    async fn health_check(&self) -> Result<Vec<QueueHealth>, QueueBackendError> {
        let mut health_statuses = Vec::new();

        for (queue_type, queue_url) in &self.queue_urls {
            let result = self
                .sqs_client
                .get_queue_attributes()
                .queue_url(queue_url)
                .attribute_names(
                    aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages,
                )
                .attribute_names(
                    aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessagesNotVisible,
                )
                .send()
                .await;

            let (messages_visible, messages_in_flight, messages_dlq, is_healthy) = match result {
                Ok(output) => {
                    let attrs = output.attributes();
                    let visible = attrs
                        .and_then(|a| {
                            a.get(&aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessages)
                        })
                        .and_then(|v| v.parse::<u64>().ok())
                        .unwrap_or(0);
                    let in_flight = attrs
                        .and_then(|a| {
                            a.get(
                                &aws_sdk_sqs::types::QueueAttributeName::ApproximateNumberOfMessagesNotVisible,
                            )
                        })
                        .and_then(|v| v.parse::<u64>().ok())
                        .unwrap_or(0);
                    let dlq_count = self.get_dlq_message_count(queue_type).await;
                    (visible, in_flight, dlq_count, true)
                }
                Err(e) => {
                    error!(
                        error = %e,
                        queue_type = ?queue_type,
                        "Failed to get queue attributes"
                    );
                    (0, 0, 0, false)
                }
            };

            health_statuses.push(QueueHealth {
                queue_type: *queue_type,
                messages_visible,
                messages_in_flight,
                messages_dlq,
                backend: "sqs".to_string(),
                is_healthy,
            });
        }

        Ok(health_statuses)
    }

    fn backend_type(&self) -> QueueBackendType {
        QueueBackendType::Sqs
    }

    fn shutdown(&self) {
        info!("SQS backend: broadcasting shutdown signal to all workers");
        let _ = self.shutdown_tx.send(true);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::jobs::{Job, JobType, TransactionStatusCheck};
    use crate::models::NetworkType;

    #[test]
    fn test_calculate_delay_seconds() {
        assert_eq!(SqsBackend::calculate_delay_seconds(None), None);

        let past = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64
            - 10;
        assert_eq!(SqsBackend::calculate_delay_seconds(Some(past)), None);

        let future_5s = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64
            + 5;
        assert_eq!(
            SqsBackend::calculate_delay_seconds(Some(future_5s)),
            Some(5)
        );

        let future_1000s = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64
            + 1000;
        assert_eq!(
            SqsBackend::calculate_delay_seconds(Some(future_1000s)),
            Some(900)
        );
    }

    #[test]
    fn test_calculate_delay_seconds_edge_cases() {
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64;
        assert_eq!(SqsBackend::calculate_delay_seconds(Some(now)), None);

        let future_900s = now + 900;
        assert_eq!(
            SqsBackend::calculate_delay_seconds(Some(future_900s)),
            Some(900)
        );

        let future_901s = now + 901;
        assert_eq!(
            SqsBackend::calculate_delay_seconds(Some(future_901s)),
            Some(900)
        );
    }

    #[test]
    fn test_sqs_backend_type_value() {
        assert_eq!(QueueBackendType::Sqs.as_str(), "sqs");
        assert_eq!(QueueBackendType::Sqs.to_string(), "sqs");
    }

    #[test]
    fn test_queue_url_construction() {
        let mut queue_urls = HashMap::new();
        let prefix = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";

        queue_urls.insert(
            QueueType::TransactionRequest,
            format!("{prefix}transaction-request.fifo"),
        );
        queue_urls.insert(
            QueueType::TransactionSubmission,
            format!("{prefix}transaction-submission.fifo"),
        );
        queue_urls.insert(QueueType::StatusCheck, format!("{prefix}status-check.fifo"));
        queue_urls.insert(
            QueueType::Notification,
            format!("{prefix}notification.fifo"),
        );
        queue_urls.insert(
            QueueType::TokenSwapRequest,
            format!("{prefix}token-swap-request.fifo"),
        );
        queue_urls.insert(
            QueueType::RelayerHealthCheck,
            format!("{prefix}relayer-health-check.fifo"),
        );
        queue_urls.insert(
            QueueType::StatusCheckEvm,
            format!("{prefix}status-check-evm.fifo"),
        );
        queue_urls.insert(
            QueueType::StatusCheckStellar,
            format!("{prefix}status-check-stellar.fifo"),
        );

        assert_eq!(queue_urls.len(), 8);
        assert!(queue_urls
            .get(&QueueType::TransactionRequest)
            .unwrap()
            .ends_with(".fifo"));
        assert!(queue_urls
            .get(&QueueType::TransactionSubmission)
            .unwrap()
            .contains("transaction-submission"));
        assert!(queue_urls
            .get(&QueueType::StatusCheckEvm)
            .unwrap()
            .contains("status-check-evm"));
        assert!(queue_urls
            .get(&QueueType::StatusCheckStellar)
            .unwrap()
            .contains("status-check-stellar"));
    }

    #[test]
    fn test_queue_url_construction_standard() {
        let mut queue_urls = HashMap::new();
        let prefix = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";

        queue_urls.insert(
            QueueType::TransactionRequest,
            format!("{prefix}transaction-request"),
        );
        queue_urls.insert(
            QueueType::TransactionSubmission,
            format!("{prefix}transaction-submission"),
        );
        queue_urls.insert(QueueType::StatusCheck, format!("{prefix}status-check"));
        queue_urls.insert(QueueType::Notification, format!("{prefix}notification"));
        queue_urls.insert(
            QueueType::TokenSwapRequest,
            format!("{prefix}token-swap-request"),
        );
        queue_urls.insert(
            QueueType::RelayerHealthCheck,
            format!("{prefix}relayer-health-check"),
        );
        queue_urls.insert(
            QueueType::StatusCheckEvm,
            format!("{prefix}status-check-evm"),
        );
        queue_urls.insert(
            QueueType::StatusCheckStellar,
            format!("{prefix}status-check-stellar"),
        );

        assert_eq!(queue_urls.len(), 8);
        for url in queue_urls.values() {
            assert!(
                !url.ends_with(".fifo"),
                "Standard queue URL should not end with .fifo: {url}"
            );
        }
        assert!(queue_urls
            .get(&QueueType::TransactionRequest)
            .unwrap()
            .contains("transaction-request"));
    }

    #[test]
    fn test_is_fifo_queue_url_standard() {
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/relayer-transaction-request"
        ));
        assert!(!SqsBackend::is_fifo_queue_url(
            "http://localstack:4566/000000000000/relayer-status-check"
        ));
    }

    #[test]
    fn test_transaction_message_group_id_evm_uses_transaction() {
        let group = transaction_message_group_id(Some(&NetworkType::Evm), "relayer-1", "tx-123");
        assert_eq!(group, "tx-123");
    }

    #[test]
    fn test_transaction_message_group_id_stellar_uses_transaction() {
        let group =
            transaction_message_group_id(Some(&NetworkType::Stellar), "relayer-1", "tx-123");
        assert_eq!(group, "tx-123");
    }

    #[test]
    fn test_transaction_message_group_id_solana_uses_transaction() {
        let group = transaction_message_group_id(Some(&NetworkType::Solana), "relayer-1", "tx-123");
        assert_eq!(group, "tx-123");
    }

    #[test]
    fn test_transaction_message_group_id_none_defaults_to_transaction() {
        let group = transaction_message_group_id(None, "relayer-1", "tx-123");
        assert_eq!(
            group, "tx-123",
            "Unknown network should default to transaction id"
        );
    }

    #[test]
    fn test_status_check_queue_type_evm() {
        assert_eq!(
            status_check_queue_type(Some(&NetworkType::Evm)),
            QueueType::StatusCheckEvm
        );
    }

    #[test]
    fn test_status_check_queue_type_stellar() {
        assert_eq!(
            status_check_queue_type(Some(&NetworkType::Stellar)),
            QueueType::StatusCheckStellar
        );
    }

    #[test]
    fn test_status_check_queue_type_solana_defaults_to_generic() {
        assert_eq!(
            status_check_queue_type(Some(&NetworkType::Solana)),
            QueueType::StatusCheck
        );
    }

    #[test]
    fn test_status_check_queue_type_none_defaults_to_generic() {
        assert_eq!(status_check_queue_type(None), QueueType::StatusCheck);
    }

    #[test]
    fn test_sqs_max_message_size_constant() {
        assert_eq!(SQS_MAX_MESSAGE_SIZE_BYTES, 256 * 1024);
    }

    #[test]
    fn test_is_fifo_queue_url() {
        assert!(SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.fifo"
        ));
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue"
        ));
    }

    const REF_STD: &str = "http://localhost:4566/000000000000/relayer-transaction-request";
    const REF_FIFO: &str = "http://localhost:4566/000000000000/relayer-transaction-request.fifo";

    #[test]
    fn test_resolve_queue_type_explicit_standard() {
        let result = resolve_queue_type("standard", None, REF_STD, REF_FIFO);
        assert_eq!(result.unwrap(), false);
    }

    #[test]
    fn test_resolve_queue_type_explicit_fifo() {
        let result = resolve_queue_type("fifo", None, REF_STD, REF_FIFO);
        assert_eq!(result.unwrap(), true);
    }

    #[test]
    fn test_resolve_queue_type_explicit_ignores_probes() {
        let result = resolve_queue_type("standard", Some((false, true)), REF_STD, REF_FIFO);
        assert_eq!(result.unwrap(), false);
    }

    #[test]
    fn test_resolve_queue_type_auto_standard_only() {
        let result = resolve_queue_type("auto", Some((true, false)), REF_STD, REF_FIFO);
        assert_eq!(result.unwrap(), false);
    }

    #[test]
    fn test_resolve_queue_type_auto_fifo_only() {
        let result = resolve_queue_type("auto", Some((false, true)), REF_STD, REF_FIFO);
        assert_eq!(result.unwrap(), true);
    }

    #[test]
    fn test_resolve_queue_type_auto_both_exist_errors() {
        let result = resolve_queue_type("auto", Some((true, true)), REF_STD, REF_FIFO);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("Ambiguous"),
            "Expected 'Ambiguous' error, got: {err}"
        );
    }

    #[test]
    fn test_resolve_queue_type_auto_neither_exists_errors() {
        let result = resolve_queue_type("auto", Some((false, false)), REF_STD, REF_FIFO);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("No SQS queues found"),
            "Expected 'No SQS queues found' error, got: {err}"
        );
    }

    #[test]
    fn test_resolve_queue_type_auto_no_probes_defaults_to_neither() {
        let result = resolve_queue_type("auto", None, REF_STD, REF_FIFO);
        assert!(result.is_err());
    }

    #[test]
    fn test_resolve_queue_type_unknown_value_errors() {
        let result = resolve_queue_type("invalid", None, REF_STD, REF_FIFO);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("Unsupported SQS_QUEUE_TYPE"),
            "Expected unsupported error, got: {err}"
        );
    }

    #[test]
    fn test_resolve_queue_type_auto_neither_error_includes_urls() {
        let std_url = "https://sqs.us-east-1.amazonaws.com/123/relayer-transaction-request";
        let fifo_url = "https://sqs.us-east-1.amazonaws.com/123/relayer-transaction-request.fifo";
        let result = resolve_queue_type("auto", Some((false, false)), std_url, fifo_url);
        let err = result.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains(std_url),
            "Error should include standard URL: {msg}"
        );
        assert!(
            msg.contains(fifo_url),
            "Error should include FIFO URL: {msg}"
        );
    }

    #[test]
    fn test_resolve_queue_type_returns_config_error_variant() {
        let result = resolve_queue_type("invalid", None, REF_STD, REF_FIFO);
        assert!(
            matches!(result, Err(QueueBackendError::ConfigError(_))),
            "Expected ConfigError variant"
        );

        let result = resolve_queue_type("auto", Some((true, true)), REF_STD, REF_FIFO);
        assert!(
            matches!(result, Err(QueueBackendError::ConfigError(_))),
            "Ambiguous case should be ConfigError"
        );

        let result = resolve_queue_type("auto", Some((false, false)), REF_STD, REF_FIFO);
        assert!(
            matches!(result, Err(QueueBackendError::ConfigError(_))),
            "No queues case should be ConfigError"
        );
    }

    #[test]
    fn test_resolve_queue_type_unknown_includes_value_in_error() {
        let result = resolve_queue_type("redis", None, REF_STD, REF_FIFO);
        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("redis"),
            "Error should echo the invalid value: {msg}"
        );

        let result = resolve_queue_type("", None, REF_STD, REF_FIFO);
        assert!(result.is_err(), "Empty string should be rejected");
    }

    #[test]
    fn test_resolve_queue_type_case_sensitive() {
        assert!(resolve_queue_type("Standard", None, REF_STD, REF_FIFO).is_err());
        assert!(resolve_queue_type("FIFO", None, REF_STD, REF_FIFO).is_err());
        assert!(resolve_queue_type("Auto", None, REF_STD, REF_FIFO).is_err());
    }

    #[test]
    fn test_calculate_delay_seconds_one_second_future() {
        let future_1s = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64
            + 2;
        let result = SqsBackend::calculate_delay_seconds(Some(future_1s));
        assert!(result.is_some(), "1-2s in future should yield Some");
        assert!(result.unwrap() > 0, "Delay should be positive");
        assert!(result.unwrap() <= 2, "Delay should be at most 2s");
    }

    #[test]
    fn test_calculate_delay_seconds_far_past() {
        assert_eq!(SqsBackend::calculate_delay_seconds(Some(0)), None);
        assert_eq!(SqsBackend::calculate_delay_seconds(Some(-1000)), None);
    }

    #[test]
    fn test_calculate_delay_seconds_very_far_future() {
        let far_future = 4_102_444_800_i64;
        assert_eq!(
            SqsBackend::calculate_delay_seconds(Some(far_future)),
            Some(900)
        );
    }

    #[test]
    fn test_calculate_delay_seconds_exactly_900_boundary() {
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64;

        let result = SqsBackend::calculate_delay_seconds(Some(now + 899));
        assert!(result.is_some());
        let val = result.unwrap();
        assert!((898..=899).contains(&val), "Expected ~899, got {val}");
    }

    #[test]
    fn test_is_fifo_queue_url_empty() {
        assert!(!SqsBackend::is_fifo_queue_url(""));
    }

    #[test]
    fn test_is_fifo_queue_url_case_sensitive() {
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.FIFO"
        ));
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/queue.Fifo"
        ));
    }

    #[test]
    fn test_is_fifo_queue_url_fifo_in_middle() {
        assert!(!SqsBackend::is_fifo_queue_url(
            "https://sqs.us-east-1.amazonaws.com/123/.fifo/queue"
        ));
    }

    #[test]
    fn test_is_fifo_queue_url_just_suffix() {
        assert!(SqsBackend::is_fifo_queue_url(".fifo"));
    }

    #[test]
    fn test_is_fifo_queue_url_localstack() {
        assert!(SqsBackend::is_fifo_queue_url(
            "http://localhost:4566/000000000000/relayer-tx.fifo"
        ));
    }

    #[test]
    fn test_transaction_message_group_id_always_returns_transaction_id() {
        let networks: &[Option<NetworkType>] = &[
            Some(NetworkType::Evm),
            Some(NetworkType::Stellar),
            Some(NetworkType::Solana),
            None,
        ];

        for network in networks {
            let group = transaction_message_group_id(network.as_ref(), "relayer-99", "tx-abc");
            assert_eq!(
                group, "tx-abc",
                "Expected transaction_id for network {network:?}"
            );
        }
    }

    #[test]
    fn test_status_check_queue_type_returns_distinct_queue_names() {
        let evm = status_check_queue_type(Some(&NetworkType::Evm));
        let stellar = status_check_queue_type(Some(&NetworkType::Stellar));
        let generic = status_check_queue_type(None);

        assert_ne!(evm.queue_name(), stellar.queue_name());
        assert_ne!(evm.queue_name(), generic.queue_name());
        assert_ne!(stellar.queue_name(), generic.queue_name());
    }

    #[test]
    fn test_status_check_queue_type_all_are_status_checks() {
        let networks: &[Option<&NetworkType>] = &[
            Some(&NetworkType::Evm),
            Some(&NetworkType::Stellar),
            Some(&NetworkType::Solana),
            None,
        ];

        for network in networks {
            let qt = status_check_queue_type(*network);
            assert!(
                qt.is_status_check(),
                "{qt:?} should be a status check variant"
            );
        }
    }

    #[test]
    fn test_queue_url_construction_algorithm_fifo() {
        let prefix = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";
        let suffix = ".fifo";

        let expected_urls = [
            (
                QueueType::TransactionRequest,
                format!("{prefix}transaction-request{suffix}"),
            ),
            (
                QueueType::TransactionSubmission,
                format!("{prefix}transaction-submission{suffix}"),
            ),
            (
                QueueType::StatusCheck,
                format!("{prefix}status-check{suffix}"),
            ),
            (
                QueueType::StatusCheckEvm,
                format!("{prefix}status-check-evm{suffix}"),
            ),
            (
                QueueType::StatusCheckStellar,
                format!("{prefix}status-check-stellar{suffix}"),
            ),
            (
                QueueType::Notification,
                format!("{prefix}notification{suffix}"),
            ),
            (
                QueueType::TokenSwapRequest,
                format!("{prefix}token-swap-request{suffix}"),
            ),
            (
                QueueType::RelayerHealthCheck,
                format!("{prefix}relayer-health-check{suffix}"),
            ),
        ];

        for (qt, expected) in &expected_urls {
            assert!(
                SqsBackend::is_fifo_queue_url(expected),
                "{qt:?}: URL should be FIFO: {expected}"
            );
        }
    }

    #[test]
    fn test_queue_url_construction_algorithm_standard() {
        let prefix = "https://sqs.us-east-1.amazonaws.com/123456789/relayer-";
        let suffix = "";

        let urls = [
            format!("{prefix}transaction-request{suffix}"),
            format!("{prefix}transaction-submission{suffix}"),
            format!("{prefix}status-check{suffix}"),
            format!("{prefix}notification{suffix}"),
        ];

        for url in &urls {
            assert!(
                !SqsBackend::is_fifo_queue_url(url),
                "Standard URL should not be FIFO: {url}"
            );
        }
    }

    #[test]
    fn test_sqs_max_message_size_matches_aws_limit() {
        assert_eq!(SQS_MAX_MESSAGE_SIZE_BYTES, 262_144);
    }

    #[test]
    fn test_resolve_queue_type_suffix_logic() {
        let test_cases = [
            ("standard", None, false),
            ("fifo", None, true),
            ("auto", Some((true, false)), false),
            ("auto", Some((false, true)), true),
        ];

        for (sqs_type, probes, expected_fifo) in test_cases {
            let is_fifo = resolve_queue_type(sqs_type, probes, REF_STD, REF_FIFO).unwrap();
            assert_eq!(is_fifo, expected_fifo, "sqs_type={sqs_type}");

            let suffix = if is_fifo { ".fifo" } else { "" };
            let url = format!("https://sqs.us-east-1.amazonaws.com/123/relayer-tx{suffix}");
            assert_eq!(
                SqsBackend::is_fifo_queue_url(&url),
                expected_fifo,
                "URL FIFO detection mismatch for sqs_type={sqs_type}"
            );
        }
    }

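    // Smoke test that performs a real SendMessage call; it is `#[ignore]`d because it
    // needs reachable SQS (or a LocalStack endpoint) plus valid AWS configuration in
    // the environment. Run with `cargo test -- --ignored` when that setup is available.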
    #[tokio::test]
    #[ignore]
    async fn smoke_push_status_check_to_sqs() {
        let backend = SqsBackend::new()
            .await
            .expect("SQS backend initialization failed");
        let job = Job::new(
            JobType::TransactionStatusCheck,
            TransactionStatusCheck::new("smoke-tx-id", "smoke-relayer", NetworkType::Stellar),
        );
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("system time before unix epoch")
            .as_secs() as i64;
        let scheduled_on = Some(now + 2);
        let result = backend
            .produce_transaction_status_check(job, scheduled_on)
            .await;
        assert!(
            result.is_ok(),
            "Expected SendMessage via SQS backend to succeed, got: {result:?}"
        );
    }
}