openzeppelin_relayer/queues/
mod.rs

1//! Queue backend abstraction layer.
2//!
3//! This module provides a backend-agnostic interface for job queue operations.
4//! Implementations can use Redis/Apalis (current) or AWS SQS (new) as the backend.
5//!
6//! # Environment Variables
7//!
8//! - `QUEUE_BACKEND`: Backend to use ("redis" or "sqs", default: "redis")
9//!
10//! # Example
11//!
12//! ```ignore
13//! // Create backend from environment
14//! let backend = create_queue_backend(redis_connections).await?;
15//!
16//! // Produce a job
17//! backend.produce_transaction_request(job, None).await?;
18//!
19//! // Initialize workers
20//! let workers = backend.initialize_workers(app_state).await?;
21//! ```
22
23use async_trait::async_trait;
24use std::sync::Arc;
25
26use crate::{
27    config::ServerConfig,
28    jobs::{
29        Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
30        TransactionSend, TransactionStatusCheck,
31    },
32    models::DefaultAppState,
33    utils::RedisConnections,
34};
35use actix_web::web::ThinData;
36
37pub mod errors;
38pub mod queue_type;
39pub mod redis;
40pub mod retry_config;
41pub mod sqs;
42pub mod swap_filter;
43pub mod worker_types;
44
45pub use errors::QueueBackendError;
46pub use queue_type::QueueType;
47pub use redis::queue::Queue;
48pub use retry_config::{backoff_config_for_queue, retry_delay_secs, status_check_retry_delay_secs};
49pub use swap_filter::filter_relayers_for_swap;
50pub use worker_types::{HandlerError, QueueHealth, WorkerContext, WorkerHandle};
51
/// Identifies which queue backend implementation is in use.
///
/// The canonical lowercase name of each variant is available through
/// [`QueueBackendType::as_str`] and through the `Display` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueBackendType {
    Redis,
    Sqs,
}

impl QueueBackendType {
    /// Returns the lowercase identifier of this backend (`"redis"` or `"sqs"`).
    pub const fn as_str(self) -> &'static str {
        match self {
            QueueBackendType::Sqs => "sqs",
            QueueBackendType::Redis => "redis",
        }
    }
}

