openzeppelin_relayer/queues/redis/
queue.rs

1//! Queue management module for job processing.
2//!
3//! This module provides Redis-backed queue implementation for handling different types of jobs:
4//! - Transaction requests
5//! - Transaction submissions
6//! - Transaction status checks
7//! - Notifications
8//! - Solana swap requests
9//! - Relayer health checks
10use std::{env, sync::Arc};
11
12use apalis_redis::{Config, RedisStorage};
13use color_eyre::{eyre, Result};
14use redis::aio::{ConnectionManager, ConnectionManagerConfig};
15use serde::{Deserialize, Serialize};
16use tokio::time::Duration;
17use tracing::info;
18
19use crate::{config::ServerConfig, utils::RedisConnections};
20
21use crate::jobs::{
22    Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
23    TransactionSend, TransactionStatusCheck,
24};
25
26#[derive(Clone)]
27pub struct Queue {
28    pub transaction_request_queue: RedisStorage<Job<TransactionRequest>>,
29    pub transaction_submission_queue: RedisStorage<Job<TransactionSend>>,
30    /// Default/fallback status queue for backward compatibility, Solana, and future networks
31    pub transaction_status_queue: RedisStorage<Job<TransactionStatusCheck>>,
32    /// EVM-specific status queue with slower retries
33    pub transaction_status_queue_evm: RedisStorage<Job<TransactionStatusCheck>>,
34    /// Stellar-specific status queue with fast retries
35    pub transaction_status_queue_stellar: RedisStorage<Job<TransactionStatusCheck>>,
36    pub notification_queue: RedisStorage<Job<NotificationSend>>,
37    pub token_swap_request_queue: RedisStorage<Job<TokenSwapRequest>>,
38    pub relayer_health_check_queue: RedisStorage<Job<RelayerHealthCheck>>,
39    /// Redis connection pools for handlers that need pool-based access.
40    /// Provides both primary (write) and reader (read) pools for:
41    /// - Distributed locking
42    /// - Status check metadata (failure counters)
43    /// - Any handler-specific Redis operations
44    redis_connections: Arc<RedisConnections>,
45}
46
47impl std::fmt::Debug for Queue {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("Queue")
50            .field("transaction_request_queue", &"RedisStorage<...>")
51            .field("transaction_submission_queue", &"RedisStorage<...>")
52            .field("transaction_status_queue", &"RedisStorage<...>")
53            .field("transaction_status_queue_evm", &"RedisStorage<...>")
54            .field("transaction_status_queue_stellar", &"RedisStorage<...>")
55            .field("notification_queue", &"RedisStorage<...>")
56            .field("token_swap_request_queue", &"RedisStorage<...>")
57            .field("relayer_health_check_queue", &"RedisStorage<...>")
58            .field("redis_connections", &"RedisConnections")
59            .finish()
60    }
61}
62
63/// Configuration for queue storage tuning.
64#[derive(Clone, Debug)]
65struct QueueConfig {
66    /// How often to move scheduled jobs to active queue (default: 30s)
67    enqueue_scheduled: Duration,
68}
69
70impl Default for QueueConfig {
71    fn default() -> Self {
72        Self {
73            enqueue_scheduled: Duration::from_secs(30),
74        }
75    }
76}
77
78impl QueueConfig {
79    /// - Faster poll interval for lower latency
80    fn high_frequency() -> Self {
81        Self {
82            enqueue_scheduled: Duration::from_secs(2),
83        }
84    }
85
86    /// Configuration for lower-frequency queues.
87    /// - Smaller buffer (less memory pressure)
88    /// - Slower scheduled job polling (reduces Redis load)
89    fn low_frequency() -> Self {
90        Self {
91            enqueue_scheduled: Duration::from_secs(20),
92        }
93    }
94}
95
96impl Queue {
97    /// Creates a RedisStorage for a specific job type using a ConnectionManager.
98    ///
99    /// # Arguments
100    /// * `namespace` - Redis key namespace for this queue
101    /// * `conn` - ConnectionManager with auto-reconnect
102    /// * `queue_config` - Tuning parameters for this queue
103    ///
104    /// ConnectionManager provides automatic reconnection on connection failures,
105    /// ensuring queue processing continues even if the Redis connection drops temporarily.
106    fn storage<T: Serialize + for<'de> Deserialize<'de>>(
107        namespace: &str,
108        conn: ConnectionManager,
109        queue_config: QueueConfig,
110    ) -> RedisStorage<T> {
111        let config = Config::default()
112            .set_namespace(namespace)
113            .set_enqueue_scheduled(queue_config.enqueue_scheduled);
114
115        RedisStorage::new_with_config(conn, config)
116    }
117
118    /// Creates a ConnectionManager with the standard queue configuration.
119    ///
120    /// Each ConnectionManager represents a single Redis connection with auto-reconnect.
121    /// Creating separate managers for different queue types enables parallel Redis operations.
122    async fn create_connection_manager(
123        client: &redis::Client,
124        queue_timeout: Duration,
125    ) -> Result<ConnectionManager> {
126        let conn_config = ConnectionManagerConfig::new()
127            .set_connection_timeout(queue_timeout)
128            .set_response_timeout(queue_timeout)
129            .set_number_of_retries(2)
130            .set_max_delay(1000);
131
132        ConnectionManager::new_with_config(client.clone(), conn_config)
133            .await
134            .map_err(|e| eyre::eyre!("Failed to create Redis connection manager: {}", e))
135    }
136
137    /// Sets up all job queues with properly configured Redis connections.
138    ///
139    /// # Architecture
140    /// - **Queue storages**: Each queue gets its own `ConnectionManager` for maximum parallelism
141    /// - **Handler operations**: Use `redis_connections` pool for metadata, locking, counters
142    ///
143    /// # Connection Strategy
144    /// Each queue has a dedicated Redis connection to prevent contention under high throughput.
145    /// This allows 8 parallel Redis operations (one per queue type).
146    ///
147    /// # Arguments
148    /// * `redis_connections` - Redis connection pools for handler operations.
149    ///
150    /// # Connection Configuration
151    /// - `connection_timeout`: Max time to establish TCP connection to Redis
152    /// - `response_timeout`: Max time to wait for Redis command responses
153    /// - Auto-reconnect: ConnectionManager automatically reconnects on failures
154    pub async fn setup(redis_connections: Arc<RedisConnections>) -> Result<Self> {
155        let server_config = ServerConfig::from_env();
156        let redis_url = &server_config.redis_url;
157
158        // Create Redis client
159        let client = redis::Client::open(redis_url.as_str())
160            .map_err(|e| eyre::eyre!("Failed to create Redis client for queue: {}", e))?;
161
162        // Configure timeout for all ConnectionManagers
163        // Worst case calculation: 3 attempts × 5s timeout + ~0.3s backoff = ~15.3s
164        let queue_timeout = Duration::from_secs(5);
165
166        // Create one ConnectionManager per queue to prevent connection contention.
167        // Each ConnectionManager is a single Redis connection with auto-reconnect.
168        let conn_tx_request = Self::create_connection_manager(&client, queue_timeout).await?;
169        let conn_tx_submit = Self::create_connection_manager(&client, queue_timeout).await?;
170        let conn_status = Self::create_connection_manager(&client, queue_timeout).await?;
171        let conn_status_evm = Self::create_connection_manager(&client, queue_timeout).await?;
172        let conn_status_stellar = Self::create_connection_manager(&client, queue_timeout).await?;
173        let conn_notification = Self::create_connection_manager(&client, queue_timeout).await?;
174        let conn_swap = Self::create_connection_manager(&client, queue_timeout).await?;
175        let conn_health = Self::create_connection_manager(&client, queue_timeout).await?;
176
177        info!(
178            redis_url = %redis_url,
179            connection_timeout_ms = 5000,
180            response_timeout_ms = 5000,
181            retries = 2,
182            max_backoff_ms = 1000,
183            connection_count = 8,
184            "Queue setup: created dedicated ConnectionManager per queue"
185        );
186
187        // use REDIS_KEY_PREFIX only if set, otherwise do not use it
188        let redis_key_prefix = env::var("REDIS_KEY_PREFIX")
189            .ok()
190            .filter(|v| !v.is_empty())
191            .map(|value| format!("{value}:queue:"))
192            .unwrap_or_default();
193
194        // Queue configurations:
195        // - High-frequency: transaction_status (critical path)
196        // - Low-frequency: request, submission, notifications, health checks, swaps
197        let high_frequency = QueueConfig::high_frequency();
198        let low_frequency = QueueConfig::low_frequency();
199
200        Ok(Self {
201            transaction_request_queue: Self::storage(
202                &format!("{redis_key_prefix}transaction_request_queue"),
203                conn_tx_request,
204                low_frequency.clone(), // scheduling not used
205            ),
206            transaction_submission_queue: Self::storage(
207                &format!("{redis_key_prefix}transaction_submission_queue"),
208                conn_tx_submit,
209                low_frequency.clone(), // scheduling not used
210            ),
211            transaction_status_queue: Self::storage(
212                &format!("{redis_key_prefix}transaction_status_queue"),
213                conn_status,
214                high_frequency.clone(),
215            ),
216            transaction_status_queue_evm: Self::storage(
217                &format!("{redis_key_prefix}transaction_status_queue_evm"),
218                conn_status_evm,
219                high_frequency.clone(),
220            ),
221            transaction_status_queue_stellar: Self::storage(
222                &format!("{redis_key_prefix}transaction_status_queue_stellar"),
223                conn_status_stellar,
224                high_frequency.clone(),
225            ),
226            // Lower-frequency queues
227            notification_queue: Self::storage(
228                &format!("{redis_key_prefix}notification_queue"),
229                conn_notification,
230                low_frequency.clone(), // scheduling not used
231            ),
232            token_swap_request_queue: Self::storage(
233                &format!("{redis_key_prefix}token_swap_request_queue"),
234                conn_swap,
235                low_frequency.clone(), // scheduling not used
236            ),
237            relayer_health_check_queue: Self::storage(
238                &format!("{redis_key_prefix}relayer_health_check_queue"),
239                conn_health,
240                low_frequency.clone(), // scheduling not used
241            ),
242            redis_connections,
243        })
244    }
245
246    /// Returns the Redis connection pools.
247    ///
248    /// This provides access to both primary and reader pools for handlers
249    /// that need Redis pool-based access (e.g., for metadata storage, distributed locking).
250    pub fn redis_connections(&self) -> Arc<RedisConnections> {
251        self.redis_connections.clone()
252    }
253}
254
255#[cfg(test)]
256mod tests {
257    use super::*;
258
259    #[test]
260    fn test_queue_storage_configuration() {
261        // Test the config creation logic without actual Redis connections
262        let namespace = "test_namespace";
263        let config = Config::default().set_namespace(namespace);
264
265        assert_eq!(config.get_namespace(), namespace);
266    }
267
268    // Mock version of Queue for testing
269    #[derive(Clone, Debug)]
270    struct MockQueue {
271        pub namespace_transaction_request: String,
272        pub namespace_transaction_submission: String,
273        pub namespace_transaction_status: String,
274        pub namespace_transaction_status_evm: String,
275        pub namespace_transaction_status_stellar: String,
276        pub namespace_notification: String,
277        pub namespace_token_swap_request_queue: String,
278        pub namespace_relayer_health_check_queue: String,
279    }
280
281    impl MockQueue {
282        fn new() -> Self {
283            Self {
284                namespace_transaction_request: "transaction_request_queue".to_string(),
285                namespace_transaction_submission: "transaction_submission_queue".to_string(),
286                namespace_transaction_status: "transaction_status_queue".to_string(),
287                namespace_transaction_status_evm: "transaction_status_queue_evm".to_string(),
288                namespace_transaction_status_stellar: "transaction_status_queue_stellar"
289                    .to_string(),
290                namespace_notification: "notification_queue".to_string(),
291                namespace_token_swap_request_queue: "token_swap_request_queue".to_string(),
292                namespace_relayer_health_check_queue: "relayer_health_check_queue".to_string(),
293            }
294        }
295    }
296
297    #[test]
298    fn test_queue_namespaces() {
299        let mock_queue = MockQueue::new();
300
301        assert_eq!(
302            mock_queue.namespace_transaction_request,
303            "transaction_request_queue"
304        );
305        assert_eq!(
306            mock_queue.namespace_transaction_submission,
307            "transaction_submission_queue"
308        );
309        assert_eq!(
310            mock_queue.namespace_transaction_status,
311            "transaction_status_queue"
312        );
313        assert_eq!(
314            mock_queue.namespace_transaction_status_evm,
315            "transaction_status_queue_evm"
316        );
317        assert_eq!(
318            mock_queue.namespace_transaction_status_stellar,
319            "transaction_status_queue_stellar"
320        );
321        assert_eq!(mock_queue.namespace_notification, "notification_queue");
322        assert_eq!(
323            mock_queue.namespace_token_swap_request_queue,
324            "token_swap_request_queue"
325        );
326        assert_eq!(
327            mock_queue.namespace_relayer_health_check_queue,
328            "relayer_health_check_queue"
329        );
330    }
331
332    #[test]
333    fn test_queue_config_with_prefix() {
334        // Test that namespace includes prefix when set
335        let prefix = "myprefix:queue:";
336        let queue_name = "transaction_request_queue";
337        let full_namespace = format!("{prefix}{queue_name}");
338
339        let config = Config::default().set_namespace(&full_namespace);
340        assert_eq!(
341            config.get_namespace(),
342            "myprefix:queue:transaction_request_queue"
343        );
344    }
345
346    #[test]
347    fn test_queue_config_without_prefix() {
348        // Test that namespace works without prefix
349        let queue_name = "transaction_request_queue";
350
351        let config = Config::default().set_namespace(queue_name);
352        assert_eq!(config.get_namespace(), "transaction_request_queue");
353    }
354
355    #[test]
356    fn test_queue_config_default() {
357        let config = QueueConfig::default();
358
359        assert_eq!(config.enqueue_scheduled, Duration::from_secs(30));
360    }
361
362    #[test]
363    fn test_queue_config_high_throughput() {
364        let config = QueueConfig::high_frequency();
365
366        // High frequency should have faster enqueue_scheduled
367        assert_eq!(config.enqueue_scheduled, Duration::from_secs(2));
368
369        // Verify it's faster than default
370        let default = QueueConfig::default();
371        assert!(config.enqueue_scheduled < default.enqueue_scheduled);
372    }
373
374    #[test]
375    fn test_queue_config_low_frequency() {
376        let config = QueueConfig::low_frequency();
377
378        assert_eq!(config.enqueue_scheduled, Duration::from_secs(20));
379
380        // Low frequency should have longer enqueue_scheduled than high frequency
381        let high = QueueConfig::high_frequency();
382        assert!(config.enqueue_scheduled > high.enqueue_scheduled);
383    }
384}