openzeppelin_relayer/queues/redis/
queue.rs1use std::{env, sync::Arc};
11
12use apalis_redis::{Config, RedisStorage};
13use color_eyre::{eyre, Result};
14use redis::aio::{ConnectionManager, ConnectionManagerConfig};
15use serde::{Deserialize, Serialize};
16use tokio::time::Duration;
17use tracing::info;
18
19use crate::{config::ServerConfig, utils::RedisConnections};
20
21use crate::jobs::{
22 Job, NotificationSend, RelayerHealthCheck, TokenSwapRequest, TransactionRequest,
23 TransactionSend, TransactionStatusCheck,
24};
25
26#[derive(Clone)]
27pub struct Queue {
28 pub transaction_request_queue: RedisStorage<Job<TransactionRequest>>,
29 pub transaction_submission_queue: RedisStorage<Job<TransactionSend>>,
30 pub transaction_status_queue: RedisStorage<Job<TransactionStatusCheck>>,
32 pub transaction_status_queue_evm: RedisStorage<Job<TransactionStatusCheck>>,
34 pub transaction_status_queue_stellar: RedisStorage<Job<TransactionStatusCheck>>,
36 pub notification_queue: RedisStorage<Job<NotificationSend>>,
37 pub token_swap_request_queue: RedisStorage<Job<TokenSwapRequest>>,
38 pub relayer_health_check_queue: RedisStorage<Job<RelayerHealthCheck>>,
39 redis_connections: Arc<RedisConnections>,
45}
46
47impl std::fmt::Debug for Queue {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 f.debug_struct("Queue")
50 .field("transaction_request_queue", &"RedisStorage<...>")
51 .field("transaction_submission_queue", &"RedisStorage<...>")
52 .field("transaction_status_queue", &"RedisStorage<...>")
53 .field("transaction_status_queue_evm", &"RedisStorage<...>")
54 .field("transaction_status_queue_stellar", &"RedisStorage<...>")
55 .field("notification_queue", &"RedisStorage<...>")
56 .field("token_swap_request_queue", &"RedisStorage<...>")
57 .field("relayer_health_check_queue", &"RedisStorage<...>")
58 .field("redis_connections", &"RedisConnections")
59 .finish()
60 }
61}
62
63#[derive(Clone, Debug)]
65struct QueueConfig {
66 enqueue_scheduled: Duration,
68}
69
70impl Default for QueueConfig {
71 fn default() -> Self {
72 Self {
73 enqueue_scheduled: Duration::from_secs(30),
74 }
75 }
76}
77
78impl QueueConfig {
79 fn high_frequency() -> Self {
81 Self {
82 enqueue_scheduled: Duration::from_secs(2),
83 }
84 }
85
86 fn low_frequency() -> Self {
90 Self {
91 enqueue_scheduled: Duration::from_secs(20),
92 }
93 }
94}
95
96impl Queue {
97 fn storage<T: Serialize + for<'de> Deserialize<'de>>(
107 namespace: &str,
108 conn: ConnectionManager,
109 queue_config: QueueConfig,
110 ) -> RedisStorage<T> {
111 let config = Config::default()
112 .set_namespace(namespace)
113 .set_enqueue_scheduled(queue_config.enqueue_scheduled);
114
115 RedisStorage::new_with_config(conn, config)
116 }
117
118 async fn create_connection_manager(
123 client: &redis::Client,
124 queue_timeout: Duration,
125 ) -> Result<ConnectionManager> {
126 let conn_config = ConnectionManagerConfig::new()
127 .set_connection_timeout(queue_timeout)
128 .set_response_timeout(queue_timeout)
129 .set_number_of_retries(2)
130 .set_max_delay(1000);
131
132 ConnectionManager::new_with_config(client.clone(), conn_config)
133 .await
134 .map_err(|e| eyre::eyre!("Failed to create Redis connection manager: {}", e))
135 }
136
137 pub async fn setup(redis_connections: Arc<RedisConnections>) -> Result<Self> {
155 let server_config = ServerConfig::from_env();
156 let redis_url = &server_config.redis_url;
157
158 let client = redis::Client::open(redis_url.as_str())
160 .map_err(|e| eyre::eyre!("Failed to create Redis client for queue: {}", e))?;
161
162 let queue_timeout = Duration::from_secs(5);
165
166 let conn_tx_request = Self::create_connection_manager(&client, queue_timeout).await?;
169 let conn_tx_submit = Self::create_connection_manager(&client, queue_timeout).await?;
170 let conn_status = Self::create_connection_manager(&client, queue_timeout).await?;
171 let conn_status_evm = Self::create_connection_manager(&client, queue_timeout).await?;
172 let conn_status_stellar = Self::create_connection_manager(&client, queue_timeout).await?;
173 let conn_notification = Self::create_connection_manager(&client, queue_timeout).await?;
174 let conn_swap = Self::create_connection_manager(&client, queue_timeout).await?;
175 let conn_health = Self::create_connection_manager(&client, queue_timeout).await?;
176
177 info!(
178 redis_url = %redis_url,
179 connection_timeout_ms = 5000,
180 response_timeout_ms = 5000,
181 retries = 2,
182 max_backoff_ms = 1000,
183 connection_count = 8,
184 "Queue setup: created dedicated ConnectionManager per queue"
185 );
186
187 let redis_key_prefix = env::var("REDIS_KEY_PREFIX")
189 .ok()
190 .filter(|v| !v.is_empty())
191 .map(|value| format!("{value}:queue:"))
192 .unwrap_or_default();
193
194 let high_frequency = QueueConfig::high_frequency();
198 let low_frequency = QueueConfig::low_frequency();
199
200 Ok(Self {
201 transaction_request_queue: Self::storage(
202 &format!("{redis_key_prefix}transaction_request_queue"),
203 conn_tx_request,
204 low_frequency.clone(), ),
206 transaction_submission_queue: Self::storage(
207 &format!("{redis_key_prefix}transaction_submission_queue"),
208 conn_tx_submit,
209 low_frequency.clone(), ),
211 transaction_status_queue: Self::storage(
212 &format!("{redis_key_prefix}transaction_status_queue"),
213 conn_status,
214 high_frequency.clone(),
215 ),
216 transaction_status_queue_evm: Self::storage(
217 &format!("{redis_key_prefix}transaction_status_queue_evm"),
218 conn_status_evm,
219 high_frequency.clone(),
220 ),
221 transaction_status_queue_stellar: Self::storage(
222 &format!("{redis_key_prefix}transaction_status_queue_stellar"),
223 conn_status_stellar,
224 high_frequency.clone(),
225 ),
226 notification_queue: Self::storage(
228 &format!("{redis_key_prefix}notification_queue"),
229 conn_notification,
230 low_frequency.clone(), ),
232 token_swap_request_queue: Self::storage(
233 &format!("{redis_key_prefix}token_swap_request_queue"),
234 conn_swap,
235 low_frequency.clone(), ),
237 relayer_health_check_queue: Self::storage(
238 &format!("{redis_key_prefix}relayer_health_check_queue"),
239 conn_health,
240 low_frequency.clone(), ),
242 redis_connections,
243 })
244 }
245
246 pub fn redis_connections(&self) -> Arc<RedisConnections> {
251 self.redis_connections.clone()
252 }
253}
254
255#[cfg(test)]
256mod tests {
257 use super::*;
258
259 #[test]
260 fn test_queue_storage_configuration() {
261 let namespace = "test_namespace";
263 let config = Config::default().set_namespace(namespace);
264
265 assert_eq!(config.get_namespace(), namespace);
266 }
267
268 #[derive(Clone, Debug)]
270 struct MockQueue {
271 pub namespace_transaction_request: String,
272 pub namespace_transaction_submission: String,
273 pub namespace_transaction_status: String,
274 pub namespace_transaction_status_evm: String,
275 pub namespace_transaction_status_stellar: String,
276 pub namespace_notification: String,
277 pub namespace_token_swap_request_queue: String,
278 pub namespace_relayer_health_check_queue: String,
279 }
280
281 impl MockQueue {
282 fn new() -> Self {
283 Self {
284 namespace_transaction_request: "transaction_request_queue".to_string(),
285 namespace_transaction_submission: "transaction_submission_queue".to_string(),
286 namespace_transaction_status: "transaction_status_queue".to_string(),
287 namespace_transaction_status_evm: "transaction_status_queue_evm".to_string(),
288 namespace_transaction_status_stellar: "transaction_status_queue_stellar"
289 .to_string(),
290 namespace_notification: "notification_queue".to_string(),
291 namespace_token_swap_request_queue: "token_swap_request_queue".to_string(),
292 namespace_relayer_health_check_queue: "relayer_health_check_queue".to_string(),
293 }
294 }
295 }
296
297 #[test]
298 fn test_queue_namespaces() {
299 let mock_queue = MockQueue::new();
300
301 assert_eq!(
302 mock_queue.namespace_transaction_request,
303 "transaction_request_queue"
304 );
305 assert_eq!(
306 mock_queue.namespace_transaction_submission,
307 "transaction_submission_queue"
308 );
309 assert_eq!(
310 mock_queue.namespace_transaction_status,
311 "transaction_status_queue"
312 );
313 assert_eq!(
314 mock_queue.namespace_transaction_status_evm,
315 "transaction_status_queue_evm"
316 );
317 assert_eq!(
318 mock_queue.namespace_transaction_status_stellar,
319 "transaction_status_queue_stellar"
320 );
321 assert_eq!(mock_queue.namespace_notification, "notification_queue");
322 assert_eq!(
323 mock_queue.namespace_token_swap_request_queue,
324 "token_swap_request_queue"
325 );
326 assert_eq!(
327 mock_queue.namespace_relayer_health_check_queue,
328 "relayer_health_check_queue"
329 );
330 }
331
332 #[test]
333 fn test_queue_config_with_prefix() {
334 let prefix = "myprefix:queue:";
336 let queue_name = "transaction_request_queue";
337 let full_namespace = format!("{prefix}{queue_name}");
338
339 let config = Config::default().set_namespace(&full_namespace);
340 assert_eq!(
341 config.get_namespace(),
342 "myprefix:queue:transaction_request_queue"
343 );
344 }
345
346 #[test]
347 fn test_queue_config_without_prefix() {
348 let queue_name = "transaction_request_queue";
350
351 let config = Config::default().set_namespace(queue_name);
352 assert_eq!(config.get_namespace(), "transaction_request_queue");
353 }
354
355 #[test]
356 fn test_queue_config_default() {
357 let config = QueueConfig::default();
358
359 assert_eq!(config.enqueue_scheduled, Duration::from_secs(30));
360 }
361
362 #[test]
363 fn test_queue_config_high_throughput() {
364 let config = QueueConfig::high_frequency();
365
366 assert_eq!(config.enqueue_scheduled, Duration::from_secs(2));
368
369 let default = QueueConfig::default();
371 assert!(config.enqueue_scheduled < default.enqueue_scheduled);
372 }
373
374 #[test]
375 fn test_queue_config_low_frequency() {
376 let config = QueueConfig::low_frequency();
377
378 assert_eq!(config.enqueue_scheduled, Duration::from_secs(20));
379
380 let high = QueueConfig::high_frequency();
382 assert!(config.enqueue_scheduled > high.enqueue_scheduled);
383 }
384}