openzeppelin_relayer/jobs/handlers/transaction_cleanup_handler.rs

//! Transaction cleanup worker implementation.
//!
//! This module implements the transaction cleanup worker that processes
//! expired transactions marked for deletion. It runs as a cron job to
//! automatically clean up transactions that have passed their delete_at timestamp.
//!
//! ## Distributed Lock
//!
//! Since this runs on multiple service instances simultaneously (each with its own
//! CronStream), a distributed lock is used to ensure only one instance processes
//! the cleanup at a time. The lock has a 9-minute TTL (the cron runs every 10 minutes),
//! ensuring the lock expires before the next scheduled run.
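//!
//! A minimal sketch of the locking pattern used below (the key shown is
//! illustrative; the real key is built from the repository's key prefix, and
//! the TTL from `TRANSACTION_CLEANUP_LOCK_TTL_SECS`):
//!
//! ```ignore
//! let lock = DistributedLock::new(conn, "relayer:lock:transaction_cleanup", ttl);
//! if let Some(guard) = lock.try_acquire().await? {
//!     // run cleanup; the lock is released when `guard` is dropped
//! } else {
//!     // another instance holds the lock - skip this run
//! }
//! ```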

use actix_web::web::ThinData;
use chrono::{DateTime, Utc};
use eyre::Result;
use std::sync::Arc;
use std::time::Duration;
use tracing::{debug, error, info, instrument, warn};

use crate::{
    config::ServerConfig,
    constants::{
        FINAL_TRANSACTION_STATUSES, TRANSACTION_CLEANUP_LOCK_TTL_SECS,
        WORKER_TRANSACTION_CLEANUP_RETRIES,
    },
    jobs::handle_result,
    models::{
        DefaultAppState, NetworkTransactionData, PaginationQuery, RelayerRepoModel,
        TransactionRepoModel, TransactionStatus,
    },
    queues::{HandlerError, WorkerContext},
    repositories::{Repository, TransactionDeleteRequest, TransactionRepository},
    utils::DistributedLock,
};

/// Maximum number of relayers to process concurrently
const MAX_CONCURRENT_RELAYERS: usize = 10;

/// Number of transactions to fetch per page during cleanup
const CLEANUP_PAGE_SIZE: u32 = 100;

/// Maximum number of transactions to delete in a single batch operation.
/// This prevents overwhelming Redis with very large pipelines.
const DELETE_BATCH_SIZE: usize = 100;

/// Maximum page iterations per status before stopping.
/// Prevents unbounded cleanup from exceeding the lock TTL.
/// With CLEANUP_PAGE_SIZE=100, allows up to 150,000 transactions per status per run.
const MAX_CLEANUP_ITERATIONS_PER_STATUS: u32 = 1500;

/// Distributed lock name for transaction cleanup.
/// Only one instance across the cluster should run cleanup at a time.
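/// With a repository key prefix of, say, `relayer` (illustrative; the real
/// prefix comes from the repository's connection info), the resulting Redis
/// key is `relayer:lock:transaction_cleanup`.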
const CLEANUP_LOCK_NAME: &str = "transaction_cleanup";

/// Handles periodic transaction cleanup jobs from the queue.
///
/// This function processes expired transactions by:
/// 1. Fetching all relayers from the system
/// 2. For each relayer, finding transactions with final statuses
/// 3. Checking if their delete_at timestamp has passed
/// 4. Validating transactions are in final states before deletion
/// 5. Deleting transactions that have expired (in parallel)
///
/// # Arguments
/// * `job` - The cron reminder job triggering the cleanup
/// * `data` - Application state containing repositories
/// * `ctx` - Worker context with attempt number and task ID
///
/// # Returns
/// * `Result<(), HandlerError>` - Success or failure of cleanup processing
#[instrument(
    level = "debug",
    skip(job, data),
    fields(
        job_type = "transaction_cleanup",
        attempt = %ctx.attempt,
    ),
    err
)]
pub async fn transaction_cleanup_handler(
    job: TransactionCleanupCronReminder,
    data: ThinData<DefaultAppState>,
    ctx: WorkerContext,
) -> Result<(), HandlerError> {
    let result = handle_request(job, &data).await;

    handle_result(
        result,
        &ctx,
        "TransactionCleanup",
        WORKER_TRANSACTION_CLEANUP_RETRIES,
    )
}

/// Represents a cron reminder job for triggering cleanup operations.
#[derive(Default, Debug, Clone)]
pub struct TransactionCleanupCronReminder();

/// Handles the actual transaction cleanup request logic.
///
/// This function first attempts to acquire a distributed lock to ensure only
/// one instance processes cleanup at a time. If the lock is already held by
/// another instance, this returns early without doing any work.
///
/// # Arguments
/// * `_job` - The cron reminder job (currently unused)
/// * `data` - Application state containing repositories
///
/// # Returns
/// * `Result<()>` - Success or failure of the cleanup operation
async fn handle_request(
    _job: TransactionCleanupCronReminder,
    data: &ThinData<DefaultAppState>,
) -> Result<()> {
    let transaction_repo = data.transaction_repository();

    // In distributed mode, acquire a lock to prevent multiple instances from
    // running cleanup simultaneously. In single-instance mode, skip locking.
    let lock_guard = if ServerConfig::get_distributed_mode() {
        if let Some((connections, prefix)) = transaction_repo.connection_info() {
            let conn = connections.primary().clone();
            let lock_key = format!("{prefix}:lock:{CLEANUP_LOCK_NAME}");
            let lock = DistributedLock::new(
                conn,
                &lock_key,
                Duration::from_secs(TRANSACTION_CLEANUP_LOCK_TTL_SECS),
            );

            match lock.try_acquire().await {
                Ok(Some(guard)) => {
                    debug!(lock_key = %lock_key, "acquired distributed lock for transaction cleanup");
                    Some(guard)
                }
                Ok(None) => {
                    info!(lock_key = %lock_key, "transaction cleanup skipped - another instance is processing");
                    return Ok(());
                }
                Err(e) => {
                    // Fail closed: skip cleanup if we can't communicate with Redis for locking,
                    // to prevent concurrent execution across multiple instances
                    warn!(
                        error = %e,
                        lock_key = %lock_key,
                        "failed to acquire distributed lock, skipping cleanup"
                    );
                    return Ok(());
                }
            }
        } else {
            debug!("in-memory repository detected, skipping distributed lock");
            None
        }
    } else {
        debug!("distributed mode disabled, skipping lock acquisition");
        None
    };

    let now = Utc::now();
    info!(
        timestamp = %now.to_rfc3339(),
        "executing transaction cleanup from storage"
    );

    let relayer_repo = data.relayer_repository();

    // Fetch all relayers
    let relayers = relayer_repo.list_all().await.map_err(|e| {
        error!(
            error = %e,
            "failed to fetch relayers for cleanup"
        );
        eyre::eyre!("Failed to fetch relayers: {}", e)
    })?;

    info!(
        relayer_count = relayers.len(),
        "found relayers to process for cleanup"
    );

    // Process relayers in parallel batches
    let cleanup_results = process_relayers_in_batches(relayers, transaction_repo, now).await;

    // Aggregate and report results
    let result = report_cleanup_results(cleanup_results).await;

    // Lock guard is automatically released when dropped (via Drop impl).
    // This happens regardless of whether we exit normally or via early return/error.
    drop(lock_guard);

    result
}

/// Processes multiple relayers in parallel batches for cleanup.
///
/// # Arguments
/// * `relayers` - List of relayers to process
/// * `transaction_repo` - Reference to the transaction repository
/// * `now` - Current UTC timestamp for comparison
///
/// # Returns
/// * `Vec<RelayerCleanupResult>` - Results from processing each relayer
async fn process_relayers_in_batches(
    relayers: Vec<RelayerRepoModel>,
    transaction_repo: Arc<impl TransactionRepository>,
    now: DateTime<Utc>,
) -> Vec<RelayerCleanupResult> {
    use futures::stream::{self, StreamExt};

    // Process relayers with limited concurrency to avoid overwhelming the system
    let results: Vec<RelayerCleanupResult> = stream::iter(relayers)
        .map(|relayer| {
            let repo_clone = Arc::clone(&transaction_repo);
            async move { process_single_relayer(relayer, repo_clone, now).await }
        })
        .buffer_unordered(MAX_CONCURRENT_RELAYERS)
        .collect()
        .await;

    results
}

/// Result of processing a single relayer's transactions.
#[derive(Debug)]
struct RelayerCleanupResult {
    relayer_id: String,
    cleaned_count: usize,
    error: Option<String>,
}

/// Processes cleanup for a single relayer by iterating over each final status independently.
///
/// Each status is processed separately to use efficient single-key Redis ZRANGE pagination
/// instead of the multi-status merge path which fetches all IDs into memory.
///
/// # Arguments
/// * `relayer` - The relayer to process
/// * `transaction_repo` - Reference to the transaction repository
/// * `now` - Current UTC timestamp for comparison
///
/// # Returns
/// * `RelayerCleanupResult` - Result of processing this relayer
async fn process_single_relayer(
    relayer: RelayerRepoModel,
    transaction_repo: Arc<impl TransactionRepository>,
    now: DateTime<Utc>,
) -> RelayerCleanupResult {
    debug!(
        relayer_id = %relayer.id,
        "processing cleanup for relayer"
    );

    let mut total_cleaned = 0usize;

    for status in FINAL_TRANSACTION_STATUSES {
        match process_status_cleanup(&relayer.id, status, &transaction_repo, now).await {
            Ok(cleaned) => total_cleaned += cleaned,
            Err(e) => {
                error!(
                    error = %e,
                    relayer_id = %relayer.id,
                    status = ?status,
                    "failed to cleanup transactions for status"
                );
                return RelayerCleanupResult {
                    relayer_id: relayer.id,
                    cleaned_count: total_cleaned,
                    error: Some(e.to_string()),
                };
            }
        }
    }

    if total_cleaned > 0 {
        info!(
            cleaned_count = total_cleaned,
            relayer_id = %relayer.id,
            "cleaned up expired transactions"
        );
    }

    RelayerCleanupResult {
        relayer_id: relayer.id,
        cleaned_count: total_cleaned,
        error: None,
    }
}

/// Processes cleanup for a single status of a single relayer.
///
/// Uses stable pagination: when items are deleted, the same page is re-queried
/// because deletions shift subsequent items into the current page's range.
/// Only advances the page when no deletions occurred (items remain in place).
///
/// # Arguments
/// * `relayer_id` - ID of the relayer
/// * `status` - The transaction status to process
/// * `transaction_repo` - Reference to the transaction repository
/// * `now` - Current UTC timestamp for comparison
///
/// # Returns
/// * `Result<usize>` - Number of transactions cleaned up for this status
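///
/// # Pagination example
///
/// Suppose `CLEANUP_PAGE_SIZE` were 2 and a status index held `[A, B, C, D]`,
/// with only `A` and `B` expired. Page 1 returns `[A, B]`; both are deleted,
/// so page 1 is re-queried and now returns `[C, D]`. Nothing is deleted this
/// time, so the loop advances to page 2, which is empty, ending the scan.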
async fn process_status_cleanup(
    relayer_id: &str,
    status: &TransactionStatus,
    transaction_repo: &Arc<impl TransactionRepository>,
    now: DateTime<Utc>,
) -> Result<usize> {
    let mut current_page = 1u32;
    let mut total_cleaned = 0usize;
    let mut iterations = 0u32;

    loop {
        if iterations >= MAX_CLEANUP_ITERATIONS_PER_STATUS {
            warn!(
                relayer_id = %relayer_id,
                status = ?status,
                iterations,
                total_cleaned,
                "reached max cleanup iterations, stopping"
            );
            break;
        }
        iterations += 1;

        let query = PaginationQuery {
            page: current_page,
            per_page: CLEANUP_PAGE_SIZE,
        };

        let page_result = transaction_repo
            .find_by_status_paginated(relayer_id, &[status.clone()], query, true)
            .await
            .map_err(|e| {
                eyre::eyre!(
                    "Failed to fetch {:?} transactions for relayer {}: {}",
                    status,
                    relayer_id,
                    e
                )
            })?;

        if page_result.items.is_empty() {
            break;
        }

        debug!(
            page = current_page,
            page_count = page_result.items.len(),
            total = page_result.total,
            relayer_id = %relayer_id,
            status = ?status,
            "processing page of transactions for cleanup"
        );

        let cleaned_count =
            process_transactions_for_cleanup(page_result.items, transaction_repo, relayer_id, now)
                .await;

        total_cleaned += cleaned_count;

        if cleaned_count == 0 {
            // No items were deleted on this page, so items remain in place.
            // Advance to the next page to check further items.
            current_page += 1;
        }
        // When items were deleted, stay on the same page: deletions shift
        // subsequent items into the current page range, so re-querying
        // the same page picks up previously-unreachable items.
    }

    if total_cleaned > 0 {
        debug!(
            total_cleaned,
            relayer_id = %relayer_id,
            status = ?status,
            "status cleanup completed"
        );
    }

    Ok(total_cleaned)
}

/// Fetches a page of transactions with final statuses for a specific relayer.
/// Used in tests to verify pagination behavior across all final statuses.
#[cfg(test)]
async fn fetch_final_transactions_paginated(
    relayer_id: &str,
    transaction_repo: &Arc<impl TransactionRepository>,
    query: PaginationQuery,
) -> Result<crate::repositories::PaginatedResult<TransactionRepoModel>> {
    transaction_repo
        .find_by_status_paginated(relayer_id, FINAL_TRANSACTION_STATUSES, query, true)
        .await
        .map_err(|e| {
            eyre::eyre!(
                "Failed to fetch final transactions for relayer {}: {}",
                relayer_id,
                e
            )
        })
}

/// Processes a list of transactions for cleanup, deleting expired ones via batch delete.
///
/// This function validates that transactions are in final states before deletion,
/// ensuring data integrity by preventing accidental deletion of active transactions.
/// Uses batch deletion for improved performance with large numbers of transactions.
///
/// # Arguments
/// * `transactions` - List of transactions to process
/// * `transaction_repo` - Reference to the transaction repository
/// * `relayer_id` - ID of the relayer (for logging)
/// * `now` - Current UTC timestamp for comparison
///
/// # Returns
/// * `usize` - Number of transactions successfully cleaned up
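///
/// For example, 250 delete requests would be issued as three pipeline calls of
/// 100, 100, and 50 (`DELETE_BATCH_SIZE` = 100); with the current
/// `CLEANUP_PAGE_SIZE` of 100, each page of input fits in a single batch.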
async fn process_transactions_for_cleanup(
    transactions: Vec<TransactionRepoModel>,
    transaction_repo: &Arc<impl TransactionRepository>,
    relayer_id: &str,
    now: DateTime<Utc>,
) -> usize {
    if transactions.is_empty() {
        return 0;
    }

    debug!(
        transaction_count = transactions.len(),
        relayer_id = %relayer_id,
        "processing transactions for cleanup"
    );

    // Filter expired transactions and validate they are in final states,
    // then convert to delete requests with pre-extracted data
    let delete_requests: Vec<TransactionDeleteRequest> = transactions
        .into_iter()
        .filter(|tx| {
            // Must be in a final state
            if !FINAL_TRANSACTION_STATUSES.contains(&tx.status) {
                warn!(
                    tx_id = %tx.id,
                    status = ?tx.status,
                    "skipping transaction not in final state"
                );
                return false;
            }
            // Must be expired
            should_delete_transaction(tx, now)
        })
        .map(|tx| {
            // Extract nonce from network data for index cleanup
            let nonce = extract_nonce_from_network_data(&tx.network_data);
            TransactionDeleteRequest::new(tx.id, tx.relayer_id, nonce)
        })
        .collect();

    if delete_requests.is_empty() {
        debug!(
            relayer_id = %relayer_id,
            "no expired transactions found"
        );
        return 0;
    }

    let total_expired = delete_requests.len();
    debug!(
        expired_count = total_expired,
        relayer_id = %relayer_id,
        "found expired transactions to delete"
    );

    // Process deletions in batches to avoid overwhelming Redis with large pipelines
    let mut total_deleted = 0;
    let mut total_failed = 0;

    for (batch_idx, batch) in delete_requests.chunks(DELETE_BATCH_SIZE).enumerate() {
        let batch_requests: Vec<TransactionDeleteRequest> = batch.to_vec();
        let batch_size = batch_requests.len();

        debug!(
            batch = batch_idx + 1,
            batch_size = batch_size,
            relayer_id = %relayer_id,
            "processing delete batch"
        );

        match transaction_repo.delete_by_requests(batch_requests).await {
            Ok(result) => {
                if !result.failed.is_empty() {
                    for (id, error) in &result.failed {
                        error!(
                            tx_id = %id,
                            error = %error,
                            relayer_id = %relayer_id,
                            "failed to delete expired transaction in batch"
                        );
                    }
                }

                total_deleted += result.deleted_count;
                total_failed += result.failed.len();
            }
            Err(e) => {
                error!(
                    error = %e,
                    relayer_id = %relayer_id,
                    batch = batch_idx + 1,
                    batch_size = batch_size,
                    "batch delete failed completely"
                );
                total_failed += batch_size;
            }
        }
    }

    debug!(
        total_deleted,
        total_failed,
        total_expired,
        relayer_id = %relayer_id,
        "batch delete completed"
    );

    total_deleted
}

/// Extracts the nonce from network transaction data if available.
/// This is used for cleaning up nonce indexes during deletion.
fn extract_nonce_from_network_data(network_data: &NetworkTransactionData) -> Option<u64> {
    match network_data {
        NetworkTransactionData::Evm(evm_data) => evm_data.nonce,
        _ => None,
    }
}

/// Determines if a transaction should be deleted based on its delete_at timestamp.
///
/// # Arguments
/// * `transaction` - The transaction to check
/// * `now` - Current UTC timestamp for comparison
///
/// # Returns
/// * `bool` - True if the transaction should be deleted, false otherwise
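///
/// # Example
///
/// A minimal sketch mirroring the unit tests below:
///
/// ```ignore
/// let now = Utc::now();
/// let mut tx = create_mock_transaction();
/// tx.delete_at = Some((now - chrono::Duration::hours(1)).to_rfc3339());
/// assert!(should_delete_transaction(&tx, now));
///
/// tx.delete_at = Some((now + chrono::Duration::hours(1)).to_rfc3339());
/// assert!(!should_delete_transaction(&tx, now));
/// ```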
fn should_delete_transaction(transaction: &TransactionRepoModel, now: DateTime<Utc>) -> bool {
    transaction
        .delete_at
        .as_ref()
        .and_then(|delete_at_str| DateTime::parse_from_rfc3339(delete_at_str).ok())
        .map(|delete_at| {
            let is_expired = now >= delete_at.with_timezone(&Utc);
            if is_expired {
                debug!(
                    tx_id = %transaction.id,
                    expired_at = %delete_at.to_rfc3339(),
                    "transaction is expired"
                );
            }
            is_expired
        })
        .unwrap_or_else(|| {
            if transaction.delete_at.is_some() {
                warn!(
                    tx_id = %transaction.id,
                    "transaction has invalid delete_at timestamp"
                );
            }
            false
        })
}

/// Reports the aggregated results of the cleanup operation.
///
/// # Arguments
/// * `cleanup_results` - Results from processing all relayers
///
/// # Returns
/// * `Result<()>` - Success if all went well, error if there were failures
async fn report_cleanup_results(cleanup_results: Vec<RelayerCleanupResult>) -> Result<()> {
    let total_cleaned: usize = cleanup_results.iter().map(|r| r.cleaned_count).sum();
    let total_errors = cleanup_results.iter().filter(|r| r.error.is_some()).count();
    let total_relayers = cleanup_results.len();

    // Log detailed results for relayers with errors
    for result in &cleanup_results {
        if let Some(error) = &result.error {
            error!(
                relayer_id = %result.relayer_id,
                error = %error,
                "failed to cleanup transactions for relayer"
            );
        }
    }

    if total_errors > 0 {
        warn!(
            total_errors,
            total_relayers, total_cleaned, "transaction cleanup completed with errors"
        );

        // Return an error so the failures are surfaced. Processing was not
        // aborted early, so relayers that succeeded keep their deletions and
        // the failed ones can be retried on a subsequent run.
        Err(eyre::eyre!(
            "Cleanup completed with {} errors out of {} relayers",
            total_errors,
            total_relayers
        ))
    } else {
        info!(
            total_cleaned,
            total_relayers, "transaction cleanup completed successfully"
        );
        Ok(())
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::{
        models::{
            NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel,
            TransactionRepoModel, TransactionStatus,
        },
        repositories::{InMemoryTransactionRepository, Repository},
        utils::mocks::mockutils::create_mock_transaction,
    };
    use chrono::{Duration, Utc};

    fn create_test_transaction(
        id: &str,
        relayer_id: &str,
        status: TransactionStatus,
        delete_at: Option<String>,
    ) -> TransactionRepoModel {
        let mut tx = create_mock_transaction();
        tx.id = id.to_string();
        tx.relayer_id = relayer_id.to_string();
        tx.status = status;
        tx.delete_at = delete_at;
        tx
    }

    #[tokio::test]
    async fn test_should_delete_transaction_expired() {
        let now = Utc::now();
        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            Some(expired_delete_at),
        );

        assert!(should_delete_transaction(&transaction, now));
    }

    #[tokio::test]
    async fn test_should_delete_transaction_not_expired() {
        let now = Utc::now();
        let future_delete_at = (now + Duration::hours(1)).to_rfc3339();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            Some(future_delete_at),
        );

        assert!(!should_delete_transaction(&transaction, now));
    }

    #[tokio::test]
    async fn test_should_delete_transaction_no_delete_at() {
        let now = Utc::now();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            None,
        );

        assert!(!should_delete_transaction(&transaction, now));
    }

    #[tokio::test]
    async fn test_should_delete_transaction_invalid_timestamp() {
        let now = Utc::now();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            Some("invalid-timestamp".to_string()),
        );

        assert!(!should_delete_transaction(&transaction, now));
    }

    #[tokio::test]
    async fn test_process_transactions_for_cleanup_parallel() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        // Create test transactions
        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
        let future_delete_at = (now + Duration::hours(1)).to_rfc3339();

        let expired_tx = create_test_transaction(
            "expired-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some(expired_delete_at),
        );
        let future_tx = create_test_transaction(
            "future-tx",
            relayer_id,
            TransactionStatus::Failed,
            Some(future_delete_at),
        );
        let no_delete_tx = create_test_transaction(
            "no-delete-tx",
            relayer_id,
            TransactionStatus::Canceled,
            None,
        );

        // Store transactions
        transaction_repo.create(expired_tx.clone()).await.unwrap();
        transaction_repo.create(future_tx.clone()).await.unwrap();
        transaction_repo.create(no_delete_tx.clone()).await.unwrap();

        let transactions = vec![expired_tx, future_tx, no_delete_tx];

        // Process transactions
        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        // Should have cleaned up 1 expired transaction
        assert_eq!(cleaned_count, 1);

        // Verify expired transaction was deleted
        assert!(transaction_repo
            .get_by_id("expired-tx".to_string())
            .await
            .is_err());

        // Verify non-expired transactions still exist
        assert!(transaction_repo
            .get_by_id("future-tx".to_string())
            .await
            .is_ok());
        assert!(transaction_repo
            .get_by_id("no-delete-tx".to_string())
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_batch_delete_expired_transactions() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        // Create multiple expired transactions
        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();

        for i in 0..5 {
            let tx = create_test_transaction(
                &format!("expired-tx-{i}"),
                relayer_id,
                TransactionStatus::Confirmed,
                Some(expired_delete_at.clone()),
            );
            transaction_repo.create(tx).await.unwrap();
        }

        // Verify they exist
        assert_eq!(transaction_repo.count().await.unwrap(), 5);

        // Delete them using batch delete
        let ids: Vec<String> = (0..5).map(|i| format!("expired-tx-{i}")).collect();
        let result = transaction_repo.delete_by_ids(ids).await.unwrap();

        assert_eq!(result.deleted_count, 5);
        assert!(result.failed.is_empty());

        // Verify they were deleted
        assert_eq!(transaction_repo.count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_batch_delete_with_nonexistent_ids() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        // Create one transaction
        let tx = create_test_transaction(
            "existing-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some(Utc::now().to_rfc3339()),
        );
        transaction_repo.create(tx).await.unwrap();

        // Try to delete existing and non-existing transactions
        let ids = vec![
            "existing-tx".to_string(),
            "nonexistent-1".to_string(),
            "nonexistent-2".to_string(),
        ];
        let result = transaction_repo.delete_by_ids(ids).await.unwrap();

        // Should delete the existing one and report failures for the others
        assert_eq!(result.deleted_count, 1);
        assert_eq!(result.failed.len(), 2);

        // Verify the existing one was deleted
        assert!(transaction_repo
            .get_by_id("existing-tx".to_string())
            .await
            .is_err());
    }

    #[tokio::test]
    async fn test_process_transactions_skips_non_final_status() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        // Create a transaction with non-final status but expired delete_at
        let expired_delete_at = (now - Duration::hours(1)).to_rfc3339();
        let pending_tx = create_test_transaction(
            "pending-tx",
            relayer_id,
            TransactionStatus::Pending, // Non-final status
            Some(expired_delete_at),
        );
        transaction_repo.create(pending_tx.clone()).await.unwrap();

        let transactions = vec![pending_tx];

        // Process should skip non-final status transactions
        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        // Should not have cleaned any transactions
        assert_eq!(cleaned_count, 0);

        // Transaction should still exist
        assert!(transaction_repo
            .get_by_id("pending-tx".to_string())
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_fetch_final_transactions_paginated() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        // Create transactions with different statuses
        let confirmed_tx = create_test_transaction(
            "confirmed-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            None,
        );
        let pending_tx =
            create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
        let failed_tx =
            create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);

        // Store transactions
        transaction_repo.create(confirmed_tx).await.unwrap();
        transaction_repo.create(pending_tx).await.unwrap();
        transaction_repo.create(failed_tx).await.unwrap();

        // Fetch final transactions with pagination
        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
            .await
            .unwrap();

        // Should only return transactions with final statuses (Confirmed, Failed)
        assert_eq!(result.total, 2);
        assert_eq!(result.items.len(), 2);
        let final_ids: Vec<&String> = result.items.iter().map(|tx| &tx.id).collect();
        assert!(final_ids.contains(&&"confirmed-tx".to_string()));
        assert!(final_ids.contains(&&"failed-tx".to_string()));
        assert!(!final_ids.contains(&&"pending-tx".to_string()));
    }

    #[tokio::test]
    async fn test_report_cleanup_results_success() {
        let results = vec![
            RelayerCleanupResult {
                relayer_id: "relayer-1".to_string(),
                cleaned_count: 2,
                error: None,
            },
            RelayerCleanupResult {
                relayer_id: "relayer-2".to_string(),
                cleaned_count: 1,
                error: None,
            },
        ];

        let result = report_cleanup_results(results).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_report_cleanup_results_with_errors() {
        let results = vec![
            RelayerCleanupResult {
                relayer_id: "relayer-1".to_string(),
                cleaned_count: 2,
                error: None,
            },
            RelayerCleanupResult {
                relayer_id: "relayer-2".to_string(),
                cleaned_count: 0,
                error: Some("Database error".to_string()),
            },
        ];

        let result = report_cleanup_results(results).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_process_single_relayer_success() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer = RelayerRepoModel {
            id: "test-relayer".to_string(),
            name: "Test Relayer".to_string(),
            network: "ethereum".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            signer_id: "test-signer".to_string(),
            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
            address: "0x1234567890123456789012345678901234567890".to_string(),
            notification_id: None,
            system_disabled: false,
            custom_rpc_urls: None,
            ..Default::default()
        };
        let now = Utc::now();

        // Create expired and non-expired transactions
        let expired_tx = create_test_transaction(
            "expired-tx",
            &relayer.id,
            TransactionStatus::Confirmed,
            Some((now - Duration::hours(1)).to_rfc3339()),
        );
        let future_tx = create_test_transaction(
            "future-tx",
            &relayer.id,
            TransactionStatus::Failed,
            Some((now + Duration::hours(1)).to_rfc3339()),
        );

        transaction_repo.create(expired_tx).await.unwrap();
        transaction_repo.create(future_tx).await.unwrap();

        let result = process_single_relayer(relayer.clone(), transaction_repo.clone(), now).await;

        assert_eq!(result.relayer_id, relayer.id);
        assert_eq!(result.cleaned_count, 1);
        assert!(result.error.is_none());
    }

    #[tokio::test]
    async fn test_process_single_relayer_no_transactions() {
        // Create a relayer with no transactions in the repo
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer = RelayerRepoModel {
            id: "empty-relayer".to_string(),
            name: "Empty Relayer".to_string(),
            network: "ethereum".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            signer_id: "test-signer".to_string(),
            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
            address: "0x1234567890123456789012345678901234567890".to_string(),
            notification_id: None,
            system_disabled: false,
            custom_rpc_urls: None,
            ..Default::default()
        };
        let now = Utc::now();

        // This should succeed but find no transactions
        let result = process_single_relayer(relayer.clone(), transaction_repo, now).await;

        assert_eq!(result.relayer_id, relayer.id);
        assert_eq!(result.cleaned_count, 0);
        assert!(result.error.is_none()); // No error, just no transactions found
    }

    #[tokio::test]
    async fn test_process_transactions_with_empty_list() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();
        let transactions = vec![];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        assert_eq!(cleaned_count, 0);
    }

    #[tokio::test]
    async fn test_process_transactions_with_no_expired() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        // Create only non-expired transactions
        let future_tx1 = create_test_transaction(
            "future-tx-1",
            relayer_id,
            TransactionStatus::Confirmed,
            Some((now + Duration::hours(1)).to_rfc3339()),
        );
        let future_tx2 = create_test_transaction(
            "future-tx-2",
            relayer_id,
            TransactionStatus::Failed,
            Some((now + Duration::hours(2)).to_rfc3339()),
        );
        let no_delete_tx = create_test_transaction(
            "no-delete-tx",
            relayer_id,
            TransactionStatus::Canceled,
            None,
        );

        let transactions = vec![future_tx1, future_tx2, no_delete_tx];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        assert_eq!(cleaned_count, 0);
    }

    #[tokio::test]
    async fn test_should_delete_transaction_exactly_at_expiry_time() {
        let now = Utc::now();
        let exact_expiry_time = now.to_rfc3339();

        let transaction = create_test_transaction(
            "test-tx",
            "test-relayer",
            TransactionStatus::Confirmed,
            Some(exact_expiry_time),
        );

        // Should be considered expired when exactly at expiry time
        assert!(should_delete_transaction(&transaction, now));
    }

    #[tokio::test]
    async fn test_parallel_processing_with_mixed_results() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        // Create multiple expired transactions
        let expired_tx1 = create_test_transaction(
            "expired-tx-1",
            relayer_id,
            TransactionStatus::Confirmed,
            Some((now - Duration::hours(1)).to_rfc3339()),
        );
        let expired_tx2 = create_test_transaction(
            "expired-tx-2",
            relayer_id,
            TransactionStatus::Failed,
            Some((now - Duration::hours(2)).to_rfc3339()),
        );
        let expired_tx3 = create_test_transaction(
            "expired-tx-3",
            relayer_id,
            TransactionStatus::Canceled,
            Some((now - Duration::hours(3)).to_rfc3339()),
        );

        // Store only some transactions (others will fail deletion due to NotFound)
        transaction_repo.create(expired_tx1.clone()).await.unwrap();
        transaction_repo.create(expired_tx2.clone()).await.unwrap();
        // Don't store expired_tx3 - it will fail deletion

        let transactions = vec![expired_tx1, expired_tx2, expired_tx3];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &transaction_repo, relayer_id, now)
                .await;

        // Should have cleaned 2 out of 3 transactions (one failed due to NotFound)
        assert_eq!(cleaned_count, 2);
    }

    #[tokio::test]
    async fn test_report_cleanup_results_empty() {
        let results = vec![];
        let result = report_cleanup_results(results).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_fetch_final_transactions_paginated_with_mixed_statuses() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        // Create transactions with all possible statuses
        let confirmed_tx = create_test_transaction(
            "confirmed-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            None,
        );
        let failed_tx =
            create_test_transaction("failed-tx", relayer_id, TransactionStatus::Failed, None);
        let canceled_tx =
            create_test_transaction("canceled-tx", relayer_id, TransactionStatus::Canceled, None);
        let expired_tx =
            create_test_transaction("expired-tx", relayer_id, TransactionStatus::Expired, None);
        let pending_tx =
            create_test_transaction("pending-tx", relayer_id, TransactionStatus::Pending, None);
        let sent_tx = create_test_transaction("sent-tx", relayer_id, TransactionStatus::Sent, None);

        // Store all transactions
        transaction_repo.create(confirmed_tx).await.unwrap();
        transaction_repo.create(failed_tx).await.unwrap();
        transaction_repo.create(canceled_tx).await.unwrap();
        transaction_repo.create(expired_tx).await.unwrap();
        transaction_repo.create(pending_tx).await.unwrap();
        transaction_repo.create(sent_tx).await.unwrap();

        // Fetch final transactions with pagination
        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
            .await
            .unwrap();

        // Should only return the 4 final status transactions
        assert_eq!(result.total, 4);
        assert_eq!(result.items.len(), 4);
        let final_ids: Vec<&String> = result.items.iter().map(|tx| &tx.id).collect();
        assert!(final_ids.contains(&&"confirmed-tx".to_string()));
        assert!(final_ids.contains(&&"failed-tx".to_string()));
        assert!(final_ids.contains(&&"canceled-tx".to_string()));
        assert!(final_ids.contains(&&"expired-tx".to_string()));
        assert!(!final_ids.contains(&&"pending-tx".to_string()));
        assert!(!final_ids.contains(&&"sent-tx".to_string()));
    }

    #[tokio::test]
    async fn test_fetch_final_transactions_paginated_pagination() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";

        // Create 5 confirmed transactions
        for i in 1..=5 {
            let mut tx = create_test_transaction(
                &format!("tx-{i}"),
                relayer_id,
                TransactionStatus::Confirmed,
                None,
            );
            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
            transaction_repo.create(tx).await.unwrap();
        }

        // Test first page with 2 items
        let query = PaginationQuery {
            page: 1,
            per_page: 2,
        };
        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 2);
        assert_eq!(result.page, 1);

        // Test second page
        let query = PaginationQuery {
            page: 2,
            per_page: 2,
        };
        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 2);
        assert_eq!(result.page, 2);

        // Test last page (partial)
        let query = PaginationQuery {
            page: 3,
            per_page: 2,
        };
        let result = fetch_final_transactions_paginated(relayer_id, &transaction_repo, query)
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 1);
        assert_eq!(result.page, 3);
    }

    #[tokio::test]
    async fn test_process_status_cleanup_deletes_expired() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        // Create expired and non-expired transactions of the same status
        let expired_tx = create_test_transaction(
            "expired-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some((now - Duration::hours(1)).to_rfc3339()),
        );
        let future_tx = create_test_transaction(
            "future-tx",
            relayer_id,
            TransactionStatus::Confirmed,
            Some((now + Duration::hours(1)).to_rfc3339()),
        );

        transaction_repo.create(expired_tx).await.unwrap();
        transaction_repo.create(future_tx).await.unwrap();

        let cleaned = process_status_cleanup(
            relayer_id,
            &TransactionStatus::Confirmed,
            &transaction_repo,
            now,
        )
        .await
        .unwrap();

        assert_eq!(cleaned, 1);

        // Expired one deleted, future one remains
        assert!(transaction_repo
            .get_by_id("expired-tx".to_string())
            .await
            .is_err());
        assert!(transaction_repo
            .get_by_id("future-tx".to_string())
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_process_status_cleanup_no_transactions() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        let cleaned = process_status_cleanup(
            relayer_id,
            &TransactionStatus::Confirmed,
            &transaction_repo,
            now,
        )
        .await
        .unwrap();

        assert_eq!(cleaned, 0);
    }

    #[tokio::test]
    async fn test_process_status_cleanup_skips_other_statuses() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer_id = "test-relayer";
        let now = Utc::now();

        // Create expired transaction with Failed status
        let tx = create_test_transaction(
            "failed-tx",
            relayer_id,
            TransactionStatus::Failed,
            Some((now - Duration::hours(1)).to_rfc3339()),
        );
        transaction_repo.create(tx).await.unwrap();

        // Cleanup for Confirmed status should not touch Failed transactions
        let cleaned = process_status_cleanup(
            relayer_id,
            &TransactionStatus::Confirmed,
            &transaction_repo,
            now,
        )
        .await
        .unwrap();

        assert_eq!(cleaned, 0);
        assert!(transaction_repo
            .get_by_id("failed-tx".to_string())
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_process_single_relayer_processes_all_final_statuses() {
        let transaction_repo = Arc::new(InMemoryTransactionRepository::new());
        let relayer = RelayerRepoModel {
            id: "test-relayer".to_string(),
            name: "Test Relayer".to_string(),
            network: "ethereum".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            signer_id: "test-signer".to_string(),
            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
            address: "0x1234567890123456789012345678901234567890".to_string(),
            notification_id: None,
            system_disabled: false,
            custom_rpc_urls: None,
            ..Default::default()
        };
        let now = Utc::now();
        let expired_at = (now - Duration::hours(1)).to_rfc3339();

        // Create one expired transaction per final status
        for (i, status) in [
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Canceled,
            TransactionStatus::Expired,
        ]
        .iter()
        .enumerate()
        {
            let tx = create_test_transaction(
                &format!("tx-{}", i),
                &relayer.id,
                status.clone(),
                Some(expired_at.clone()),
            );
            transaction_repo.create(tx).await.unwrap();
        }

        let result = process_single_relayer(relayer.clone(), transaction_repo.clone(), now).await;

        assert_eq!(result.relayer_id, relayer.id);
        assert_eq!(result.cleaned_count, 4);
        assert!(result.error.is_none());

        // All should be deleted
        assert_eq!(transaction_repo.count().await.unwrap(), 0);
1375    }
1376}