impl std::fmt::Display for QueueBackendType {
    /// Writes exactly the string returned by [`QueueBackendType::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label: &'static str = (*self).as_str();
        f.write_str(label)
    }
}
73
74/// Queue backend abstraction trait.
75///
76/// This trait defines the interface for job queue operations that can be
77/// implemented by different backends (Redis/Apalis, AWS SQS, etc.).
78///
79/// The trait is designed to be backend-agnostic, with no Redis or SQS-specific
80/// types in the interface.
81#[async_trait]
82pub trait QueueBackend: Send + Sync {
83    /// Produces a transaction request job to the queue.
84    ///
85    /// # Arguments
86    /// * `job` - The job to enqueue
87    /// * `scheduled_on` - Optional Unix timestamp for delayed execution
88    ///
89    /// # Returns
90    /// Result with job ID on success, or QueueBackendError on failure
91    async fn produce_transaction_request(
92        &self,
93        job: Job<TransactionRequest>,
94        scheduled_on: Option<i64>,
95    ) -> Result<String, QueueBackendError>;
96
97    /// Produces a transaction submission job to the queue.
98    async fn produce_transaction_submission(
99        &self,
100        job: Job<TransactionSend>,
101        scheduled_on: Option<i64>,
102    ) -> Result<String, QueueBackendError>;
103
104    /// Produces a transaction status check job to the queue.
105    async fn produce_transaction_status_check(
106        &self,
107        job: Job<TransactionStatusCheck>,
108        scheduled_on: Option<i64>,
109    ) -> Result<String, QueueBackendError>;
110
111    /// Produces a notification send job to the queue.
112    async fn produce_notification(
113        &self,
114        job: Job<NotificationSend>,
115        scheduled_on: Option<i64>,
116    ) -> Result<String, QueueBackendError>;
117
118    /// Produces a token swap request job to the queue.
119    async fn produce_token_swap_request(
120        &self,
121        job: Job<TokenSwapRequest>,
122        scheduled_on: Option<i64>,
123    ) -> Result<String, QueueBackendError>;
124
125    /// Produces a relayer health check job to the queue.
126    async fn produce_relayer_health_check(
127        &self,
128        job: Job<RelayerHealthCheck>,
129        scheduled_on: Option<i64>,
130    ) -> Result<String, QueueBackendError>;
131
132    /// Initializes and starts all worker tasks for this backend.
133    ///
134    /// Workers will poll their respective queues and process jobs using
135    /// the provided application state.
136    ///
137    /// # Arguments
138    /// * `app_state` - Application state containing handlers and configuration
139    ///
140    /// # Returns
141    /// Vector of worker handles that can be used to monitor or stop workers
142    async fn initialize_workers(
143        &self,
144        app_state: Arc<ThinData<DefaultAppState>>,
145    ) -> Result<Vec<WorkerHandle>, QueueBackendError>;
146
147    /// Performs a health check on all queues.
148    ///
149    /// Returns health status for each queue, including message counts
150    /// and backend-specific health indicators.
151    async fn health_check(&self) -> Result<Vec<QueueHealth>, QueueBackendError>;
152
153    /// Returns the backend type identifier.
154    fn backend_type(&self) -> QueueBackendType;
155
156    /// Signals all workers to shut down gracefully.
157    ///
158    /// The default implementation is a no-op (e.g. Redis/Apalis workers handle
159    /// shutdown via Monitor's signal handling). SQS backend overrides this to
160    /// broadcast a shutdown signal to all polling loops and cron tasks.
161    fn shutdown(&self) {}
162}
163
164/// Enum-based queue backend storage, following the codebase convention
165/// used by `SignerRepositoryStorage`, `NetworkRepositoryStorage`, etc.
166///
167/// Provides static dispatch over the concrete backend implementations
168/// instead of `dyn QueueBackend` trait objects.
169#[derive(Clone)]
170pub enum QueueBackendStorage {
171    Redis(Box<redis::backend::RedisBackend>),
172    Sqs(sqs::backend::SqsBackend),
173}
174
175impl std::fmt::Debug for QueueBackendStorage {
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177        match self {
178            Self::Redis(b) => std::fmt::Debug::fmt(b, f),
179            Self::Sqs(b) => std::fmt::Debug::fmt(b, f),
180        }
181    }
182}
183
184impl QueueBackendStorage {
185    /// Returns a reference to the underlying `Queue` when the backend is Redis.
186    ///
187    /// Returns `None` for non-Redis backends (e.g. SQS) that do not use a `Queue`.
188    pub fn queue(&self) -> Option<&Queue> {
189        match self {
190            Self::Redis(b) => Some(b.queue()),
191            Self::Sqs(_) => None,
192        }
193    }
194
195    /// Returns the Redis connection pools when the backend is Redis.
196    ///
197    /// Delegates to `Queue::redis_connections()`. Returns `None` for non-Redis backends.
198    pub fn redis_connections(&self) -> Option<Arc<RedisConnections>> {
199        self.queue().map(|q| q.redis_connections())
200    }
201}
202
203#[async_trait]
204impl QueueBackend for QueueBackendStorage {
205    async fn produce_transaction_request(
206        &self,
207        job: Job<TransactionRequest>,
208        scheduled_on: Option<i64>,
209    ) -> Result<String, QueueBackendError> {
210        match self {
211            Self::Redis(b) => b.produce_transaction_request(job, scheduled_on).await,
212            Self::Sqs(b) => b.produce_transaction_request(job, scheduled_on).await,
213        }
214    }
215
216    async fn produce_transaction_submission(
217        &self,
218        job: Job<TransactionSend>,
219        scheduled_on: Option<i64>,
220    ) -> Result<String, QueueBackendError> {
221        match self {
222            Self::Redis(b) => b.produce_transaction_submission(job, scheduled_on).await,
223            Self::Sqs(b) => b.produce_transaction_submission(job, scheduled_on).await,
224        }
225    }
226
227    async fn produce_transaction_status_check(
228        &self,
229        job: Job<TransactionStatusCheck>,
230        scheduled_on: Option<i64>,
231    ) -> Result<String, QueueBackendError> {
232        match self {
233            Self::Redis(b) => b.produce_transaction_status_check(job, scheduled_on).await,
234            Self::Sqs(b) => b.produce_transaction_status_check(job, scheduled_on).await,
235        }
236    }
237
238    async fn produce_notification(
239        &self,
240        job: Job<NotificationSend>,
241        scheduled_on: Option<i64>,
242    ) -> Result<String, QueueBackendError> {
243        match self {
244            Self::Redis(b) => b.produce_notification(job, scheduled_on).await,
245            Self::Sqs(b) => b.produce_notification(job, scheduled_on).await,
246        }
247    }
248
249    async fn produce_token_swap_request(
250        &self,
251        job: Job<TokenSwapRequest>,
252        scheduled_on: Option<i64>,
253    ) -> Result<String, QueueBackendError> {
254        match self {
255            Self::Redis(b) => b.produce_token_swap_request(job, scheduled_on).await,
256            Self::Sqs(b) => b.produce_token_swap_request(job, scheduled_on).await,
257        }
258    }
259
260    async fn produce_relayer_health_check(
261        &self,
262        job: Job<RelayerHealthCheck>,
263        scheduled_on: Option<i64>,
264    ) -> Result<String, QueueBackendError> {
265        match self {
266            Self::Redis(b) => b.produce_relayer_health_check(job, scheduled_on).await,
267            Self::Sqs(b) => b.produce_relayer_health_check(job, scheduled_on).await,
268        }
269    }
270
271    async fn initialize_workers(
272        &self,
273        app_state: Arc<ThinData<DefaultAppState>>,
274    ) -> Result<Vec<WorkerHandle>, QueueBackendError> {
275        match self {
276            Self::Redis(b) => b.initialize_workers(app_state).await,
277            Self::Sqs(b) => b.initialize_workers(app_state).await,
278        }
279    }
280
281    async fn health_check(&self) -> Result<Vec<QueueHealth>, QueueBackendError> {
282        match self {
283            Self::Redis(b) => b.health_check().await,
284            Self::Sqs(b) => b.health_check().await,
285        }
286    }
287
288    fn backend_type(&self) -> QueueBackendType {
289        match self {
290            Self::Redis(b) => b.backend_type(),
291            Self::Sqs(b) => b.backend_type(),
292        }
293    }
294
295    fn shutdown(&self) {
296        match self {
297            Self::Redis(b) => b.shutdown(),
298            Self::Sqs(b) => b.shutdown(),
299        }
300    }
301}
302
303/// Creates a queue backend based on the QUEUE_BACKEND environment variable.
304///
305/// # Arguments
306/// * `redis_connections` - Redis connection pools (used by Redis backend, ignored by SQS)
307///
308/// # Environment Variables
309/// - `QUEUE_BACKEND`: Backend to use ("redis" or "sqs", default: "redis")
310///
311/// # Returns
312/// Arc-wrapped `QueueBackendStorage` implementing QueueBackend
313///
314/// # Errors
315/// Returns QueueBackendError::ConfigError if:
316/// - QUEUE_BACKEND contains an unsupported value
317/// - Required backend-specific configuration is missing
318pub async fn create_queue_backend(
319    redis_connections: Arc<RedisConnections>,
320) -> Result<Arc<QueueBackendStorage>, QueueBackendError> {
321    let backend_type = ServerConfig::get_queue_backend();
322
323    let storage = match backend_type.to_lowercase().as_str() {
324        "redis" => {
325            let backend = redis::backend::RedisBackend::new(redis_connections).await?;
326            QueueBackendStorage::Redis(Box::new(backend))
327        }
328        "sqs" => {
329            let backend = sqs::backend::SqsBackend::new().await?;
330            QueueBackendStorage::Sqs(backend)
331        }
332        other => {
333            return Err(QueueBackendError::ConfigError(format!(
334                "Unsupported QUEUE_BACKEND value: {other}. Must be 'redis' or 'sqs'"
335            )));
336        }
337    };
338
339    Ok(Arc::new(storage))
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// The `QueueType` variants exercised by the per-variant tests below.
    ///
    /// Kept in one place so the tests cannot drift apart; extend this list
    /// when a new variant is added to `QueueType`.
    const ALL_QUEUE_TYPES: [QueueType; 8] = [
        QueueType::TransactionRequest,
        QueueType::TransactionSubmission,
        QueueType::StatusCheck,
        QueueType::StatusCheckEvm,
        QueueType::StatusCheckStellar,
        QueueType::Notification,
        QueueType::TokenSwapRequest,
        QueueType::RelayerHealthCheck,
    ];

    #[test]
    fn test_queue_type_enum_values() {
        for queue_type in ALL_QUEUE_TYPES {
            assert!(
                !queue_type.queue_name().is_empty(),
                "{queue_type}: queue name must not be empty"
            );
            assert!(
                !queue_type.redis_namespace().is_empty(),
                "{queue_type}: redis namespace must not be empty"
            );
            // `usize::MAX` (used for "retry forever") is itself > 0, so a
            // single lower-bound check covers both the finite and the
            // unbounded case.
            assert!(
                queue_type.max_retries() > 0,
                "{queue_type}: max_retries must be > 0"
            );
        }
    }

    #[test]
    fn test_queue_type_visibility_timeouts_in_range() {
        // Visibility timeouts must be non-zero and within the SQS maximum
        // of 43200 seconds (12 hours).
        for qt in ALL_QUEUE_TYPES {
            let vt = qt.visibility_timeout_secs();
            assert!(vt > 0, "{qt}: visibility timeout must be > 0");
            assert!(
                vt <= 43200,
                "{qt}: visibility timeout {vt}s exceeds SQS max (43200s)"
            );
        }
    }

    #[test]
    fn test_queue_type_polling_intervals_appropriate() {
        // Status check is pinned to the shortest polling interval.
        assert_eq!(QueueType::StatusCheck.polling_interval_secs(), 5);

        // The other queues must not poll more frequently than that.
        assert!(QueueType::TransactionRequest.polling_interval_secs() >= 5);
        assert!(QueueType::TransactionSubmission.polling_interval_secs() >= 5);
        assert!(QueueType::Notification.polling_interval_secs() >= 10);
    }

    #[test]
    fn test_queue_backend_error_variants() {
        // Every error variant must render a non-empty message.
        let errors = [
            QueueBackendError::RedisError("test".to_string()),
            QueueBackendError::SqsError("test".to_string()),
            QueueBackendError::SerializationError("test".to_string()),
            QueueBackendError::ConfigError("test".to_string()),
            QueueBackendError::QueueNotFound("test".to_string()),
            QueueBackendError::WorkerInitError("test".to_string()),
            QueueBackendError::QueueError("test".to_string()),
        ];

        for error in errors {
            let error_str = error.to_string();
            assert!(!error_str.is_empty());
        }
    }

    #[test]
    fn test_queue_backend_type_string_representations() {
        assert_eq!(QueueBackendType::Redis.as_str(), "redis");
        assert_eq!(QueueBackendType::Sqs.as_str(), "sqs");
        assert_eq!(QueueBackendType::Redis.to_string(), "redis");
        assert_eq!(QueueBackendType::Sqs.to_string(), "sqs");
    }
}