openzeppelin_relayer/queues/redis/
backend.rs1use std::sync::Arc;
7
8use actix_web::web::ThinData;
9use apalis::prelude::Storage;
10use async_trait::async_trait;
11use tracing::info;
12
13use crate::{
14 jobs::{
15 Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
16 TransactionSend, TransactionStatusCheck,
17 },
18 models::{DefaultAppState, NetworkType},
19 queues::{Queue, QueueBackendType},
20 utils::RedisConnections,
21};
22
23use super::{QueueBackend, QueueBackendError, QueueHealth, QueueType, WorkerHandle};
24
25#[derive(Clone)]
31pub struct RedisBackend {
32 queue: Queue,
33}
34
35impl std::fmt::Debug for RedisBackend {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("RedisBackend")
38 .field("backend_type", &"redis")
39 .finish()
40 }
41}
42
43impl RedisBackend {
44 pub async fn new(redis_connections: Arc<RedisConnections>) -> Result<Self, QueueBackendError> {
54 info!("Initializing Redis queue backend");
55
56 let queue = Queue::setup(redis_connections)
57 .await
58 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
59
60 Ok(Self { queue })
61 }
62
63 pub fn queue(&self) -> &Queue {
65 &self.queue
66 }
67}
68
69fn status_check_queue_type(network_type: Option<&NetworkType>) -> QueueType {
74 match network_type {
75 Some(NetworkType::Evm) => QueueType::StatusCheckEvm,
76 Some(NetworkType::Stellar) => QueueType::StatusCheckStellar,
77 _ => QueueType::StatusCheck,
78 }
79}
80
81fn static_redis_health_statuses() -> Vec<QueueHealth> {
82 vec![
83 QueueHealth {
84 queue_type: QueueType::TransactionRequest,
85 messages_visible: 0, messages_in_flight: 0,
87 messages_dlq: 0,
88 backend: "redis".to_string(),
89 is_healthy: true,
90 },
91 QueueHealth {
92 queue_type: QueueType::TransactionSubmission,
93 messages_visible: 0,
94 messages_in_flight: 0,
95 messages_dlq: 0,
96 backend: "redis".to_string(),
97 is_healthy: true,
98 },
99 QueueHealth {
100 queue_type: QueueType::StatusCheck,
101 messages_visible: 0,
102 messages_in_flight: 0,
103 messages_dlq: 0,
104 backend: "redis".to_string(),
105 is_healthy: true,
106 },
107 QueueHealth {
108 queue_type: QueueType::StatusCheckEvm,
109 messages_visible: 0,
110 messages_in_flight: 0,
111 messages_dlq: 0,
112 backend: "redis".to_string(),
113 is_healthy: true,
114 },
115 QueueHealth {
116 queue_type: QueueType::StatusCheckStellar,
117 messages_visible: 0,
118 messages_in_flight: 0,
119 messages_dlq: 0,
120 backend: "redis".to_string(),
121 is_healthy: true,
122 },
123 QueueHealth {
124 queue_type: QueueType::Notification,
125 messages_visible: 0,
126 messages_in_flight: 0,
127 messages_dlq: 0,
128 backend: "redis".to_string(),
129 is_healthy: true,
130 },
131 QueueHealth {
132 queue_type: QueueType::TokenSwapRequest,
133 messages_visible: 0,
134 messages_in_flight: 0,
135 messages_dlq: 0,
136 backend: "redis".to_string(),
137 is_healthy: true,
138 },
139 QueueHealth {
140 queue_type: QueueType::RelayerHealthCheck,
141 messages_visible: 0,
142 messages_in_flight: 0,
143 messages_dlq: 0,
144 backend: "redis".to_string(),
145 is_healthy: true,
146 },
147 ]
148}
149
150#[async_trait]
151impl QueueBackend for RedisBackend {
152 async fn produce_transaction_request(
153 &self,
154 job: Job<TransactionRequest>,
155 scheduled_on: Option<i64>,
156 ) -> Result<String, QueueBackendError> {
157 let mut storage = self.queue.transaction_request_queue.clone();
158 let job_id = job.message_id.clone();
159
160 match scheduled_on {
161 Some(on) => {
162 storage
163 .schedule(job, on)
164 .await
165 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
166 }
167 None => {
168 storage
169 .push(job)
170 .await
171 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
172 }
173 }
174
175 Ok(job_id)
176 }
177
178 async fn produce_transaction_submission(
179 &self,
180 job: Job<TransactionSend>,
181 scheduled_on: Option<i64>,
182 ) -> Result<String, QueueBackendError> {
183 let mut storage = self.queue.transaction_submission_queue.clone();
184 let job_id = job.message_id.clone();
185
186 match scheduled_on {
187 Some(on) => {
188 storage
189 .schedule(job, on)
190 .await
191 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
192 }
193 None => {
194 storage
195 .push(job)
196 .await
197 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
198 }
199 }
200
201 Ok(job_id)
202 }
203
204 async fn produce_transaction_status_check(
205 &self,
206 job: Job<TransactionStatusCheck>,
207 scheduled_on: Option<i64>,
208 ) -> Result<String, QueueBackendError> {
209 let mut storage = match status_check_queue_type(job.data.network_type.as_ref()) {
211 QueueType::StatusCheckEvm => self.queue.transaction_status_queue_evm.clone(),
212 QueueType::StatusCheckStellar => self.queue.transaction_status_queue_stellar.clone(),
213 _ => self.queue.transaction_status_queue.clone(),
214 };
215 let job_id = job.message_id.clone();
216
217 match scheduled_on {
218 Some(on) => {
219 storage
220 .schedule(job, on)
221 .await
222 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
223 }
224 None => {
225 storage
226 .push(job)
227 .await
228 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
229 }
230 }
231
232 Ok(job_id)
233 }
234
235 async fn produce_notification(
236 &self,
237 job: Job<NotificationSend>,
238 scheduled_on: Option<i64>,
239 ) -> Result<String, QueueBackendError> {
240 let mut storage = self.queue.notification_queue.clone();
241 let job_id = job.message_id.clone();
242
243 match scheduled_on {
244 Some(on) => {
245 storage
246 .schedule(job, on)
247 .await
248 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
249 }
250 None => {
251 storage
252 .push(job)
253 .await
254 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
255 }
256 }
257
258 Ok(job_id)
259 }
260
261 async fn produce_token_swap_request(
262 &self,
263 job: Job<TokenSwapRequest>,
264 scheduled_on: Option<i64>,
265 ) -> Result<String, QueueBackendError> {
266 let mut storage = self.queue.token_swap_request_queue.clone();
267 let job_id = job.message_id.clone();
268
269 match scheduled_on {
270 Some(on) => {
271 storage
272 .schedule(job, on)
273 .await
274 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
275 }
276 None => {
277 storage
278 .push(job)
279 .await
280 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
281 }
282 }
283
284 Ok(job_id)
285 }
286
287 async fn produce_relayer_health_check(
288 &self,
289 job: Job<RelayerHealthCheck>,
290 scheduled_on: Option<i64>,
291 ) -> Result<String, QueueBackendError> {
292 let mut storage = self.queue.relayer_health_check_queue.clone();
293 let job_id = job.message_id.clone();
294
295 match scheduled_on {
296 Some(on) => {
297 storage
298 .schedule(job, on)
299 .await
300 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
301 }
302 None => {
303 storage
304 .push(job)
305 .await
306 .map_err(|e| QueueBackendError::RedisError(e.to_string()))?;
307 }
308 }
309
310 Ok(job_id)
311 }
312
313 async fn initialize_workers(
314 &self,
315 app_state: Arc<ThinData<DefaultAppState>>,
316 ) -> Result<Vec<WorkerHandle>, QueueBackendError> {
317 info!("Initializing Redis backend workers");
318
319 super::redis_worker::initialize_redis_workers((*app_state).clone())
320 .await
321 .map_err(|e| QueueBackendError::WorkerInitError(e.to_string()))?;
322
323 super::redis_worker::initialize_redis_token_swap_workers((*app_state).clone())
324 .await
325 .map_err(|e| QueueBackendError::WorkerInitError(e.to_string()))?;
326
327 Ok(vec![])
330 }
331
332 async fn health_check(&self) -> Result<Vec<QueueHealth>, QueueBackendError> {
333 Ok(static_redis_health_statuses())
337 }
338
339 fn backend_type(&self) -> QueueBackendType {
340 QueueBackendType::Redis
341 }
342}
343
#[cfg(test)]
mod tests {
    use super::*;
    use crate::jobs::{Job, JobType, TransactionRequest};
    use crate::models::NetworkType;
    use crate::queues::QueueType;

