openzeppelin_relayer/queues/
mod.rs1use async_trait::async_trait;
24use std::sync::Arc;
25
26use crate::{
27 config::ServerConfig,
28 jobs::{
29 Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
30 TransactionSend, TransactionStatusCheck,
31 },
32 models::DefaultAppState,
33 utils::RedisConnections,
34};
35use actix_web::web::ThinData;
36
37pub mod errors;
38pub mod queue_type;
39pub mod redis;
40pub mod retry_config;
41pub mod sqs;
42pub mod swap_filter;
43pub mod worker_types;
44
45pub use errors::QueueBackendError;
46pub use queue_type::QueueType;
47pub use redis::queue::Queue;
48pub use retry_config::{backoff_config_for_queue, retry_delay_secs, status_check_retry_delay_secs};
49pub use swap_filter::filter_relayers_for_swap;
50pub use worker_types::{HandlerError, QueueHealth, WorkerContext, WorkerHandle};
51
/// Identifies which queue implementation is in use.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueBackendType {
    /// The Redis-backed implementation.
    Redis,
    /// The SQS-backed implementation.
    Sqs,
}

impl QueueBackendType {
    /// Returns the lowercase identifier of this backend: `"redis"` or `"sqs"`.
    ///
    /// Usable in `const` contexts.
    pub const fn as_str(self) -> &'static str {
        match self {
            QueueBackendType::Redis => "redis",
            QueueBackendType::Sqs => "sqs",
        }
    }
}

