openzeppelin_relayer/jobs/handlers/system_cleanup_handler.rs

//! System cleanup worker implementation for Redis queue metadata.
//!
//! This module implements a cleanup worker that removes stale job metadata from Redis.
//! The job queue library stores job metadata in Redis that never gets automatically cleaned up.
//! When jobs complete, keys accumulate in:
//! - `{namespace}:done` - Sorted set of completed job IDs (score = timestamp)
//! - `{namespace}:data` - Hash storing job payloads (field = job_id)
//! - `{namespace}:result` - Hash storing job results (field = job_id)
//! - `{namespace}:failed` - Sorted set of failed jobs
//! - `{namespace}:dead` - Sorted set of dead-letter jobs
//!
//! This worker runs every 15 minutes to clean up this metadata and prevent Redis memory
//! from growing indefinitely.
//!
//! ## Distributed Lock
//!
//! Since this runs on multiple service instances simultaneously (each with its own
//! CronStream), a distributed lock is used to ensure only one instance processes
//! the cleanup at a time. The lock has a 14-minute TTL (the cron runs every 15 minutes),
//! ensuring the lock expires before the next scheduled run.
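//!
//! For example, with `REDIS_KEY_PREFIX=oz-relayer` (a hypothetical value matching the
//! ones used in this module's tests), the completed-job keys for the transaction
//! request queue would be:
//!
//! ```text
//! oz-relayer:queue:transaction_request_queue:done
//! oz-relayer:queue:transaction_request_queue:data
//! oz-relayer:queue:transaction_request_queue:result
//! ```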

use actix_web::web::ThinData;
use deadpool_redis::Pool;
use eyre::Result;
use std::env;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info, instrument, warn};

use crate::{
    config::ServerConfig,
    constants::{SYSTEM_CLEANUP_LOCK_TTL_SECS, WORKER_SYSTEM_CLEANUP_RETRIES},
    jobs::{handle_result, JobProducerTrait},
    models::DefaultAppState,
    queues::{HandlerError, QueueBackendType, WorkerContext},
    utils::DistributedLock,
};

/// Distributed lock name for queue cleanup.
/// Only one instance across the cluster should run cleanup at a time.
const SYSTEM_CLEANUP_LOCK_NAME: &str = "system_queue_cleanup";

// Note: SYSTEM_CLEANUP_LOCK_TTL_SECS is defined in crate::constants::worker

/// Age threshold for job metadata cleanup (10 minutes).
/// Jobs older than this threshold will be cleaned up.
const JOB_AGE_THRESHOLD_SECS: i64 = 10 * 60;

/// Batch size for cleanup operations.
/// Processing in batches prevents memory issues with large datasets.
const CLEANUP_BATCH_SIZE: isize = 500;

/// Queue names to clean up.
/// These are the queue namespaces used by the relayer.
const QUEUE_NAMES: &[&str] = &[
    "transaction_request_queue",
    "transaction_submission_queue",
    "transaction_status_queue",
    "transaction_status_queue_evm",
    "transaction_status_queue_stellar",
    "notification_queue",
    "token_swap_request_queue",
    "relayer_health_check_queue",
];

/// Sorted set suffixes that contain job IDs to clean up.
const SORTED_SET_SUFFIXES: &[&str] = &[":done", ":failed", ":dead"];

/// Represents a cron reminder job for triggering system cleanup operations.
#[derive(Default, Debug, Clone)]
pub struct SystemCleanupCronReminder();

/// Handles periodic queue metadata cleanup jobs.
///
/// This function processes stale job metadata by:
/// 1. Acquiring a distributed lock to prevent concurrent cleanup
/// 2. Iterating through all queue namespaces
/// 3. For each queue, finding and removing job IDs older than the threshold
/// 4. Cleaning up associated entries in the `:data` and `:result` hashes
///
/// # Arguments
/// * `job` - The cron reminder job triggering the cleanup
/// * `data` - Application state containing repositories
/// * `ctx` - Worker context with attempt number and task ID
///
/// # Returns
/// * `Result<(), HandlerError>` - Success or failure of cleanup processing
#[instrument(
    level = "debug",
    skip(job, data),
    fields(
        job_type = "system_cleanup",
        attempt = %ctx.attempt,
    ),
    err
)]
pub async fn system_cleanup_handler(
    job: SystemCleanupCronReminder,
    data: ThinData<DefaultAppState>,
    ctx: WorkerContext,
) -> Result<(), HandlerError> {
    let result = handle_cleanup_request(job, &data).await;

    handle_result(result, &ctx, "SystemCleanup", WORKER_SYSTEM_CLEANUP_RETRIES)
}

/// Handles the actual system cleanup request logic.
///
/// This function first attempts to acquire a distributed lock to ensure only
/// one instance processes cleanup at a time. If the lock is already held by
/// another instance, this returns early without doing any work.
///
/// Note: Queue metadata cleanup only runs when using the Redis queue backend.
/// The SQS backend and in-memory mode skip cleanup since they don't use Redis queues.
async fn handle_cleanup_request(
    _job: SystemCleanupCronReminder,
    data: &ThinData<DefaultAppState>,
) -> Result<()> {
    // Skip cleanup if not using Redis queue backend
    let backend_type = data.job_producer().backend_type();
    if backend_type != QueueBackendType::Redis {
        debug!(
            backend = %backend_type,
            "Skipping queue metadata cleanup - not using Redis queue backend"
        );
        return Ok(());
    }

    let transaction_repo = data.transaction_repository();
    let (redis_connections, key_prefix) =
        match crate::repositories::TransactionRepository::connection_info(transaction_repo.as_ref())
        {
            Some((connections, prefix)) => (connections, prefix),
            None => {
                debug!("in-memory repository detected, skipping system cleanup");
                return Ok(());
            }
        };
    let pool = redis_connections.primary().clone();

    // In distributed mode, acquire a lock to prevent multiple instances from
    // running cleanup simultaneously. In single-instance mode, skip locking.
    let _lock_guard = if ServerConfig::get_distributed_mode() {
        let lock_key = format!("{key_prefix}:lock:{SYSTEM_CLEANUP_LOCK_NAME}");
        let lock = DistributedLock::new(
            pool.clone(),
            &lock_key,
            Duration::from_secs(SYSTEM_CLEANUP_LOCK_TTL_SECS),
        );

        match lock.try_acquire().await {
            Ok(Some(guard)) => {
                debug!(lock_key = %lock_key, "acquired distributed lock for system cleanup");
                Some(guard)
            }
            Ok(None) => {
                info!(lock_key = %lock_key, "system cleanup skipped - another instance is processing");
                return Ok(());
            }
            Err(e) => {
                // Fail closed: skip cleanup if we can't communicate with Redis for locking,
                // to prevent concurrent execution across multiple instances
                warn!(
                    error = %e,
                    lock_key = %lock_key,
                    "failed to acquire distributed lock, skipping cleanup"
                );
                return Ok(());
            }
        }
    } else {
        debug!("distributed mode disabled, skipping lock acquisition");
        None
    };

    info!("executing queue metadata cleanup");

    // Queue keys use REDIS_KEY_PREFIX if set, with ":queue:" appended
    // Format: {REDIS_KEY_PREFIX}:queue:{queue_name}:done, etc.
    // If REDIS_KEY_PREFIX is not set, keys are just {queue_name}:done, etc.
    let redis_key_prefix = env::var("REDIS_KEY_PREFIX")
        .ok()
        .filter(|v| !v.is_empty())
        .map(|value| format!("{value}:queue:"))
        .unwrap_or_default();

    let cutoff_timestamp = chrono::Utc::now().timestamp() - JOB_AGE_THRESHOLD_SECS;

    let mut total_cleaned = 0usize;
    let mut total_errors = 0usize;

    // Process each queue
    for queue_name in QUEUE_NAMES {
        let namespace = format!("{redis_key_prefix}{queue_name}");

        match cleanup_queue(&pool, &namespace, cutoff_timestamp).await {
            Ok(cleaned) => {
                if cleaned > 0 {
                    debug!(
                        queue = %queue_name,
                        cleaned_count = cleaned,
                        "cleaned up stale job metadata"
                    );
                }
                total_cleaned += cleaned;
            }
            Err(e) => {
                error!(
                    queue = %queue_name,
                    error = %e,
                    "failed to cleanup queue"
                );
                total_errors += 1;
            }
        }
    }

    info!(
        total_cleaned,
        total_errors,
        queues_processed = QUEUE_NAMES.len(),
        "system cleanup completed"
    );

    if total_errors > 0 {
        Err(eyre::eyre!(
            "System cleanup completed with {} errors",
            total_errors
        ))
    } else {
        Ok(())
    }
}

/// Cleans up stale job metadata for a single queue namespace.
///
/// # Arguments
/// * `pool` - Redis connection pool
/// * `namespace` - The queue namespace (e.g., "oz-relayer:queue:transaction_request_queue")
/// * `cutoff_timestamp` - Unix timestamp; jobs older than this will be cleaned up
///
/// # Returns
/// * `Result<usize>` - Number of jobs cleaned up
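///
/// For example, for the (hypothetical) namespace
/// `oz-relayer:queue:notification_queue`, a cleanup pass touches:
///
/// ```text
/// oz-relayer:queue:notification_queue:done    (ZREM old entries)
/// oz-relayer:queue:notification_queue:failed  (ZREM old entries)
/// oz-relayer:queue:notification_queue:dead    (ZREM old entries)
/// oz-relayer:queue:notification_queue:data    (HDEL matching job IDs)
/// oz-relayer:queue:notification_queue:result  (HDEL matching job IDs)
/// ```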
async fn cleanup_queue(pool: &Arc<Pool>, namespace: &str, cutoff_timestamp: i64) -> Result<usize> {
    let mut total_cleaned = 0usize;
    let data_key = format!("{namespace}:data");
    let result_key = format!("{namespace}:result");

    // Clean up each sorted set (done, failed, dead) and associated hash entries
    for suffix in SORTED_SET_SUFFIXES {
        let sorted_set_key = format!("{namespace}{suffix}");
        let cleaned = cleanup_sorted_set_and_hashes(
            pool,
            &sorted_set_key,
            &data_key,
            &result_key,
            cutoff_timestamp,
        )
        .await?;
        total_cleaned += cleaned;
    }

    Ok(total_cleaned)
}

/// Cleans up job IDs from a sorted set and their associated data/result hashes.
///
/// Uses ZRANGEBYSCORE to find old job IDs, then removes them from the
/// sorted set and both the data and result hashes in a pipeline for efficiency.
///
/// # Arguments
/// * `pool` - Redis connection pool
/// * `sorted_set_key` - Key of the sorted set (e.g., "queue:transaction_request_queue:done")
/// * `data_key` - Key of the data hash (e.g., "queue:transaction_request_queue:data")
/// * `result_key` - Key of the result hash (e.g., "queue:transaction_request_queue:result")
/// * `cutoff_timestamp` - Unix timestamp; jobs with a score older than this will be cleaned up
///
/// # Returns
/// * `Result<usize>` - Number of jobs cleaned up
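///
/// For an illustrative batch of job IDs `a` and `b`, each pipeline iteration is
/// equivalent to issuing:
///
/// ```text
/// ZREM {sorted_set_key} a b
/// HDEL {data_key} a b
/// HDEL {result_key} a b
/// ```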
async fn cleanup_sorted_set_and_hashes(
    pool: &Arc<Pool>,
    sorted_set_key: &str,
    data_key: &str,
    result_key: &str,
    cutoff_timestamp: i64,
) -> Result<usize> {
    let mut total_cleaned = 0usize;
    let mut conn = pool.get().await?;

    loop {
        // Get a batch of old job IDs from the sorted set:
        // ZRANGEBYSCORE key -inf cutoff LIMIT 0 batch_size
        let job_ids: Vec<String> = redis::cmd("ZRANGEBYSCORE")
            .arg(sorted_set_key)
            .arg("-inf")
            .arg(cutoff_timestamp)
            .arg("LIMIT")
            .arg(0)
            .arg(CLEANUP_BATCH_SIZE)
            .query_async(&mut conn)
            .await?;

        if job_ids.is_empty() {
            break;
        }

        let batch_size = job_ids.len();

        // Use a pipeline to remove from the sorted set and both hashes in a
        // single round trip (pipelined, not transactional)
        let mut pipe = redis::pipe();

        // ZREM sorted_set_key job_id1 job_id2 ...
        pipe.cmd("ZREM").arg(sorted_set_key);
        for job_id in &job_ids {
            pipe.arg(job_id);
        }
        pipe.ignore();

        // HDEL data_key job_id1 job_id2 ...
        pipe.cmd("HDEL").arg(data_key);
        for job_id in &job_ids {
            pipe.arg(job_id);
        }
        pipe.ignore();

        // HDEL result_key job_id1 job_id2 ...
        pipe.cmd("HDEL").arg(result_key);
        for job_id in &job_ids {
            pipe.arg(job_id);
        }
        pipe.ignore();

        pipe.query_async::<()>(&mut conn).await?;

        total_cleaned += batch_size;

        // If we got fewer than a full batch, we're done
        if batch_size < CLEANUP_BATCH_SIZE as usize {
            break;
        }
    }

    Ok(total_cleaned)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_queue_names_not_empty() {
        assert!(!QUEUE_NAMES.is_empty());
    }

    #[test]
    fn test_sorted_set_suffixes() {
        assert!(SORTED_SET_SUFFIXES.contains(&":done"));
        assert!(SORTED_SET_SUFFIXES.contains(&":failed"));
        assert!(SORTED_SET_SUFFIXES.contains(&":dead"));
    }

    #[test]
    fn test_constants() {
        assert_eq!(SYSTEM_CLEANUP_LOCK_TTL_SECS, 14 * 60); // 14 minutes
        assert_eq!(JOB_AGE_THRESHOLD_SECS, 10 * 60); // 10 minutes
        assert_eq!(CLEANUP_BATCH_SIZE, 500);
    }

    #[test]
    fn test_namespace_format_without_prefix() {
        // When REDIS_KEY_PREFIX is not set, queue keys are at root level
        let redis_key_prefix = "";
        let queue_name = "transaction_request_queue";
        let namespace = format!("{redis_key_prefix}{queue_name}");
        assert_eq!(namespace, "transaction_request_queue");
    }

    #[test]
    fn test_namespace_format_with_prefix() {
        // When REDIS_KEY_PREFIX is set, queue keys include the prefix
        let redis_key_prefix = "oz-relayer:queue:";
        let queue_name = "transaction_request_queue";
        let namespace = format!("{redis_key_prefix}{queue_name}");
        assert_eq!(namespace, "oz-relayer:queue:transaction_request_queue");
    }

    #[test]
    fn test_sorted_set_key_format() {
        // Without prefix
        let namespace = "transaction_request_queue";
        let sorted_set_key = format!("{namespace}:done");
        assert_eq!(sorted_set_key, "transaction_request_queue:done");

        // With prefix
        let namespace_with_prefix = "oz-relayer:queue:transaction_request_queue";
        let sorted_set_key_with_prefix = format!("{namespace_with_prefix}:done");
        assert_eq!(
            sorted_set_key_with_prefix,
            "oz-relayer:queue:transaction_request_queue:done"
        );
    }

    #[test]
    fn test_data_key_format() {
        // Without prefix
        let namespace = "transaction_request_queue";
        let data_key = format!("{namespace}:data");
        assert_eq!(data_key, "transaction_request_queue:data");

        // With prefix
        let namespace_with_prefix = "oz-relayer:queue:transaction_request_queue";
        let data_key_with_prefix = format!("{namespace_with_prefix}:data");
        assert_eq!(
            data_key_with_prefix,
            "oz-relayer:queue:transaction_request_queue:data"
        );
    }

    #[test]
    fn test_result_key_format() {
        // Without prefix
        let namespace = "transaction_request_queue";
        let result_key = format!("{namespace}:result");
        assert_eq!(result_key, "transaction_request_queue:result");

        // With prefix
        let namespace_with_prefix = "oz-relayer:queue:transaction_request_queue";
        let result_key_with_prefix = format!("{namespace_with_prefix}:result");
        assert_eq!(
            result_key_with_prefix,
            "oz-relayer:queue:transaction_request_queue:result"
        );
    }

    #[test]
    fn test_lock_key_format() {
        let prefix = "oz-relayer";
        let lock_key = format!("{prefix}:lock:{SYSTEM_CLEANUP_LOCK_NAME}");
        assert_eq!(lock_key, "oz-relayer:lock:system_queue_cleanup");
    }
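
    #[test]
    fn test_redis_key_prefix_derivation() {
        // Mirrors the REDIS_KEY_PREFIX mapping in handle_cleanup_request:
        // a non-empty value gains a ":queue:" segment; empty or unset yields "".
        // (This restates that closure chain locally rather than calling into it.)
        let derive = |v: Option<&str>| {
            v.map(str::to_owned)
                .filter(|v| !v.is_empty())
                .map(|value| format!("{value}:queue:"))
                .unwrap_or_default()
        };
        assert_eq!(derive(Some("oz-relayer")), "oz-relayer:queue:");
        assert_eq!(derive(Some("")), "");
        assert_eq!(derive(None), "");
    }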

    #[test]
    fn test_cutoff_timestamp_calculation() {
        let now = chrono::Utc::now().timestamp();
        let cutoff = now - JOB_AGE_THRESHOLD_SECS;
        assert!(cutoff < now);
        assert_eq!(now - cutoff, JOB_AGE_THRESHOLD_SECS);
    }
}