    /// The Redis backend type renders as "redis" via `as_str` and `to_string`.
    #[test]
    fn test_backend_type_logic() {
        assert_eq!(QueueBackendType::Redis.as_str(), "redis");
        assert_eq!(QueueBackendType::Redis.to_string(), "redis");
    }

    /// Status checks route to the per-network queue for EVM and Stellar and to
    /// the generic queue otherwise.
    #[test]
    fn test_produce_transaction_status_check_routing_logic() {
        let cases = [
            (Some(NetworkType::Evm), QueueType::StatusCheckEvm),
            (Some(NetworkType::Stellar), QueueType::StatusCheckStellar),
            (Some(NetworkType::Solana), QueueType::StatusCheck),
            (None, QueueType::StatusCheck),
        ];

        for (network_type, expected) in &cases {
            assert_eq!(status_check_queue_type(network_type.as_ref()), *expected);
        }
    }

    /// A freshly created job exposes a non-empty, 36-character message id.
    #[test]
    fn test_job_id_extraction() {
        let job = Job::new(
            JobType::TransactionRequest,
            TransactionRequest::new("tx1", "relayer1"),
        );

        assert!(!job.message_id.is_empty());
        // 36 is the length of a hyphenated UUID string; presumably the id is a
        // UUID — confirm against `Job::new`.
        assert_eq!(job.message_id.len(), 36);

        let job_id = job.message_id.clone();
        assert_eq!(job_id, job.message_id);
    }