impl std::fmt::Display for QueueBackendType {
    /// Writes the same identifier that [`QueueBackendType::as_str`] returns.
    ///
    /// The text is written verbatim; width/alignment flags on the formatter
    /// are not applied.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label: &'static str = (*self).as_str();
        formatter.write_str(label)
    }
}
73
/// Common interface over the queue backends used by this module.
///
/// Implementors must be `Send + Sync`. The trait declares one `produce_*`
/// method per job payload type, plus worker start-up, a health probe, a
/// backend-type accessor and a shutdown hook.
///
/// All `produce_*` methods share the same shape: they take the job and an
/// optional `scheduled_on` value and return a `String` on success.
// NOTE(review): `scheduled_on` is presumably a timestamp at which the job
// should become runnable, and the returned `String` is presumably a job or
// message identifier; neither is established by this file. Confirm against
// the backend implementations.
#[async_trait]
pub trait QueueBackend: Send + Sync {
    /// Produces a [`TransactionRequest`] job.
    ///
    /// # Errors
    /// Returns a [`QueueBackendError`] if the backend reports a failure.
    async fn produce_transaction_request(
        &self,
        job: Job<TransactionRequest>,
        scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError>;

    /// Produces a [`TransactionSend`] job.
    ///
    /// # Errors
    /// Returns a [`QueueBackendError`] if the backend reports a failure.
    async fn produce_transaction_submission(
        &self,
        job: Job<TransactionSend>,
        scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError>;

    /// Produces a [`TransactionStatusCheck`] job.
    ///
    /// # Errors
    /// Returns a [`QueueBackendError`] if the backend reports a failure.
    async fn produce_transaction_status_check(
        &self,
        job: Job<TransactionStatusCheck>,
        scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError>;

    /// Produces a [`NotificationSend`] job.
    ///
    /// # Errors
    /// Returns a [`QueueBackendError`] if the backend reports a failure.
    async fn produce_notification(
        &self,
        job: Job<NotificationSend>,
        scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError>;

    /// Produces a [`TokenSwapRequest`] job.
    ///
    /// # Errors
    /// Returns a [`QueueBackendError`] if the backend reports a failure.
    async fn produce_token_swap_request(
        &self,
        job: Job<TokenSwapRequest>,
        scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError>;

    /// Produces a [`RelayerHealthCheck`] job.
    ///
    /// # Errors
    /// Returns a [`QueueBackendError`] if the backend reports a failure.
    async fn produce_relayer_health_check(
        &self,
        job: Job<RelayerHealthCheck>,
        scheduled_on: Option<i64>,
    ) -> Result<String, QueueBackendError>;

    /// Initializes the backend's workers with the shared application state.
    ///
    /// On success returns one [`WorkerHandle`] per worker the backend chose
    /// to report.
    ///
    /// # Errors
    /// Returns a [`QueueBackendError`] if initialization fails.
    async fn initialize_workers(
        &self,
        app_state: Arc<ThinData<DefaultAppState>>,
    ) -> Result<Vec<WorkerHandle>, QueueBackendError>;

    /// Reports queue health as a list of [`QueueHealth`] entries.
    ///
    /// # Errors
    /// Returns a [`QueueBackendError`] if the probe itself fails.
    async fn health_check(&self) -> Result<Vec<QueueHealth>, QueueBackendError>;

    /// Returns which [`QueueBackendType`] this implementation represents.
    fn backend_type(&self) -> QueueBackendType;

    /// Shutdown hook. The default implementation does nothing; backends
    /// override it when they have something to stop.
    fn shutdown(&self) {}
}
163
/// Concrete holder for whichever queue backend is in use.
///
/// One variant per supported backend. The type is `Clone`, which requires
/// both wrapped backends to be `Clone` as well.
#[derive(Clone)]
pub enum QueueBackendStorage {
    /// Redis backend, stored behind a `Box`.
    // NOTE(review): presumably boxed to keep the enum small relative to the
    // SQS variant; confirm.
    Redis(Box<redis::backend::RedisBackend>),
    /// SQS backend, stored inline.
    Sqs(sqs::backend::SqsBackend),
}
174
175impl std::fmt::Debug for QueueBackendStorage {
176 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
177 match self {
178 Self::Redis(b) => std::fmt::Debug::fmt(b, f),
179 Self::Sqs(b) => std::fmt::Debug::fmt(b, f),
180 }
181 }
182}
183
184impl QueueBackendStorage {
185 pub fn queue(&self) -> Option<&Queue> {
189 match self {
190 Self::Redis(b) => Some(b.queue()),
191 Self::Sqs(_) => None,
192 }
193 }
194
195 pub fn redis_connections(&self) -> Option<Arc<RedisConnections>> {
199 self.queue().map(|q| q.redis_connections())
200 }
201}
202
203#[async_trait]
204impl QueueBackend for QueueBackendStorage {
205 async fn produce_transaction_request(
206 &self,
207 job: Job<TransactionRequest>,
208 scheduled_on: Option<i64>,
209 ) -> Result<String, QueueBackendError> {
210 match self {
211 Self::Redis(b) => b.produce_transaction_request(job, scheduled_on).await,
212 Self::Sqs(b) => b.produce_transaction_request(job, scheduled_on).await,
213 }
214 }
215
216 async fn produce_transaction_submission(
217 &self,
218 job: Job<TransactionSend>,
219 scheduled_on: Option<i64>,
220 ) -> Result<String, QueueBackendError> {
221 match self {
222 Self::Redis(b) => b.produce_transaction_submission(job, scheduled_on).await,
223 Self::Sqs(b) => b.produce_transaction_submission(job, scheduled_on).await,
224 }
225 }
226
227 async fn produce_transaction_status_check(
228 &self,
229 job: Job<TransactionStatusCheck>,
230 scheduled_on: Option<i64>,
231 ) -> Result<String, QueueBackendError> {
232 match self {
233 Self::Redis(b) => b.produce_transaction_status_check(job, scheduled_on).await,
234 Self::Sqs(b) => b.produce_transaction_status_check(job, scheduled_on).await,
235 }
236 }
237
238 async fn produce_notification(
239 &self,
240 job: Job<NotificationSend>,
241 scheduled_on: Option<i64>,
242 ) -> Result<String, QueueBackendError> {
243 match self {
244 Self::Redis(b) => b.produce_notification(job, scheduled_on).await,
245 Self::Sqs(b) => b.produce_notification(job, scheduled_on).await,
246 }
247 }
248
249 async fn produce_token_swap_request(
250 &self,
251 job: Job<TokenSwapRequest>,
252 scheduled_on: Option<i64>,
253 ) -> Result<String, QueueBackendError> {
254 match self {
255 Self::Redis(b) => b.produce_token_swap_request(job, scheduled_on).await,
256 Self::Sqs(b) => b.produce_token_swap_request(job, scheduled_on).await,
257 }
258 }
259
260 async fn produce_relayer_health_check(
261 &self,
262 job: Job<RelayerHealthCheck>,
263 scheduled_on: Option<i64>,
264 ) -> Result<String, QueueBackendError> {
265 match self {
266 Self::Redis(b) => b.produce_relayer_health_check(job, scheduled_on).await,
267 Self::Sqs(b) => b.produce_relayer_health_check(job, scheduled_on).await,
268 }
269 }
270
271 async fn initialize_workers(
272 &self,
273 app_state: Arc<ThinData<DefaultAppState>>,
274 ) -> Result<Vec<WorkerHandle>, QueueBackendError> {
275 match self {
276 Self::Redis(b) => b.initialize_workers(app_state).await,
277 Self::Sqs(b) => b.initialize_workers(app_state).await,
278 }
279 }
280
281 async fn health_check(&self) -> Result<Vec<QueueHealth>, QueueBackendError> {
282 match self {
283 Self::Redis(b) => b.health_check().await,
284 Self::Sqs(b) => b.health_check().await,
285 }
286 }
287
288 fn backend_type(&self) -> QueueBackendType {
289 match self {
290 Self::Redis(b) => b.backend_type(),
291 Self::Sqs(b) => b.backend_type(),
292 }
293 }
294
295 fn shutdown(&self) {
296 match self {
297 Self::Redis(b) => b.shutdown(),
298 Self::Sqs(b) => b.shutdown(),
299 }
300 }
301}
302
303pub async fn create_queue_backend(
319 redis_connections: Arc<RedisConnections>,
320) -> Result<Arc<QueueBackendStorage>, QueueBackendError> {
321 let backend_type = ServerConfig::get_queue_backend();
322
323 let storage = match backend_type.to_lowercase().as_str() {
324 "redis" => {
325 let backend = redis::backend::RedisBackend::new(redis_connections).await?;
326 QueueBackendStorage::Redis(Box::new(backend))
327 }
328 "sqs" => {
329 let backend = sqs::backend::SqsBackend::new().await?;
330 QueueBackendStorage::Sqs(backend)
331 }
332 other => {
333 return Err(QueueBackendError::ConfigError(format!(
334 "Unsupported QUEUE_BACKEND value: {other}. Must be 'redis' or 'sqs'"
335 )));
336 }
337 };
338
339 Ok(Arc::new(storage))
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// The `QueueType` variants exercised by the per-variant tests below.
    ///
    /// Kept in one place so the tests that iterate over queue types cannot
    /// drift apart when a variant is added.
    fn queue_types_under_test() -> [QueueType; 8] {
        [
            QueueType::TransactionRequest,
            QueueType::TransactionSubmission,
            QueueType::StatusCheck,
            QueueType::StatusCheckEvm,
            QueueType::StatusCheckStellar,
            QueueType::Notification,
            QueueType::TokenSwapRequest,
            QueueType::RelayerHealthCheck,
        ]
    }

    /// Every queue type has a non-empty name and namespace and allows at
    /// least one retry.
    #[test]
    fn test_queue_type_enum_values() {
        for queue_type in queue_types_under_test() {
            assert!(!queue_type.queue_name().is_empty());
            assert!(!queue_type.redis_namespace().is_empty());
            // A separate `== usize::MAX` alternative would be redundant:
            // `usize::MAX` already satisfies `> 0`.
            assert!(queue_type.max_retries() > 0);
        }
    }

    /// Visibility timeouts must be positive and no larger than 43200 seconds,
    /// the SQS maximum quoted in the assertion message.
    #[test]
    fn test_queue_type_visibility_timeouts_in_range() {
        for qt in queue_types_under_test() {
            let vt = qt.visibility_timeout_secs();
            assert!(vt > 0, "{qt}: visibility timeout must be > 0");
            assert!(
                vt <= 43200,
                "{qt}: visibility timeout {vt}s exceeds SQS max (43200s)"
            );
        }
    }

    /// Pins the status-check polling interval and lower bounds for the others.
    #[test]
    fn test_queue_type_polling_intervals_appropriate() {
        assert_eq!(QueueType::StatusCheck.polling_interval_secs(), 5);

        assert!(QueueType::TransactionRequest.polling_interval_secs() >= 5);
        assert!(QueueType::TransactionSubmission.polling_interval_secs() >= 5);
        assert!(QueueType::Notification.polling_interval_secs() >= 10);
    }

    /// Every error variant renders a non-empty message.
    #[test]
    fn test_queue_backend_error_variants() {
        // A fixed-size array is enough here; no heap-allocated Vec is needed.
        let errors = [
            QueueBackendError::RedisError("test".to_string()),
            QueueBackendError::SqsError("test".to_string()),
            QueueBackendError::SerializationError("test".to_string()),
            QueueBackendError::ConfigError("test".to_string()),
            QueueBackendError::QueueNotFound("test".to_string()),
            QueueBackendError::WorkerInitError("test".to_string()),
            QueueBackendError::QueueError("test".to_string()),
        ];

        for error in errors {
            assert!(!error.to_string().is_empty());
        }
    }

    /// `as_str` and `Display` agree on the backend identifiers.
    #[test]
    fn test_queue_backend_type_string_representations() {
        assert_eq!(QueueBackendType::Redis.as_str(), "redis");
        assert_eq!(QueueBackendType::Sqs.as_str(), "sqs");
        assert_eq!(QueueBackendType::Redis.to_string(), "redis");
        assert_eq!(QueueBackendType::Sqs.to_string(), "sqs");
    }
}