1use std::future::Future;
7use std::panic::AssertUnwindSafe;
8use std::sync::Arc;
9use std::time::Duration;
10
11use actix_web::web::ThinData;
12use aws_sdk_sqs::error::{ProvideErrorMetadata, SdkError};
13use aws_sdk_sqs::types::{
14 DeleteMessageBatchRequestEntry, Message, MessageAttributeValue, MessageSystemAttributeName,
15};
16use futures::FutureExt;
17use serde::de::DeserializeOwned;
18use tokio::sync::watch;
19use tokio::task::{JoinHandle, JoinSet};
20use tracing::{debug, error, info, warn};
21
22use crate::queues::{backoff_config_for_queue, retry_delay_secs};
23use crate::{
24 config::ServerConfig,
25 jobs::{
26 notification_handler, relayer_health_check_handler, token_swap_request_handler,
27 transaction_request_handler, transaction_status_handler, transaction_submission_handler,
28 Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
29 TransactionSend, TransactionStatusCheck,
30 },
31};
32
33use super::{HandlerError, WorkerContext};
34use super::{QueueBackendError, QueueType, WorkerHandle};
35
/// Internal classification of a failed job, used by `process_message` to
/// decide between retrying and dropping a message.
#[derive(Debug)]
enum ProcessingError {
    /// The handler asked for a retry (produced from `HandlerError::Retry`).
    Retryable(String),
    /// The failure cannot be fixed by retrying: either the handler aborted
    /// (`HandlerError::Abort`) or the job body failed to deserialize.
    Permanent(String),
}
41
/// What the worker loop should do with an SQS message once `process_message`
/// has finished with it.
#[derive(Debug)]
enum MessageOutcome {
    /// Remove the message from the queue; the receipt handle is collected by
    /// the worker loop and deleted in a batch.
    Delete { receipt_handle: String },
    /// Leave the message in the queue (nothing is queued for deletion).
    Retain,
}
52
/// Spawns the background polling task for a single SQS queue.
///
/// The spawned Tokio task loops over the following steps:
/// 1. reap finished handler tasks and collect the receipt handles they return,
/// 2. delete those messages through `flush_delete_batch`,
/// 3. receive up to `min(available permits, 10)` messages from the queue,
/// 4. spawn one `process_message` task per message, each owning a semaphore
///    permit and wrapped in a timeout plus `catch_unwind`.
///
/// The loop stops when the shutdown channel already holds `true` at the top of
/// an iteration, or as soon as `shutdown_rx.changed()` resolves while the task
/// is waiting. After the loop, in-flight tasks get up to 30 seconds to finish
/// before being aborted, and collected receipt handles are flushed once more.
///
/// # Arguments
/// * `sqs_client` - client used for receive and delete calls.
/// * `queue_type` - selects concurrency, retry, polling and timeout settings.
/// * `queue_url` - URL of the queue to poll.
/// * `app_state` - shared application state handed to every job handler.
/// * `shutdown_rx` - watch channel used to request shutdown.
///
/// # Returns
/// `Ok(WorkerHandle::Tokio(..))` wrapping the join handle of the spawned task.
/// This function body contains no path that returns `Err`.
pub async fn spawn_worker_for_queue(
    sqs_client: aws_sdk_sqs::Client,
    queue_type: QueueType,
    queue_url: String,
    app_state: Arc<ThinData<crate::models::DefaultAppState>>,
    mut shutdown_rx: watch::Receiver<bool>,
) -> Result<WorkerHandle, QueueBackendError> {
    let concurrency = get_concurrency_for_queue(queue_type);
    let max_retries = queue_type.max_retries();
    let polling_interval = queue_type.polling_interval_secs();
    let visibility_timeout = queue_type.visibility_timeout_secs();
    let handler_timeout_secs = handler_timeout_secs(queue_type);
    let handler_timeout = Duration::from_secs(handler_timeout_secs);

    info!(
        queue_type = ?queue_type,
        queue_url = %queue_url,
        concurrency = concurrency,
        max_retries = max_retries,
        polling_interval_secs = polling_interval,
        visibility_timeout_secs = visibility_timeout,
        handler_timeout_secs = handler_timeout_secs,
        "Spawning SQS worker"
    );

    let handle: JoinHandle<()> = tokio::spawn(async move {
        // One permit per concurrently running handler.
        let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency));
        // Each in-flight task yields `Some(receipt_handle)` when its message
        // should be deleted and `None` otherwise.
        let mut inflight: JoinSet<Option<String>> = JoinSet::new();
        let mut consecutive_poll_errors: u32 = 0;
        let mut pending_deletes: Vec<String> = Vec::new();

        loop {
            // Reap tasks that have already finished, without waiting.
            while let Some(result) = inflight.try_join_next() {
                match result {
                    Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                    Ok(None) => {}
                    Err(e) => {
                        warn!(
                            queue_type = ?queue_type,
                            error = %e,
                            "In-flight task failed"
                        );
                    }
                }
            }

            // Delete everything collected so far in batched calls.
            if !pending_deletes.is_empty() {
                flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
                pending_deletes.clear();
            }

            if *shutdown_rx.borrow() {
                info!(queue_type = ?queue_type, "Shutdown signal received, stopping SQS worker");
                break;
            }

            let available_permits = semaphore.available_permits();
            // All handlers are busy: wait briefly and re-check rather than
            // receiving messages that cannot be started yet.
            if available_permits == 0 {
                tokio::select! {
                    _ = tokio::time::sleep(Duration::from_millis(50)) => continue,
                    _ = shutdown_rx.changed() => {
                        info!(queue_type = ?queue_type, "Shutdown signal received, stopping SQS worker");
                        break;
                    }
                }
            }

            // Never request more messages than there are free permits (capped at 10).
            let batch_size = available_permits.min(10) as i32;

            // Race the receive call against the shutdown channel.
            let messages_result = tokio::select! {
                result = sqs_client
                    .receive_message()
                    .queue_url(&queue_url)
                    .max_number_of_messages(batch_size)
                    .wait_time_seconds(polling_interval as i32)
                    .visibility_timeout(visibility_timeout as i32)
                    .message_system_attribute_names(MessageSystemAttributeName::ApproximateReceiveCount)
                    .message_system_attribute_names(MessageSystemAttributeName::MessageGroupId)
                    .message_attribute_names("target_scheduled_on")
                    .message_attribute_names("retry_attempt")
                    .send() => result,
                _ = shutdown_rx.changed() => {
                    info!(queue_type = ?queue_type, "Shutdown signal received during SQS poll, stopping worker");
                    break;
                }
            };

            match messages_result {
                Ok(output) => {
                    if consecutive_poll_errors > 0 {
                        info!(
                            queue_type = ?queue_type,
                            previous_errors = consecutive_poll_errors,
                            "SQS polling recovered after consecutive errors"
                        );
                    }
                    consecutive_poll_errors = 0;

                    if let Some(messages) = output.messages {
                        if !messages.is_empty() {
                            debug!(
                                queue_type = ?queue_type,
                                message_count = messages.len(),
                                "Received messages from SQS"
                            );

                            for message in messages {
                                // Acquire the permit before spawning so the
                                // number of running handlers stays bounded.
                                let permit = match semaphore.clone().acquire_owned().await {
                                    Ok(permit) => permit,
                                    Err(err) => {
                                        error!(
                                            queue_type = ?queue_type,
                                            error = %err,
                                            "Semaphore closed, stopping SQS worker loop"
                                        );
                                        // Exits the task immediately; the drain
                                        // and final flush below are skipped.
                                        return;
                                    }
                                };
                                let client = sqs_client.clone();
                                let url = queue_url.clone();
                                let state = app_state.clone();

                                inflight.spawn(async move {
                                    // Held until the task ends, including on
                                    // panic or timeout.
                                    let _permit = permit;
                                    // Result layers, outermost first:
                                    // timeout -> catch_unwind -> process_message.
                                    let result = tokio::time::timeout(
                                        handler_timeout,
                                        AssertUnwindSafe(process_message(
                                            client.clone(),
                                            message,
                                            queue_type,
                                            &url,
                                            state,
                                            max_retries,
                                        ))
                                        .catch_unwind(),
                                    )
                                    .await;

                                    match result {
                                        Ok(Ok(Ok(MessageOutcome::Delete { receipt_handle }))) => {
                                            Some(receipt_handle)
                                        }
                                        Ok(Ok(Ok(MessageOutcome::Retain))) => None,
                                        Ok(Ok(Err(e))) => {
                                            error!(
                                                queue_type = ?queue_type,
                                                error = %e,
                                                "Failed to process message"
                                            );
                                            None
                                        }
                                        Ok(Err(panic_info)) => {
                                            // Panic payloads are read as either
                                            // `String` or `&str`.
                                            let msg = panic_info
                                                .downcast_ref::<String>()
                                                .map(|s| s.as_str())
                                                .or_else(|| {
                                                    panic_info.downcast_ref::<&str>().copied()
                                                })
                                                .unwrap_or("unknown panic");
                                            error!(
                                                queue_type = ?queue_type,
                                                panic = %msg,
                                                "Message handler panicked"
                                            );
                                            None
                                        }
                                        Err(_) => {
                                            error!(
                                                queue_type = ?queue_type,
                                                timeout_secs = handler_timeout.as_secs(),
                                                "Message handler timed out; message will be retried after visibility timeout"
                                            );
                                            None
                                        }
                                    }
                                });
                            }
                        }
                    }
                }
                Err(e) => {
                    consecutive_poll_errors = consecutive_poll_errors.saturating_add(1);
                    let backoff_secs = poll_error_backoff_secs(consecutive_poll_errors);
                    // Break the SDK error down into fields for structured logging.
                    let (error_kind, error_code, error_message) = match &e {
                        SdkError::ServiceError(ctx) => {
                            ("service", ctx.err().code(), ctx.err().message())
                        }
                        SdkError::DispatchFailure(_) => ("dispatch", None, None),
                        SdkError::ResponseError(_) => ("response", None, None),
                        SdkError::TimeoutError(_) => ("timeout", None, None),
                        _ => ("other", None, None),
                    };
                    error!(
                        queue_type = ?queue_type,
                        error_kind = error_kind,
                        error_code = error_code.unwrap_or("unknown"),
                        error_message = error_message.unwrap_or("n/a"),
                        error = %e,
                        error_debug = ?e,
                        consecutive_errors = consecutive_poll_errors,
                        backoff_secs = backoff_secs,
                        "Failed to receive messages from SQS, backing off"
                    );
                    // Back off, but stay responsive to shutdown.
                    tokio::select! {
                        _ = tokio::time::sleep(Duration::from_secs(backoff_secs)) => {}
                        _ = shutdown_rx.changed() => {
                            info!(queue_type = ?queue_type, "Shutdown signal received during backoff, stopping worker");
                            break;
                        }
                    }
                }
            }
        }

        // Shutdown path: give running handlers up to 30 seconds to finish.
        if !inflight.is_empty() {
            info!(
                queue_type = ?queue_type,
                count = inflight.len(),
                "Draining in-flight tasks before shutdown"
            );
            match tokio::time::timeout(Duration::from_secs(30), async {
                while let Some(result) = inflight.join_next().await {
                    match result {
                        Ok(Some(receipt_handle)) => pending_deletes.push(receipt_handle),
                        Ok(None) => {}
                        Err(e) => {
                            warn!(
                                queue_type = ?queue_type,
                                error = %e,
                                "In-flight task failed during drain"
                            );
                        }
                    }
                }
            })
            .await
            {
                Ok(()) => info!(queue_type = ?queue_type, "All in-flight tasks drained"),
                Err(_) => {
                    warn!(
                        queue_type = ?queue_type,
                        remaining = inflight.len(),
                        "Drain timeout, abandoning remaining tasks"
                    );
                    inflight.abort_all();
                }
            }
        }

        // Final flush for handles collected during the drain.
        if !pending_deletes.is_empty() {
            flush_delete_batch(&sqs_client, &queue_url, &pending_deletes, queue_type).await;
        }

        info!(queue_type = ?queue_type, "SQS worker stopped");
    });

    Ok(WorkerHandle::Tokio(handle))
}
334
335async fn process_message(
340 sqs_client: aws_sdk_sqs::Client,
341 message: Message,
342 queue_type: QueueType,
343 queue_url: &str,
344 app_state: Arc<ThinData<crate::models::DefaultAppState>>,
345 max_retries: usize,
346) -> Result<MessageOutcome, QueueBackendError> {
347 let body = message
348 .body()
349 .ok_or_else(|| QueueBackendError::QueueError("Empty message body".to_string()))?;
350
351 let receipt_handle = message
352 .receipt_handle()
353 .ok_or_else(|| QueueBackendError::QueueError("Missing receipt handle".to_string()))?;
354
355 if let Some(target_scheduled_on) = parse_target_scheduled_on(&message) {
357 let now = std::time::SystemTime::now()
358 .duration_since(std::time::SystemTime::UNIX_EPOCH)
359 .map_err(|e| QueueBackendError::QueueError(format!("System clock error: {e}")))?
360 .as_secs() as i64;
361 let remaining = target_scheduled_on - now;
362 if remaining > 0 {
363 let should_delete_original = defer_message(
364 &sqs_client,
365 queue_url,
366 body.to_string(),
367 &message,
368 target_scheduled_on,
369 remaining.min(900) as i32,
370 )
371 .await?;
372
373 debug!(
374 queue_type = ?queue_type,
375 remaining_seconds = remaining,
376 "Deferred scheduled SQS message for next delay hop"
377 );
378 return if should_delete_original {
379 Ok(MessageOutcome::Delete {
380 receipt_handle: receipt_handle.to_string(),
381 })
382 } else {
383 Ok(MessageOutcome::Retain)
384 };
385 }
386 }
387
388 let receive_count = message
390 .attributes()
391 .and_then(|attrs| attrs.get(&MessageSystemAttributeName::ApproximateReceiveCount))
392 .and_then(|count| count.parse::<usize>().ok())
393 .unwrap_or(1);
394 let attempt_number = receive_count.saturating_sub(1);
396 let logical_retry_attempt = parse_retry_attempt(&message).unwrap_or(attempt_number);
399
400 let sqs_message_id = message.message_id().unwrap_or("unknown").to_string();
402
403 debug!(
404 queue_type = ?queue_type,
405 message_id = %sqs_message_id,
406 attempt = attempt_number,
407 receive_count = receive_count,
408 max_retries = max_retries,
409 "Processing message"
410 );
411
412 let result = match queue_type {
414 QueueType::TransactionRequest => {
415 process_job::<TransactionRequest, _, _>(
416 body,
417 app_state,
418 attempt_number,
419 sqs_message_id,
420 "TransactionRequest",
421 transaction_request_handler,
422 )
423 .await
424 }
425 QueueType::TransactionSubmission => {
426 process_job::<TransactionSend, _, _>(
427 body,
428 app_state,
429 attempt_number,
430 sqs_message_id,
431 "TransactionSend",
432 transaction_submission_handler,
433 )
434 .await
435 }
436 QueueType::StatusCheck | QueueType::StatusCheckEvm | QueueType::StatusCheckStellar => {
437 process_job::<TransactionStatusCheck, _, _>(
438 body,
439 app_state,
440 attempt_number,
441 sqs_message_id,
442 "TransactionStatusCheck",
443 transaction_status_handler,
444 )
445 .await
446 }
447 QueueType::Notification => {
448 process_job::<NotificationSend, _, _>(
449 body,
450 app_state,
451 attempt_number,
452 sqs_message_id,
453 "NotificationSend",
454 notification_handler,
455 )
456 .await
457 }
458 QueueType::TokenSwapRequest => {
459 process_job::<TokenSwapRequest, _, _>(
460 body,
461 app_state,
462 attempt_number,
463 sqs_message_id,
464 "TokenSwapRequest",
465 token_swap_request_handler,
466 )
467 .await
468 }
469 QueueType::RelayerHealthCheck => {
470 process_job::<RelayerHealthCheck, _, _>(
471 body,
472 app_state,
473 attempt_number,
474 sqs_message_id,
475 "RelayerHealthCheck",
476 relayer_health_check_handler,
477 )
478 .await
479 }
480 };
481
482 match result {
483 Ok(()) => {
484 debug!(
485 queue_type = ?queue_type,
486 attempt = attempt_number,
487 "Message processed successfully"
488 );
489
490 Ok(MessageOutcome::Delete {
491 receipt_handle: receipt_handle.to_string(),
492 })
493 }
494 Err(ProcessingError::Permanent(e)) => {
495 error!(
496 queue_type = ?queue_type,
497 attempt = attempt_number,
498 error = %e,
499 "Permanent handler failure, message will be deleted"
500 );
501
502 Ok(MessageOutcome::Delete {
503 receipt_handle: receipt_handle.to_string(),
504 })
505 }
506 Err(ProcessingError::Retryable(e)) => {
507 if max_retries != usize::MAX && receive_count > max_retries {
509 error!(
510 queue_type = ?queue_type,
511 attempt = attempt_number,
512 receive_count = receive_count,
513 max_retries = max_retries,
514 error = %e,
515 "Max retries exceeded; message will be automatically moved to DLQ by SQS redrive policy"
516 );
517 return Ok(MessageOutcome::Retain);
518 }
519
520 let delay = if queue_type.is_status_check() {
524 compute_status_retry_delay(body, logical_retry_attempt)
525 } else {
526 retry_delay_secs(backoff_config_for_queue(queue_type), logical_retry_attempt)
527 };
528
529 if is_fifo_queue_url(queue_url) {
532 if let Err(err) = sqs_client
533 .change_message_visibility()
534 .queue_url(queue_url)
535 .receipt_handle(receipt_handle)
536 .visibility_timeout(delay.clamp(1, 900))
537 .send()
538 .await
539 {
540 error!(
541 queue_type = ?queue_type,
542 error = %err,
543 "Failed to set visibility timeout for retry; falling back to existing visibility timeout"
544 );
545 return Ok(MessageOutcome::Retain);
546 }
547
548 debug!(
549 queue_type = ?queue_type,
550 attempt = logical_retry_attempt,
551 delay_seconds = delay,
552 error = %e,
553 "Retry scheduled via visibility timeout"
554 );
555
556 return Ok(MessageOutcome::Retain);
557 }
558
559 let next_retry_attempt = logical_retry_attempt.saturating_add(1);
560
561 if let Err(send_err) = sqs_client
565 .send_message()
566 .queue_url(queue_url)
567 .message_body(body.to_string())
568 .delay_seconds(delay)
569 .message_attributes(
570 "retry_attempt",
571 MessageAttributeValue::builder()
572 .data_type("Number")
573 .string_value(next_retry_attempt.to_string())
574 .build()
575 .map_err(|err| {
576 QueueBackendError::SqsError(format!(
577 "Failed to build retry_attempt attribute: {err}"
578 ))
579 })?,
580 )
581 .send()
582 .await
583 {
584 error!(
585 queue_type = ?queue_type,
586 error = %send_err,
587 "Failed to re-enqueue message; leaving original for visibility timeout retry"
588 );
589 return Ok(MessageOutcome::Retain);
591 }
592
593 debug!(
594 queue_type = ?queue_type,
595 attempt = logical_retry_attempt,
596 delay_seconds = delay,
597 error = %e,
598 "Message re-enqueued with backoff delay"
599 );
600
601 Ok(MessageOutcome::Delete {
603 receipt_handle: receipt_handle.to_string(),
604 })
605 }
606 }
607}
608
609async fn process_job<T, F, Fut>(
612 body: &str,
613 app_state: Arc<ThinData<crate::models::DefaultAppState>>,
614 attempt: usize,
615 task_id: String,
616 type_name: &str,
617 handler: F,
618) -> Result<(), ProcessingError>
619where
620 T: DeserializeOwned,
621 F: FnOnce(Job<T>, ThinData<crate::models::DefaultAppState>, WorkerContext) -> Fut,
622 Fut: Future<Output = Result<(), HandlerError>>,
623{
624 let job: Job<T> = serde_json::from_str(body).map_err(|e| {
625 error!(error = %e, "Failed to deserialize {} job", type_name);
626 ProcessingError::Permanent(format!("Failed to deserialize {type_name} job: {e}"))
628 })?;
629
630 let ctx = WorkerContext::new(attempt, task_id);
631 handler(job, (*app_state).clone(), ctx)
632 .await
633 .map_err(map_handler_error)
634}
635
636fn map_handler_error(error: HandlerError) -> ProcessingError {
637 match error {
638 HandlerError::Abort(msg) => ProcessingError::Permanent(msg),
639 HandlerError::Retry(msg) => ProcessingError::Retryable(msg),
640 }
641}
642
643fn parse_target_scheduled_on(message: &Message) -> Option<i64> {
644 message
645 .message_attributes()
646 .and_then(|attrs| attrs.get("target_scheduled_on"))
647 .and_then(|value| value.string_value())
648 .and_then(|value| value.parse::<i64>().ok())
649}
650
651fn parse_retry_attempt(message: &Message) -> Option<usize> {
652 message
653 .message_attributes()
654 .and_then(|attrs| attrs.get("retry_attempt"))
655 .and_then(|value| value.string_value())
656 .and_then(|value| value.parse::<usize>().ok())
657}
658
/// Returns `true` when `queue_url` ends with the exact, case-sensitive
/// suffix `.fifo`.
fn is_fifo_queue_url(queue_url: &str) -> bool {
    queue_url.strip_suffix(".fifo").is_some()
}
662
663async fn defer_message(
664 sqs_client: &aws_sdk_sqs::Client,
665 queue_url: &str,
666 body: String,
667 message: &Message,
668 target_scheduled_on: i64,
669 delay_seconds: i32,
670) -> Result<bool, QueueBackendError> {
671 if is_fifo_queue_url(queue_url) {
672 let receipt_handle = message.receipt_handle().ok_or_else(|| {
673 QueueBackendError::QueueError(
674 "Cannot defer FIFO message: missing receipt handle".to_string(),
675 )
676 })?;
677
678 sqs_client
679 .change_message_visibility()
680 .queue_url(queue_url)
681 .receipt_handle(receipt_handle)
682 .visibility_timeout(delay_seconds.clamp(1, 900))
683 .send()
684 .await
685 .map_err(|e| {
686 QueueBackendError::SqsError(format!(
687 "Failed to defer FIFO message via visibility timeout: {e}"
688 ))
689 })?;
690
691 return Ok(false);
692 }
693
694 let request = sqs_client
697 .send_message()
698 .queue_url(queue_url)
699 .message_body(body)
700 .delay_seconds(delay_seconds.clamp(1, 900))
701 .message_attributes(
702 "target_scheduled_on",
703 MessageAttributeValue::builder()
704 .data_type("Number")
705 .string_value(target_scheduled_on.to_string())
706 .build()
707 .map_err(|e| {
708 QueueBackendError::SqsError(format!(
709 "Failed to build deferred scheduled attribute: {e}"
710 ))
711 })?,
712 );
713
714 request.send().await.map_err(|e| {
715 QueueBackendError::SqsError(format!("Failed to defer scheduled message: {e}"))
716 })?;
717
718 Ok(true)
719}
720
/// Partial view of a job's `data` object: only the optional `network_type`
/// field is declared; see `compute_status_retry_delay` for its use.
#[derive(serde::Deserialize)]
struct StatusCheckData {
    /// Network type of the job, if the payload carries one.
    network_type: Option<crate::models::NetworkType>,
}
729
/// Partial view of a job body that declares only its `data` field, so
/// `compute_status_retry_delay` can read the network type without
/// deserializing the full job.
#[derive(serde::Deserialize)]
struct PartialStatusCheckJob {
    /// The job's `data` object.
    data: StatusCheckData,
}
738
739fn compute_status_retry_delay(body: &str, attempt: usize) -> i32 {
746 let network_type = serde_json::from_str::<PartialStatusCheckJob>(body)
747 .ok()
748 .and_then(|j| j.data.network_type);
749
750 crate::queues::retry_config::status_check_retry_delay_secs(network_type, attempt)
751}
752
753fn get_concurrency_for_queue(queue_type: QueueType) -> usize {
755 let configured = ServerConfig::get_worker_concurrency(
756 queue_type.concurrency_env_key(),
757 queue_type.default_concurrency(),
758 );
759 if configured == 0 {
760 warn!(
761 queue_type = ?queue_type,
762 "Configured concurrency is 0; clamping to 1"
763 );
764 1
765 } else {
766 configured
767 }
768}
769
770fn handler_timeout_secs(queue_type: QueueType) -> u64 {
774 u64::from(queue_type.visibility_timeout_secs().max(1))
775}
776
/// Upper bound, in seconds, for the backoff between failed polls.
const MAX_POLL_BACKOFF_SECS: u64 = 60;