    /// Sanity checks on the `Option<i64>` values used for scheduling.
    #[test]
    fn test_scheduled_on_handling() {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64;

        let immediate: Option<i64> = None;
        assert!(immediate.is_none());

        let scheduled: Option<i64> = Some(now + 60);
        assert!(scheduled.is_some());
        assert!(scheduled.unwrap() > now);

        let past: Option<i64> = Some(now - 10);
        assert!(past.is_some());
    }

    /// The static health report covers exactly the eight known queue types,
    /// all labelled "redis" and healthy.
    #[test]
    fn test_health_check_structure() {
        let expected_queue_types = vec![
            QueueType::TransactionRequest,
            QueueType::TransactionSubmission,
            QueueType::StatusCheck,
            QueueType::StatusCheckEvm,
            QueueType::StatusCheckStellar,
            QueueType::Notification,
            QueueType::TokenSwapRequest,
            QueueType::RelayerHealthCheck,
        ];

        assert_eq!(expected_queue_types.len(), 8);

        for queue_type in &expected_queue_types {
            assert!(!queue_type.queue_name().is_empty());
            assert!(!queue_type.redis_namespace().is_empty());
        }

        let statuses = static_redis_health_statuses();
        assert_eq!(statuses.len(), expected_queue_types.len());
        for expected in &expected_queue_types {
            assert!(statuses.iter().any(|h| h.queue_type == *expected));
        }
        assert!(statuses.iter().all(|h| h.backend == "redis"));
        assert!(statuses.iter().all(|h| h.is_healthy));
    }

    /// Every entry of the static health report has all counters at zero.
    #[test]
    fn test_static_redis_health_statuses_have_zero_counts() {
        let statuses = static_redis_health_statuses();
        assert!(!statuses.is_empty());
        for status in &statuses {
            assert_eq!(status.messages_visible, 0);
            assert_eq!(status.messages_in_flight, 0);
            assert_eq!(status.messages_dlq, 0);
        }
    }
}