openzeppelin_relayer/queues/redis/
backend.rs

1//! Redis backend implementation using Apalis.
2//!
3//! This module provides a Redis/Apalis-backed implementation of the QueueBackend trait.
4//! It wraps the existing Queue structure and delegates to Apalis for job processing.
5
6use std::sync::Arc;
7
8use actix_web::web::ThinData;
9use apalis::prelude::Storage;
10use async_trait::async_trait;
11use tracing::info;
12
13use crate::{
14    jobs::{
15        Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
16        TransactionSend, TransactionStatusCheck,
17    },
18    models::{DefaultAppState, NetworkType},
19    queues::{Queue, QueueBackendType},
20    utils::RedisConnections,
21};
22
23use super::{QueueBackend, QueueBackendError, QueueHealth, QueueType, WorkerHandle};
24
25/// Redis backend using Apalis for job queue operations.
26///
27/// This is a wrapper around the existing Queue implementation that provides
28/// the QueueBackend trait interface. It delegates all operations to the
29/// existing Apalis/Redis infrastructure.
30#[derive(Clone)]
31pub struct RedisBackend {
32    queue: Queue,
33}
34
35impl std::fmt::Debug for RedisBackend {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("RedisBackend")
38            .field("backend_type", &"redis")
39            .finish()
40    }
41}
42
43impl RedisBackend {
44    /// Creates a new Redis backend.
45    ///
46    /// This initializes all Redis-backed queues using the existing Queue::setup method.
47    ///
48    /// # Arguments
49    /// * `redis_connections` - Redis connection pools for queue operations
50    ///
51    /// # Errors
52    /// Returns QueueBackendError if queue setup fails
53    pub async fn new(redis_connections: Arc<RedisConnections>) -> Result<Self, QueueBackendError> {
54        info!("Initializing Redis queue backend");
55
56        let queue = Queue::setup(redis_connections)
57            .await
58            .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
59
60        Ok(Self { queue })
61    }
62
63    /// Returns a reference to the underlying Queue for compatibility with existing code.
64    pub fn queue(&self) -> &Queue {
65        &self.queue
66    }
67}
68
69/// Select status-check queue by network type.
70///
71/// EVM and Stellar use dedicated queues. Solana and unknown network types
72/// use the default status-check queue.
73fn status_check_queue_type(network_type: Option<&NetworkType>) -> QueueType {
74    match network_type {
75        Some(NetworkType::Evm) => QueueType::StatusCheckEvm,
76        Some(NetworkType::Stellar) => QueueType::StatusCheckStellar,
77        _ => QueueType::StatusCheck,
78    }
79}
80
81fn static_redis_health_statuses() -> Vec<QueueHealth> {
82    vec![
83        QueueHealth {
84            queue_type: QueueType::TransactionRequest,
85            messages_visible: 0, // Would need Redis LLEN query
86            messages_in_flight: 0,
87            messages_dlq: 0,
88            backend: "redis".to_string(),
89            is_healthy: true,
90        },
91        QueueHealth {
92            queue_type: QueueType::TransactionSubmission,
93            messages_visible: 0,
94            messages_in_flight: 0,
95            messages_dlq: 0,
96            backend: "redis".to_string(),
97            is_healthy: true,
98        },
99        QueueHealth {
100            queue_type: QueueType::StatusCheck,
101            messages_visible: 0,
102            messages_in_flight: 0,
103            messages_dlq: 0,
104            backend: "redis".to_string(),
105            is_healthy: true,
106        },
107        QueueHealth {
108            queue_type: QueueType::StatusCheckEvm,
109            messages_visible: 0,
110            messages_in_flight: 0,
111            messages_dlq: 0,
112            backend: "redis".to_string(),
113            is_healthy: true,
114        },
115        QueueHealth {
116            queue_type: QueueType::StatusCheckStellar,
117            messages_visible: 0,
118            messages_in_flight: 0,
119            messages_dlq: 0,
120            backend: "redis".to_string(),
121            is_healthy: true,
122        },
123        QueueHealth {
124            queue_type: QueueType::Notification,
125            messages_visible: 0,
126            messages_in_flight: 0,
127            messages_dlq: 0,
128            backend: "redis".to_string(),
129            is_healthy: true,
130        },
131        QueueHealth {
132            queue_type: QueueType::TokenSwapRequest,
133            messages_visible: 0,
134            messages_in_flight: 0,
135            messages_dlq: 0,
136            backend: "redis".to_string(),
137            is_healthy: true,
138        },
139        QueueHealth {
140            queue_type: QueueType::RelayerHealthCheck,
141            messages_visible: 0,
142            messages_in_flight: 0,
143            messages_dlq: 0,
144            backend: "redis".to_string(),
145            is_healthy: true,
146        },
147    ]
148}
149
150#[async_trait]
151impl QueueBackend for RedisBackend {
152    async fn produce_transaction_request(
153        &self,
154        job: Job<TransactionRequest>,
155        scheduled_on: Option<i64>,
156    ) -> Result<String, QueueBackendError> {
157        let mut storage = self.queue.transaction_request_queue.clone();
158        let job_id = job.message_id.clone();
159
160        match scheduled_on {
161            Some(on) => {
162                storage
163                    .schedule(job, on)
164                    .await
165                    .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
166            }
167            None => {
168                storage
169                    .push(job)
170                    .await
171                    .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
172            }
173        }
174
175        Ok(job_id)
176    }
177
178    async fn produce_transaction_submission(
179        &self,
180        job: Job<TransactionSend>,
181        scheduled_on: Option<i64>,
182    ) -> Result<String, QueueBackendError> {
183        let mut storage = self.queue.transaction_submission_queue.clone();
184        let job_id = job.message_id.clone();
185
186        match scheduled_on {
187            Some(on) => {
188                storage
189                    .schedule(job, on)
190                    .await
191                    .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
192            }
193            None => {
194                storage
195                    .push(job)
196                    .await
197                    .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
198            }
199        }
200
201        Ok(job_id)
202    }
203
204    async fn produce_transaction_status_check(
205        &self,
206        job: Job<TransactionStatusCheck>,
207        scheduled_on: Option<i64>,
208    ) -> Result<String, QueueBackendError> {
209        // Route by network_type to preserve existing Redis queue behavior.
210        let mut storage = match status_check_queue_type(job.data.network_type.as_ref()) {
211            QueueType::StatusCheckEvm => self.queue.transaction_status_queue_evm.clone(),
212            QueueType::StatusCheckStellar => self.queue.transaction_status_queue_stellar.clone(),
213            _ => self.queue.transaction_status_queue.clone(),
214        };
215        let job_id = job.message_id.clone();
216
217        match scheduled_on {
218            Some(on) => {
219                storage
220                    .schedule(job, on)
221                    .await
222                    .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
223            }
224            None => {
225                storage
226                    .push(job)
227                    .await
228                    .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
229            }
230        }
231
232        Ok(job_id)
233    }
234
235    async fn produce_notification(
236        &self,
237        job: Job<NotificationSend>,
238        scheduled_on: Option<i64>,
239    ) -> Result<String, QueueBackendError> {
240        let mut storage = self.queue.notification_queue.clone();
241        let job_id = job.message_id.clone();
242
243        match scheduled_on {
244            Some(on) => {
245                storage
246                    .schedule(job, on)
247                    .await
248                    .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
249            }
250            None => {
251                storage
252                    .push(job)
253                    .await
254                    .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
255            }
256        }
257
258        Ok(job_id)
259    }
260
261    async fn produce_token_swap_request(
262        &self,
263        job: Job<TokenSwapRequest>,
264        scheduled_on: Option<i64>,
265    ) -> Result<String, QueueBackendError> {
266        let mut storage = self.queue.token_swap_request_queue.clone();
267        let job_id = job.message_id.clone();
268
269        match scheduled_on {
270            Some(on) => {
271                storage
272                    .schedule(job, on)
273                    .await
274                    .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
275            }
276            None => {
277                storage
278                    .push(job)
279                    .await
280                    .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
281            }
282        }
283
284        Ok(job_id)
285    }
286
287    async fn produce_relayer_health_check(
288        &self,
289        job: Job<RelayerHealthCheck>,
290        scheduled_on: Option<i64>,
291    ) -> Result<String, QueueBackendError> {
292        let mut storage = self.queue.relayer_health_check_queue.clone();
293        let job_id = job.message_id.clone();
294
295        match scheduled_on {
296            Some(on) => {
297                storage
298                    .schedule(job, on)
299                    .await
300                    .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
301            }
302            None => {
303                storage
304                    .push(job)
305                    .await
306                    .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
307            }
308        }
309
310        Ok(job_id)
311    }
312
313    async fn initialize_workers(
314        &self,
315        app_state: Arc<ThinData<DefaultAppState>>,
316    ) -> Result<Vec<WorkerHandle>, QueueBackendError> {
317        info!("Initializing Redis backend workers");
318
319        super::redis_worker::initialize_redis_workers((*app_state).clone())
320            .await
321            .map_err(|e| QueueBackendError::WorkerInitError(e.to_string()))?;
322
323        super::redis_worker::initialize_redis_token_swap_workers((*app_state).clone())
324            .await
325            .map_err(|e| QueueBackendError::WorkerInitError(e.to_string()))?;
326
327        // Apalis workers are owned by the monitors; no explicit
328        // worker handles are returned from that flow.
329        Ok(vec![])
330    }
331
332    async fn health_check(&self) -> Result<Vec<QueueHealth>, QueueBackendError> {
333        // Intentionally avoid per-request Redis queue depth calls here to keep
334        // health checks lightweight and avoid adding pressure to Redis.
335        // Return static backend health metadata only.
336        Ok(static_redis_health_statuses())
337    }
338
339    fn backend_type(&self) -> QueueBackendType {
340        QueueBackendType::Redis
341    }
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use crate::jobs::{Job, JobType, TransactionRequest};
    use crate::models::NetworkType;
    use crate::queues::QueueType;

    /// The Redis backend type renders as "redis" through both string conversions.
    ///
    /// Exercises `QueueBackendType` directly, so no `Queue` instance is needed.
    #[test]
    fn test_backend_type_logic() {
        let backend_type = QueueBackendType::Redis;
        assert_eq!(backend_type.as_str(), "redis");
        assert_eq!(backend_type.to_string(), "redis");
    }

    /// Status-check routing: EVM and Stellar get dedicated queues, the rest
    /// (Solana, missing network type) fall back to the default queue.
    #[test]
    fn test_produce_transaction_status_check_routing_logic() {
        let cases = [
            (Some(NetworkType::Evm), QueueType::StatusCheckEvm),
            (Some(NetworkType::Stellar), QueueType::StatusCheckStellar),
            (Some(NetworkType::Solana), QueueType::StatusCheck),
            (None, QueueType::StatusCheck),
        ];

        for (network_type, expected_queue) in cases {
            assert_eq!(
                status_check_queue_type(network_type.as_ref()),
                expected_queue
            );
        }
    }

    /// A freshly created job carries a UUID-shaped `message_id` that can be
    /// cloned the way the produce methods do.
    #[test]
    fn test_job_id_extraction() {
        let request = TransactionRequest::new("tx1", "relayer1");
        let job = Job::new(JobType::TransactionRequest, request);

        // The id must be present and have the textual UUID length.
        assert!(!job.message_id.is_empty());
        assert_eq!(job.message_id.len(), 36); // UUID v4 format

        // Cloning the id yields an equal value.
        let cloned_id = job.message_id.clone();
        assert_eq!(cloned_id, job.message_id);
    }

    /// `scheduled_on` semantics: `None` means run immediately, `Some(ts)` means
    /// scheduled, and past timestamps are still represented as `Some`.
    #[test]
    fn test_scheduled_on_handling() {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64;

        // Immediate execution
        let immediate: Option<i64> = None;
        assert!(immediate.is_none());

        // Scheduled execution
        let scheduled: Option<i64> = Some(now + 60);
        assert!(scheduled.is_some());
        assert!(scheduled.unwrap() > now);

        // Past timestamp should still be Some (handled by queue backend)
        let past: Option<i64> = Some(now - 10);
        assert!(past.is_some());
    }

    /// The static health report covers every expected queue type exactly once
    /// in count, tags each entry as "redis", and marks each one healthy.
    #[test]
    fn test_health_check_structure() {
        let expected_queue_types = [
            QueueType::TransactionRequest,
            QueueType::TransactionSubmission,
            QueueType::StatusCheck,
            QueueType::StatusCheckEvm,
            QueueType::StatusCheckStellar,
            QueueType::Notification,
            QueueType::TokenSwapRequest,
            QueueType::RelayerHealthCheck,
        ];

        // Verify all expected queue types exist
        assert_eq!(expected_queue_types.len(), 8);

        // Every queue type must expose a non-empty name and Redis namespace.
        for queue_type in expected_queue_types.iter() {
            assert!(!queue_type.queue_name().is_empty());
            assert!(!queue_type.redis_namespace().is_empty());
        }

        let statuses = static_redis_health_statuses();
        assert_eq!(statuses.len(), expected_queue_types.len());

        for queue_type in expected_queue_types.iter() {
            let reported = statuses
                .iter()
                .any(|status| status.queue_type == *queue_type);
            assert!(reported);
        }

        for status in statuses.iter() {
            assert!(status.backend == "redis");
            assert!(status.is_healthy);
        }
    }

    /// Every entry of the static health report has all message counters at zero.
    #[test]
    fn test_static_redis_health_statuses_have_zero_counts() {
        let statuses = static_redis_health_statuses();
        assert!(!statuses.is_empty());

        for entry in statuses.iter() {
            assert_eq!(entry.messages_visible, 0);
            assert_eq!(entry.messages_in_flight, 0);
            assert_eq!(entry.messages_dlq, 0);
        }
    }
}