/// Once at least 7 consecutive errors have occurred, every error count that
/// is a multiple of this value uses the short base delay instead of the cap.
const RECOVERY_PROBE_EVERY: u32 = 4;

/// Returns the delay in seconds to wait after `consecutive_errors` failed polls.
///
/// The delay starts at 5 seconds and doubles per additional error, capped at
/// `MAX_POLL_BACKOFF_SECS`. From the 7th error on, counts divisible by
/// `RECOVERY_PROBE_EVERY` return the 5-second base delay.
fn poll_error_backoff_secs(consecutive_errors: u32) -> u64 {
    const BASE_SECS: u64 = 5;

    let is_recovery_probe =
        consecutive_errors >= 7 && consecutive_errors % RECOVERY_PROBE_EVERY == 0;

    if is_recovery_probe {
        BASE_SECS
    } else {
        // At most 16 doublings, so the shift cannot overflow a u64.
        let doublings = consecutive_errors.saturating_sub(1).min(16);
        let multiplier = 1_u64 << doublings;
        BASE_SECS
            .saturating_mul(multiplier)
            .min(MAX_POLL_BACKOFF_SECS)
    }
}
801
802async fn flush_delete_batch(
808 sqs_client: &aws_sdk_sqs::Client,
809 queue_url: &str,
810 batch: &[String],
811 queue_type: QueueType,
812) -> usize {
813 if batch.is_empty() {
814 return 0;
815 }
816
817 let mut deleted = 0;
818
819 for chunk in batch.chunks(10) {
820 let entries: Vec<DeleteMessageBatchRequestEntry> = chunk
821 .iter()
822 .enumerate()
823 .map(|(i, handle)| {
824 DeleteMessageBatchRequestEntry::builder()
825 .id(i.to_string())
826 .receipt_handle(handle)
827 .build()
828 .expect("id and receipt_handle are always set")
829 })
830 .collect();
831
832 match sqs_client
833 .delete_message_batch()
834 .queue_url(queue_url)
835 .set_entries(Some(entries))
836 .send()
837 .await
838 {
839 Ok(output) => {
840 deleted += output.successful().len();
841
842 for f in output.failed() {
843 warn!(
844 queue_type = ?queue_type,
845 id = %f.id(),
846 code = %f.code(),
847 message = f.message().unwrap_or("unknown"),
848 "Batch delete entry failed (message will be redelivered)"
849 );
850 }
851 }
852 Err(e) => {
853 error!(
854 queue_type = ?queue_type,
855 error = %e,
856 batch_size = chunk.len(),
857 "Batch delete API call failed (messages will be redelivered)"
858 );
859 }
860 }
861 }
862
863 deleted
864}
865
866#[cfg(test)]
867mod tests {
868 use super::*;
869
870 #[test]
871 fn test_get_concurrency_for_queue() {
872 let concurrency = get_concurrency_for_queue(QueueType::TransactionRequest);
874 assert!(concurrency > 0);
875
876 let concurrency = get_concurrency_for_queue(QueueType::StatusCheck);
877 assert!(concurrency > 0);
878 }
879
880 #[test]
881 fn test_handler_timeout_secs_is_positive() {
882 let all = [
883 QueueType::TransactionRequest,
884 QueueType::TransactionSubmission,
885 QueueType::StatusCheck,
886 QueueType::StatusCheckEvm,
887 QueueType::StatusCheckStellar,
888 QueueType::Notification,
889 QueueType::TokenSwapRequest,
890 QueueType::RelayerHealthCheck,
891 ];
892 for queue_type in all {
893 assert!(handler_timeout_secs(queue_type) > 0);
894 }
895 }
896
897 #[test]
898 fn test_handler_timeout_secs_uses_visibility_timeout() {
899 assert_eq!(
900 handler_timeout_secs(QueueType::StatusCheckEvm),
901 QueueType::StatusCheckEvm.visibility_timeout_secs() as u64
902 );
903 assert_eq!(
904 handler_timeout_secs(QueueType::Notification),
905 QueueType::Notification.visibility_timeout_secs() as u64
906 );
907 }
908
909 #[test]
910 fn test_parse_target_scheduled_on() {
911 let message = Message::builder().build();
913
914 assert_eq!(parse_target_scheduled_on(&message), None);
916
917 let message = Message::builder()
919 .message_attributes(
920 "target_scheduled_on",
921 MessageAttributeValue::builder()
922 .data_type("Number")
923 .string_value("1234567890")
924 .build()
925 .unwrap(),
926 )
927 .build();
928
929 assert_eq!(parse_target_scheduled_on(&message), Some(1234567890));
930 }
931
932 #[test]
933 fn test_parse_retry_attempt() {
934 let message = Message::builder().build();
935 assert_eq!(parse_retry_attempt(&message), None);
936
937 let message = Message::builder()
938 .message_attributes(
939 "retry_attempt",
940 MessageAttributeValue::builder()
941 .data_type("Number")
942 .string_value("7")
943 .build()
944 .unwrap(),
945 )
946 .build();
947 assert_eq!(parse_retry_attempt(&message), Some(7));
948 }
949
950 #[test]
951 fn test_map_handler_error() {
952 let error = HandlerError::Abort("Validation failed".to_string());
954 let result = map_handler_error(error);
955 assert!(matches!(result, ProcessingError::Permanent(_)));
956
957 let error = HandlerError::Retry("Network timeout".to_string());
959 let result = map_handler_error(error);
960 assert!(matches!(result, ProcessingError::Retryable(_)));
961 }
962
963 #[test]
964 fn test_is_fifo_queue_url() {
965 assert!(is_fifo_queue_url(
966 "https://sqs.us-east-1.amazonaws.com/123/queue.fifo"
967 ));
968 assert!(!is_fifo_queue_url(
969 "https://sqs.us-east-1.amazonaws.com/123/queue"
970 ));
971 }
972
973 #[test]
974 fn test_compute_status_retry_delay_evm() {
975 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;
977 assert_eq!(compute_status_retry_delay(body, 0), 8);
978 assert_eq!(compute_status_retry_delay(body, 1), 12);
979 assert_eq!(compute_status_retry_delay(body, 8), 12);
980 }
981
982 #[test]
983 fn test_compute_status_retry_delay_stellar() {
984 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"stellar"}}"#;
985 assert_eq!(compute_status_retry_delay(body, 0), 2);
986 assert_eq!(compute_status_retry_delay(body, 1), 3);
987 assert_eq!(compute_status_retry_delay(body, 8), 3);
988 }
989
990 #[test]
991 fn test_compute_status_retry_delay_solana() {
992 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"solana"}}"#;
993 assert_eq!(compute_status_retry_delay(body, 0), 5);
994 assert_eq!(compute_status_retry_delay(body, 1), 8);
995 assert_eq!(compute_status_retry_delay(body, 8), 8);
996 }
997
998 #[test]
999 fn test_compute_status_retry_delay_missing_network() {
1000 let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1"}}"#;
1001 assert_eq!(compute_status_retry_delay(body, 0), 5);
1002 assert_eq!(compute_status_retry_delay(body, 1), 8);
1003 assert_eq!(compute_status_retry_delay(body, 8), 8);
1004 }
1005
1006 #[test]
1007 fn test_compute_status_retry_delay_invalid_body() {
1008 assert_eq!(compute_status_retry_delay("not json", 0), 5);
1009 assert_eq!(compute_status_retry_delay("not json", 1), 8);
1010 assert_eq!(compute_status_retry_delay("not json", 8), 8);
1011 }
1012
1013 #[tokio::test]
1014 async fn test_semaphore_released_on_panic() {
1015 let sem = Arc::new(tokio::sync::Semaphore::new(1));
1016 let permit = sem.clone().acquire_owned().await.unwrap();
1017
1018 let handle = tokio::spawn(async move {
1019 let _permit = permit; let _ = AssertUnwindSafe(async { panic!("test panic") })
1021 .catch_unwind()
1022 .await;
1023 });
1024
1025 handle.await.unwrap();
1026 let _p = tokio::time::timeout(Duration::from_millis(100), sem.acquire())
1028 .await
1029 .expect("permit should be available after panic");
1030 }
1031
1032 #[test]
1033 fn test_poll_error_backoff_secs() {
1034 assert_eq!(poll_error_backoff_secs(1), 5);
1036 assert_eq!(poll_error_backoff_secs(2), 10);
1038 assert_eq!(poll_error_backoff_secs(3), 20);
1040 assert_eq!(poll_error_backoff_secs(4), 40);
1042 assert_eq!(poll_error_backoff_secs(5), 60);
1044 assert_eq!(poll_error_backoff_secs(6), 60);
1045 assert_eq!(poll_error_backoff_secs(7), 60);
1046 assert_eq!(poll_error_backoff_secs(8), 5);
1048 assert_eq!(poll_error_backoff_secs(9), 60);
1049 assert_eq!(poll_error_backoff_secs(12), 5); }
1051
1052 #[test]
1053 fn test_poll_error_backoff_zero_errors() {
1054 assert_eq!(poll_error_backoff_secs(0), 5);
1056 }
1057
1058 #[test]
1059 fn test_poll_error_backoff_recovery_probes() {
1060 for i in (8..=100).step_by(RECOVERY_PROBE_EVERY as usize) {
1062 assert_eq!(
1063 poll_error_backoff_secs(i as u32),
1064 5,
1065 "Expected recovery probe at error {i}"
1066 );
1067 }
1068 }
1069
1070 #[test]
1071 fn test_message_outcome_delete_carries_receipt_handle() {
1072 let handle = "test-receipt-handle-123".to_string();
1073 let outcome = MessageOutcome::Delete {
1074 receipt_handle: handle.clone(),
1075 };
1076 match outcome {
1077 MessageOutcome::Delete { receipt_handle } => {
1078 assert_eq!(receipt_handle, handle);
1079 }
1080 MessageOutcome::Retain => panic!("Expected Delete variant"),
1081 }
1082 }
1083
1084 #[test]
1085 fn test_message_outcome_retain() {
1086 let outcome = MessageOutcome::Retain;
1087 assert!(matches!(outcome, MessageOutcome::Retain));
1088 }
1089
1090 #[test]
1091 fn test_batch_delete_entry_builder() {
1092 let handles = vec![
1095 "receipt-0".to_string(),
1096 "receipt-1".to_string(),
1097 "receipt-2".to_string(),
1098 ];
1099 let entries: Vec<DeleteMessageBatchRequestEntry> = handles
1100 .iter()
1101 .enumerate()
1102 .map(|(i, handle)| {
1103 DeleteMessageBatchRequestEntry::builder()
1104 .id(i.to_string())
1105 .receipt_handle(handle)
1106 .build()
1107 .expect("id and receipt_handle are set")
1108 })
1109 .collect();
1110
1111 assert_eq!(entries.len(), 3);
1112 assert_eq!(entries[0].id(), "0");
1113 assert_eq!(entries[0].receipt_handle(), "receipt-0");
1114 assert_eq!(entries[2].id(), "2");
1115 assert_eq!(entries[2].receipt_handle(), "receipt-2");
1116 }
1117
1118 #[test]
1119 fn test_batch_chunking_logic() {
1120 let handles: Vec<String> = (0..25).map(|i| format!("receipt-{i}")).collect();
1123 let chunks: Vec<&[String]> = handles.chunks(10).collect();
1124
1125 assert_eq!(chunks.len(), 3);
1126 assert_eq!(chunks[0].len(), 10);
1127 assert_eq!(chunks[1].len(), 10);
1128 assert_eq!(chunks[2].len(), 5);
1129 }
1130
1131 #[test]
1132 fn test_outcome_collection_pattern() {
1133 let outcomes = vec![
1136 Some("receipt-1".to_string()), None, Some("receipt-2".to_string()), None, Some("receipt-3".to_string()), ];
1142
1143 let pending_deletes: Vec<String> = outcomes.into_iter().flatten().collect();
1144
1145 assert_eq!(pending_deletes.len(), 3);
1146 assert_eq!(pending_deletes[0], "receipt-1");
1147 assert_eq!(pending_deletes[1], "receipt-2");
1148 assert_eq!(pending_deletes[2], "receipt-3");
1149 }
1150
1151 #[test]
1154 fn test_parse_target_scheduled_on_non_numeric_string() {
1155 let message = Message::builder()
1156 .message_attributes(
1157 "target_scheduled_on",
1158 MessageAttributeValue::builder()
1159 .data_type("String")
1160 .string_value("not-a-number")
1161 .build()
1162 .unwrap(),
1163 )
1164 .build();
1165 assert_eq!(parse_target_scheduled_on(&message), None);
1166 }
1167
1168 #[test]
1169 fn test_parse_target_scheduled_on_empty_string() {
1170 let message = Message::builder()
1171 .message_attributes(
1172 "target_scheduled_on",
1173 MessageAttributeValue::builder()
1174 .data_type("Number")
1175 .string_value("")
1176 .build()
1177 .unwrap(),
1178 )
1179 .build();
1180 assert_eq!(parse_target_scheduled_on(&message), None);
1181 }
1182
1183 #[test]
1184 fn test_parse_target_scheduled_on_negative_value() {
1185 let message = Message::builder()
1186 .message_attributes(
1187 "target_scheduled_on",
1188 MessageAttributeValue::builder()
1189 .data_type("Number")
1190 .string_value("-1000")
1191 .build()
1192 .unwrap(),
1193 )
1194 .build();
1195 assert_eq!(parse_target_scheduled_on(&message), Some(-1000));
1197 }
1198
1199 #[test]
1200 fn test_parse_target_scheduled_on_float_string() {
1201 let message = Message::builder()
1202 .message_attributes(
1203 "target_scheduled_on",
1204 MessageAttributeValue::builder()
1205 .data_type("Number")
1206 .string_value("1234567890.5")
1207 .build()
1208 .unwrap(),
1209 )
1210 .build();
1211 assert_eq!(parse_target_scheduled_on(&message), None);
1213 }
1214
1215 #[test]
1216 fn test_parse_target_scheduled_on_zero() {
1217 let message = Message::builder()
1218 .message_attributes(
1219 "target_scheduled_on",
1220 MessageAttributeValue::builder()
1221 .data_type("Number")
1222 .string_value("0")
1223 .build()
1224 .unwrap(),
1225 )
1226 .build();
1227 assert_eq!(parse_target_scheduled_on(&message), Some(0));
1228 }
1229
1230 #[test]
1231 fn test_parse_target_scheduled_on_wrong_attribute_name() {
1232 let message = Message::builder()
1234 .message_attributes(
1235 "wrong_key",
1236 MessageAttributeValue::builder()
1237 .data_type("Number")
1238 .string_value("1234567890")
1239 .build()
1240 .unwrap(),
1241 )
1242 .build();
1243 assert_eq!(parse_target_scheduled_on(&message), None);
1244 }
1245
1246 #[test]
1249 fn test_parse_retry_attempt_non_numeric_string() {
1250 let message = Message::builder()
1251 .message_attributes(
1252 "retry_attempt",
1253 MessageAttributeValue::builder()
1254 .data_type("String")
1255 .string_value("abc")
1256 .build()
1257 .unwrap(),
1258 )
1259 .build();
1260 assert_eq!(parse_retry_attempt(&message), None);
1261 }
1262
1263 #[test]
1264 fn test_parse_retry_attempt_negative_value() {
1265 let message = Message::builder()
1266 .message_attributes(
1267 "retry_attempt",
1268 MessageAttributeValue::builder()
1269 .data_type("Number")
1270 .string_value("-1")
1271 .build()
1272 .unwrap(),
1273 )
1274 .build();
1275 assert_eq!(parse_retry_attempt(&message), None);
1277 }
1278
/// A `retry_attempt` attribute of `"0"` is expected to parse to `Some(0)`.
#[test]
fn test_parse_retry_attempt_zero() {
    let attribute = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("0")
        .build()
        .unwrap();
    let message = Message::builder()
        .message_attributes("retry_attempt", attribute)
        .build();

    let parsed = parse_retry_attempt(&message);
    assert_eq!(parsed, Some(0));
}
1293
/// A `retry_attempt` attribute of `"999999"` is expected to parse to
/// `Some(999999)`.
#[test]
fn test_parse_retry_attempt_large_value() {
    let attribute = MessageAttributeValue::builder()
        .data_type("Number")
        .string_value("999999")
        .build()
        .unwrap();
    let message = Message::builder()
        .message_attributes("retry_attempt", attribute)
        .build();

    let parsed = parse_retry_attempt(&message);
    assert_eq!(parsed, Some(999999));
}
1308
/// The empty string must not be reported as a FIFO queue URL.
#[test]
fn test_is_fifo_queue_url_empty_string() {
    let url = "";
    assert!(!is_fifo_queue_url(url));
}
1315
/// A bare queue name ending in `.fifo` must be reported as FIFO.
#[test]
fn test_is_fifo_queue_url_just_fifo_suffix() {
    let url = "my-queue.fifo";
    assert!(is_fifo_queue_url(url));
}
1320
/// A URL containing `.fifo` only as an interior path segment must not be
/// reported as FIFO.
#[test]
fn test_is_fifo_queue_url_fifo_in_middle() {
    let url = "https://sqs.us-east-1.amazonaws.com/123/.fifo/queue";
    assert!(!is_fifo_queue_url(url));
}
1328
/// Upper- or mixed-case spellings of the suffix (`.FIFO`, `.Fifo`) must not
/// be reported as FIFO.
#[test]
fn test_is_fifo_queue_url_case_sensitive() {
    let wrong_case_urls = [
        "https://sqs.us-east-1.amazonaws.com/123/queue.FIFO",
        "https://sqs.us-east-1.amazonaws.com/123/queue.Fifo",
    ];
    for url in wrong_case_urls {
        assert!(!is_fifo_queue_url(url));
    }
}
1338
/// Several URLs without a `.fifo` suffix must all be reported as non-FIFO.
#[test]
fn test_is_fifo_queue_url_standard_queue_variations() {
    let standard_urls = [
        "https://sqs.us-east-1.amazonaws.com/123456789/my-queue",
        "https://sqs.eu-west-1.amazonaws.com/123456789/relayer-tx-request",
        "http://localhost:4566/000000000000/test-queue",
    ];
    for url in standard_urls {
        assert!(!is_fifo_queue_url(url));
    }
}
1351
/// A `localhost:4566` URL ending in `.fifo` must be reported as FIFO.
#[test]
fn test_is_fifo_queue_url_localstack() {
    let url = "http://localhost:4566/000000000000/test-queue.fifo";
    assert!(is_fifo_queue_url(url));
}
1359
/// Mapping `HandlerError::Abort` must produce `ProcessingError::Permanent`
/// carrying the same message text.
#[test]
fn test_map_handler_error_preserves_abort_message() {
    let msg = "Validation failed: invalid nonce";
    let mapped = map_handler_error(HandlerError::Abort(String::from(msg)));

    if let ProcessingError::Permanent(s) = mapped {
        assert_eq!(s, msg);
    } else {
        panic!("Expected Permanent");
    }
}
1371
/// Mapping `HandlerError::Retry` must produce `ProcessingError::Retryable`
/// carrying the same message text.
#[test]
fn test_map_handler_error_preserves_retry_message() {
    let msg = "RPC timeout after 30s";
    let mapped = map_handler_error(HandlerError::Retry(String::from(msg)));

    if let ProcessingError::Retryable(s) = mapped {
        assert_eq!(s, msg);
    } else {
        panic!("Expected Retryable");
    }
}
1381
/// An `Abort` with an empty message must still map to `Permanent`, and the
/// carried message must remain empty.
#[test]
fn test_map_handler_error_empty_message() {
    let mapped = map_handler_error(HandlerError::Abort(String::new()));

    if let ProcessingError::Permanent(s) = mapped {
        assert!(s.is_empty());
    } else {
        panic!("Expected Permanent");
    }
}
1390
/// For each listed queue type, `handler_timeout_secs` must equal the queue's
/// visibility timeout clamped to a minimum of 1.
#[test]
fn test_handler_timeout_secs_matches_visibility_timeout_for_all_queues() {
    let queue_types = [
        QueueType::TransactionRequest,
        QueueType::TransactionSubmission,
        QueueType::StatusCheck,
        QueueType::StatusCheckEvm,
        QueueType::StatusCheckStellar,
        QueueType::Notification,
        QueueType::TokenSwapRequest,
        QueueType::RelayerHealthCheck,
    ];

    for qt in queue_types {
        let actual = handler_timeout_secs(qt);
        let expected = qt.visibility_timeout_secs().max(1) as u64;
        assert_eq!(
            actual, expected,
            "{qt:?}: handler timeout should equal max(visibility_timeout, 1)"
        );
    }
}
1413
/// For each listed queue type, the configured concurrency must be strictly
/// greater than zero.
#[test]
fn test_get_concurrency_for_queue_all_types_positive() {
    let queue_types = [
        QueueType::TransactionRequest,
        QueueType::TransactionSubmission,
        QueueType::StatusCheck,
        QueueType::StatusCheckEvm,
        QueueType::StatusCheckStellar,
        QueueType::Notification,
        QueueType::TokenSwapRequest,
        QueueType::RelayerHealthCheck,
    ];

    queue_types.iter().copied().for_each(|qt| {
        let concurrency = get_concurrency_for_queue(qt);
        assert!(
            concurrency > 0,
            "{qt:?}: concurrency must be positive (clamped to at least 1)"
        );
    });
}
1435
/// For error counts 0 through 199, the poll backoff must never exceed
/// `MAX_POLL_BACKOFF_SECS`.
#[test]
fn test_poll_error_backoff_never_exceeds_max() {
    (0..200).for_each(|i| {
        let backoff = poll_error_backoff_secs(i);
        let within_cap = backoff <= MAX_POLL_BACKOFF_SECS;
        assert!(
            within_cap,
            "Error count {i}: backoff {backoff}s exceeds MAX {MAX_POLL_BACKOFF_SECS}s"
        );
    });
}
1448
/// Calling the backoff function with `u32::MAX` must return a value that is
/// positive and no larger than `MAX_POLL_BACKOFF_SECS`.
#[test]
fn test_poll_error_backoff_u32_max_does_not_overflow() {
    let backoff = poll_error_backoff_secs(u32::MAX);

    let within_cap = backoff <= MAX_POLL_BACKOFF_SECS;
    assert!(within_cap);

    let is_positive = backoff > 0;
    assert!(is_positive);
}
1455
/// For error counts 0 through 199, the poll backoff must be strictly
/// positive.
#[test]
fn test_poll_error_backoff_always_positive() {
    for i in 0..200 {
        let delay = poll_error_backoff_secs(i);
        assert!(delay > 0, "Error count {i}: backoff must be positive");
    }
}
1465
/// Across error counts 0 through 4, each backoff must be at least as large
/// as the one before it.
#[test]
fn test_poll_error_backoff_monotonic_before_cap() {
    // Thread the previous backoff through the fold so each step is compared
    // against its immediate predecessor.
    let _last = (1..=4).fold(poll_error_backoff_secs(0), |prev, i| {
        let curr = poll_error_backoff_secs(i);
        assert!(
            curr >= prev,
            "Backoff should be non-decreasing before cap: {prev} -> {curr} at error {i}"
        );
        curr
    });
}
1479
/// `MAX_POLL_BACKOFF_SECS` must lie within the inclusive range 10..=300.
#[test]
fn test_max_poll_backoff_is_reasonable() {
    let cap = MAX_POLL_BACKOFF_SECS;

    let meets_lower_bound = cap >= 10;
    assert!(
        meets_lower_bound,
        "Max backoff should be at least 10s to avoid tight error loops"
    );

    let meets_upper_bound = cap <= 300;
    assert!(
        meets_upper_bound,
        "Max backoff should be at most 5 minutes to detect recovery promptly"
    );
}
1493
/// `RECOVERY_PROBE_EVERY` must lie within the inclusive range 2..=10.
#[test]
fn test_recovery_probe_every_is_valid() {
    let probe_every = RECOVERY_PROBE_EVERY;

    let meets_lower_bound = probe_every >= 2;
    assert!(
        meets_lower_bound,
        "Recovery probe interval must be at least 2 to avoid probing every attempt"
    );

    let meets_upper_bound = probe_every <= 10;
    assert!(
        meets_upper_bound,
        "Recovery probe interval should not be too large or recovery detection is slow"
    );
}
1505
/// For an EVM status-check body, very large attempt counts (1000 and
/// `usize::MAX`) are both expected to produce a delay of 12.
#[test]
fn test_compute_status_retry_delay_very_high_attempt() {
    let body = r#"{"message_id":"m1","version":"1","timestamp":"0","job_type":"TransactionStatusCheck","data":{"transaction_id":"tx1","relayer_id":"r1","network_type":"evm"}}"#;

    for attempt in [1000, usize::MAX] {
        let delay = compute_status_retry_delay(body, attempt);
        assert_eq!(delay, 12);
    }
}
1515
/// An empty body and an empty JSON object are both expected to produce a
/// delay of 5 at attempt 0.
#[test]
fn test_compute_status_retry_delay_empty_body() {
    for body in ["", "{}"] {
        let delay = compute_status_retry_delay(body, 0);
        assert_eq!(delay, 5);
    }
}
1522
/// Partial JSON bodies at attempt 0: an empty `data` object is expected to
/// give 5, while `data` with only `network_type: "evm"` is expected to give 8.
#[test]
fn test_compute_status_retry_delay_partial_json() {
    let without_network = compute_status_retry_delay(r#"{"data":{}}"#, 0);
    assert_eq!(without_network, 5);

    let evm_only = compute_status_retry_delay(r#"{"data":{"network_type":"evm"}}"#, 0);
    assert_eq!(evm_only, 8);
}
1532
/// Deserializing a body whose `data` holds `network_type: "evm"` plus an
/// extra field must succeed and expose `Some(NetworkType::Evm)`.
#[test]
fn test_partial_status_check_job_deserializes_network_type() {
    let body = r#"{"data":{"network_type":"evm","extra_field":"ignored"}}"#;
    let parsed = serde_json::from_str::<PartialStatusCheckJob>(body).unwrap();

    let expected = Some(crate::models::NetworkType::Evm);
    assert_eq!(parsed.data.network_type, expected);
}
1544
/// Deserializing a body whose `data` lacks `network_type` must succeed and
/// leave the field as `None`.
#[test]
fn test_partial_status_check_job_handles_missing_network_type() {
    let body = r#"{"data":{"transaction_id":"tx1"}}"#;
    let parsed = serde_json::from_str::<PartialStatusCheckJob>(body).unwrap();

    assert!(parsed.data.network_type.is_none());
}
1551
/// Deserializing a body with no `data` key must fail.
#[test]
fn test_partial_status_check_job_rejects_missing_data() {
    let body = r#"{"not_data":{}}"#;
    let outcome: Result<PartialStatusCheckJob, _> = serde_json::from_str(body);

    assert!(outcome.is_err());
}
1558
/// The same queue name must be classified as non-FIFO without the `.fifo`
/// suffix and as FIFO with it.
#[test]
fn test_fifo_detection_consistent_with_defer_and_retry_logic() {
    let standard = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check";
    let fifo = "https://sqs.us-east-1.amazonaws.com/123/relayer-status-check.fifo";

    let standard_detected = is_fifo_queue_url(standard);
    assert!(!standard_detected);

    let fifo_detected = is_fifo_queue_url(fifo);
    assert!(fifo_detected);
}
1573}