openzeppelin_relayer/jobs/handlers/system_cleanup_handler.rs

//! System cleanup job handler: periodically prunes stale job metadata
//! (done/failed/dead entries and their data/result hashes) from the
//! Redis-backed queues.

use actix_web::web::ThinData;
use deadpool_redis::Pool;
use eyre::Result;
use std::env;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info, instrument, warn};

use crate::{
    config::ServerConfig,
    constants::{SYSTEM_CLEANUP_LOCK_TTL_SECS, WORKER_SYSTEM_CLEANUP_RETRIES},
    jobs::{handle_result, JobProducerTrait},
    models::DefaultAppState,
    queues::{HandlerError, QueueBackendType, WorkerContext},
    utils::DistributedLock,
};

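/// Name of the distributed lock that ensures only one instance runs the
/// queue cleanup at a time.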
const SYSTEM_CLEANUP_LOCK_NAME: &str = "system_queue_cleanup";

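/// Age (in seconds) after which terminal job metadata is considered stale
/// and eligible for removal (10 minutes).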
const JOB_AGE_THRESHOLD_SECS: i64 = 10 * 60;

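/// Maximum number of job ids fetched and deleted per batch.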
const CLEANUP_BATCH_SIZE: isize = 500;

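/// Queue namespaces whose metadata is inspected during cleanup.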
const QUEUE_NAMES: &[&str] = &[
    "transaction_request_queue",
    "transaction_submission_queue",
    "transaction_status_queue",
    "transaction_status_queue_evm",
    "transaction_status_queue_stellar",
    "notification_queue",
    "token_swap_request_queue",
    "relayer_health_check_queue",
];

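/// Sorted-set suffixes holding terminal job states; entries older than the
/// age threshold are pruned from these sets.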
const SORTED_SET_SUFFIXES: &[&str] = &[":done", ":failed", ":dead"];

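/// Marker payload for the system cleanup cron reminder job.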
#[derive(Default, Debug, Clone)]
pub struct SystemCleanupCronReminder();

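/// Entry point for the system cleanup job: delegates to
/// `handle_cleanup_request` and maps the outcome through `handle_result`
/// using the configured retry limit.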
#[instrument(
    level = "debug",
    skip(job, data),
    fields(
        job_type = "system_cleanup",
        attempt = %ctx.attempt,
    ),
    err
)]
pub async fn system_cleanup_handler(
    job: SystemCleanupCronReminder,
    data: ThinData<DefaultAppState>,
    ctx: WorkerContext,
) -> Result<(), HandlerError> {
    let result = handle_cleanup_request(job, &data).await;

    handle_result(result, &ctx, "SystemCleanup", WORKER_SYSTEM_CLEANUP_RETRIES)
}

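/// Performs the actual cleanup: only runs against the Redis queue backend,
/// optionally acquires a distributed lock in distributed mode, and removes
/// stale job metadata from each known queue.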
async fn handle_cleanup_request(
    _job: SystemCleanupCronReminder,
    data: &ThinData<DefaultAppState>,
) -> Result<()> {
    // Queue metadata cleanup only applies to the Redis queue backend.
    let backend_type = data.job_producer().backend_type();
    if backend_type != QueueBackendType::Redis {
        debug!(
            backend = %backend_type,
            "Skipping queue metadata cleanup - not using Redis queue backend"
        );
        return Ok(());
    }

    let transaction_repo = data.transaction_repository();
    let (redis_connections, key_prefix) =
        match crate::repositories::TransactionRepository::connection_info(transaction_repo.as_ref())
        {
            Some((connections, prefix)) => (connections, prefix),
            None => {
                debug!("in-memory repository detected, skipping system cleanup");
                return Ok(());
            }
        };
    let pool = redis_connections.primary().clone();

    // In distributed mode, acquire a TTL-bound lock so only one instance
    // runs the cleanup at a time.
    let _lock_guard = if ServerConfig::get_distributed_mode() {
        let lock_key = format!("{key_prefix}:lock:{SYSTEM_CLEANUP_LOCK_NAME}");
        let lock = DistributedLock::new(
            pool.clone(),
            &lock_key,
            Duration::from_secs(SYSTEM_CLEANUP_LOCK_TTL_SECS),
        );

        match lock.try_acquire().await {
            Ok(Some(guard)) => {
                debug!(lock_key = %lock_key, "acquired distributed lock for system cleanup");
                Some(guard)
            }
            Ok(None) => {
                info!(lock_key = %lock_key, "system cleanup skipped - another instance is processing");
                return Ok(());
            }
            Err(e) => {
                warn!(
                    error = %e,
                    lock_key = %lock_key,
                    "failed to acquire distributed lock, skipping cleanup"
                );
                return Ok(());
            }
        }
    } else {
        debug!("distributed mode disabled, skipping lock acquisition");
        None
    };

    info!("executing queue metadata cleanup");

    // Build the queue key prefix from REDIS_KEY_PREFIX when it is set;
    // otherwise queue names are used without a prefix.
    let redis_key_prefix = env::var("REDIS_KEY_PREFIX")
        .ok()
        .filter(|v| !v.is_empty())
        .map(|value| format!("{value}:queue:"))
        .unwrap_or_default();

    let cutoff_timestamp = chrono::Utc::now().timestamp() - JOB_AGE_THRESHOLD_SECS;

    let mut total_cleaned = 0usize;
    let mut total_errors = 0usize;

    for queue_name in QUEUE_NAMES {
        let namespace = format!("{redis_key_prefix}{queue_name}");

        match cleanup_queue(&pool, &namespace, cutoff_timestamp).await {
            Ok(cleaned) => {
                if cleaned > 0 {
                    debug!(
                        queue = %queue_name,
                        cleaned_count = cleaned,
                        "cleaned up stale job metadata"
                    );
                }
                total_cleaned += cleaned;
            }
            Err(e) => {
                error!(
                    queue = %queue_name,
                    error = %e,
                    "failed to cleanup queue"
                );
                total_errors += 1;
            }
        }
    }

    info!(
        total_cleaned,
        total_errors,
        queues_processed = QUEUE_NAMES.len(),
        "system cleanup completed"
    );

    if total_errors > 0 {
        Err(eyre::eyre!(
            "System cleanup completed with {} errors",
            total_errors
        ))
    } else {
        Ok(())
    }
}

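/// Removes stale entries from a single queue namespace across all terminal
/// sorted sets, returning the total number of job ids cleaned.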
async fn cleanup_queue(pool: &Arc<Pool>, namespace: &str, cutoff_timestamp: i64) -> Result<usize> {
    let mut total_cleaned = 0usize;
    let data_key = format!("{namespace}:data");
    let result_key = format!("{data_key}::result");

    for suffix in SORTED_SET_SUFFIXES {
        let sorted_set_key = format!("{namespace}{suffix}");
        let cleaned = cleanup_sorted_set_and_hashes(
            pool,
            &sorted_set_key,
            &data_key,
            &result_key,
            cutoff_timestamp,
        )
        .await?;
        total_cleaned += cleaned;
    }

    Ok(total_cleaned)
}

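/// Scans a terminal sorted set for job ids scored at or before
/// `cutoff_timestamp` and deletes them in batches from the sorted set and
/// the associated data and result hashes.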
async fn cleanup_sorted_set_and_hashes(
    pool: &Arc<Pool>,
    sorted_set_key: &str,
    data_key: &str,
    result_key: &str,
    cutoff_timestamp: i64,
) -> Result<usize> {
    let mut total_cleaned = 0usize;
    let mut conn = pool.get().await?;

    loop {
        // Fetch a batch of job ids whose score (timestamp) is at or before the cutoff.
        let job_ids: Vec<String> = redis::cmd("ZRANGEBYSCORE")
            .arg(sorted_set_key)
            .arg("-inf")
            .arg(cutoff_timestamp)
            .arg("LIMIT")
            .arg(0)
            .arg(CLEANUP_BATCH_SIZE)
            .query_async(&mut conn)
            .await?;

        if job_ids.is_empty() {
            break;
        }

        let batch_size = job_ids.len();

        // Remove the batch in a single pipeline round trip: drop the ids from
        // the sorted set and delete their entries from the data and result hashes.
        let mut pipe = redis::pipe();

        pipe.cmd("ZREM").arg(sorted_set_key);
        for job_id in &job_ids {
            pipe.arg(job_id);
        }
        pipe.ignore();

        pipe.cmd("HDEL").arg(data_key);
        for job_id in &job_ids {
            pipe.arg(job_id);
        }
        pipe.ignore();

        pipe.cmd("HDEL").arg(result_key);
        for job_id in &job_ids {
            pipe.arg(job_id);
        }
        pipe.ignore();

        pipe.query_async::<()>(&mut conn).await?;

        total_cleaned += batch_size;

        // A short batch means everything below the cutoff has been drained.
        if batch_size < CLEANUP_BATCH_SIZE as usize {
            break;
        }
    }

    Ok(total_cleaned)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_queue_names_not_empty() {
        assert!(!QUEUE_NAMES.is_empty());
    }

    #[test]
    fn test_sorted_set_suffixes() {
        assert!(SORTED_SET_SUFFIXES.contains(&":done"));
        assert!(SORTED_SET_SUFFIXES.contains(&":failed"));
        assert!(SORTED_SET_SUFFIXES.contains(&":dead"));
    }

    #[test]
    fn test_constants() {
        assert_eq!(SYSTEM_CLEANUP_LOCK_TTL_SECS, 14 * 60);
        assert_eq!(JOB_AGE_THRESHOLD_SECS, 10 * 60);
        assert_eq!(CLEANUP_BATCH_SIZE, 500);
    }

    #[test]
    fn test_namespace_format_without_prefix() {
        let redis_key_prefix = "";
        let queue_name = "transaction_request_queue";
        let namespace = format!("{redis_key_prefix}{queue_name}");
        assert_eq!(namespace, "transaction_request_queue");
    }

    #[test]
    fn test_namespace_format_with_prefix() {
        let redis_key_prefix = "oz-relayer:queue:";
        let queue_name = "transaction_request_queue";
        let namespace = format!("{redis_key_prefix}{queue_name}");
        assert_eq!(namespace, "oz-relayer:queue:transaction_request_queue");
    }

    #[test]
    fn test_sorted_set_key_format() {
        let namespace = "transaction_request_queue";
        let sorted_set_key = format!("{namespace}:done");
        assert_eq!(sorted_set_key, "transaction_request_queue:done");

        let namespace_with_prefix = "oz-relayer:queue:transaction_request_queue";
        let sorted_set_key_with_prefix = format!("{namespace_with_prefix}:done");
        assert_eq!(
            sorted_set_key_with_prefix,
            "oz-relayer:queue:transaction_request_queue:done"
        );
    }

    #[test]
    fn test_data_key_format() {
        let namespace = "transaction_request_queue";
        let data_key = format!("{namespace}:data");
        assert_eq!(data_key, "transaction_request_queue:data");

        let namespace_with_prefix = "oz-relayer:queue:transaction_request_queue";
        let data_key_with_prefix = format!("{namespace_with_prefix}:data");
        assert_eq!(
            data_key_with_prefix,
            "oz-relayer:queue:transaction_request_queue:data"
        );
    }

    #[test]
    fn test_result_key_format() {
        let namespace = "transaction_request_queue";
        let result_key = format!("{namespace}:result");
        assert_eq!(result_key, "transaction_request_queue:result");

        let namespace_with_prefix = "oz-relayer:queue:transaction_request_queue";
        let result_key_with_prefix = format!("{namespace_with_prefix}:result");
        assert_eq!(
            result_key_with_prefix,
            "oz-relayer:queue:transaction_request_queue:result"
        );
    }

    #[test]
    fn test_lock_key_format() {
        let prefix = "oz-relayer";
        let lock_key = format!("{prefix}:lock:{SYSTEM_CLEANUP_LOCK_NAME}");
        assert_eq!(lock_key, "oz-relayer:lock:system_queue_cleanup");
    }

    #[test]
    fn test_cutoff_timestamp_calculation() {
        let now = chrono::Utc::now().timestamp();
        let cutoff = now - JOB_AGE_THRESHOLD_SECS;
        assert!(cutoff < now);
        assert_eq!(now - cutoff, JOB_AGE_THRESHOLD_SECS);
    }
}