1use actix_web::web::ThinData;
15use chrono::{DateTime, Utc};
16use eyre::Result;
17use std::sync::Arc;
18use std::time::Duration;
19use tracing::{debug, error, info, instrument, warn};
20
21use crate::{
22 config::ServerConfig,
23 constants::{
24 FINAL_TRANSACTION_STATUSES, TRANSACTION_CLEANUP_LOCK_TTL_SECS,
25 WORKER_TRANSACTION_CLEANUP_RETRIES,
26 },
27 jobs::handle_result,
28 models::{
29 DefaultAppState, NetworkTransactionData, PaginationQuery, RelayerRepoModel,
30 TransactionRepoModel, TransactionStatus,
31 },
32 queues::{HandlerError, WorkerContext},
33 repositories::{Repository, TransactionDeleteRequest, TransactionRepository},
34 utils::DistributedLock,
35};
36
/// Upper bound on how many relayers are cleaned up concurrently; passed to
/// `buffer_unordered` in `process_relayers_in_batches`.
const MAX_CONCURRENT_RELAYERS: usize = 10;

/// Number of transactions requested per page while scanning a relayer's
/// transactions for a given status.
const CLEANUP_PAGE_SIZE: u32 = 100;

/// Maximum number of delete requests handed to the repository in a single
/// `delete_by_requests` call.
const DELETE_BATCH_SIZE: usize = 100;

/// Cap on the number of page fetches performed per relayer and status in one
/// cleanup run; the scan stops with a warning once the cap is reached.
const MAX_CLEANUP_ITERATIONS_PER_STATUS: u32 = 1500;

/// Name component of the distributed lock key, which is built as
/// `{prefix}:lock:{CLEANUP_LOCK_NAME}`.
const CLEANUP_LOCK_NAME: &str = "transaction_cleanup";
55
56#[instrument(
73 level = "debug",
74 skip(job, data),
75 fields(
76 job_type = "transaction_cleanup",
77 attempt = %ctx.attempt,
78 ),
79 err
80)]
81pub async fn transaction_cleanup_handler(
82 job: TransactionCleanupCronReminder,
83 data: ThinData<DefaultAppState>,
84 ctx: WorkerContext,
85) -> Result<(), HandlerError> {
86 let result = handle_request(job, &data).await;
87
88 handle_result(
89 result,
90 &ctx,
91 "TransactionCleanup",
92 WORKER_TRANSACTION_CLEANUP_RETRIES,
93 )
94}
95
/// Payload of the transaction cleanup cron job.
///
/// This is an empty marker type: the cleanup handler ignores its value and
/// only uses it as the trigger for a run.
#[derive(Default, Debug, Clone)]
pub struct TransactionCleanupCronReminder();
99
100async fn handle_request(
113 _job: TransactionCleanupCronReminder,
114 data: &ThinData<DefaultAppState>,
115) -> Result<()> {
116 let transaction_repo = data.transaction_repository();
117
118 let lock_guard = if ServerConfig::get_distributed_mode() {
121 if let Some((connections, prefix)) = transaction_repo.connection_info() {
122 let conn = connections.primary().clone();
123 let lock_key = format!("{prefix}:lock:{CLEANUP_LOCK_NAME}");
124 let lock = DistributedLock::new(
125 conn,
126 &lock_key,
127 Duration::from_secs(TRANSACTION_CLEANUP_LOCK_TTL_SECS),
128 );
129
130 match lock.try_acquire().await {
131 Ok(Some(guard)) => {
132 debug!(lock_key = %lock_key, "acquired distributed lock for transaction cleanup");
133 Some(guard)
134 }
135 Ok(None) => {
136 info!(lock_key = %lock_key, "transaction cleanup skipped - another instance is processing");
137 return Ok(());
138 }
139 Err(e) => {
140 warn!(
143 error = %e,
144 lock_key = %lock_key,
145 "failed to acquire distributed lock, skipping cleanup"
146 );
147 return Ok(());
148 }
149 }
150 } else {
151 debug!("in-memory repository detected, skipping distributed lock");
152 None
153 }
154 } else {
155 debug!("distributed mode disabled, skipping lock acquisition");
156 None
157 };
158
159 let now = Utc::now();
160 info!(
161 timestamp = %now.to_rfc3339(),
162 "executing transaction cleanup from storage"
163 );
164
165 let relayer_repo = data.relayer_repository();
166
167 let relayers = relayer_repo.list_all().await.map_err(|e| {
169 error!(
170 error = %e,
171 "failed to fetch relayers for cleanup"
172 );
173 eyre::eyre!("Failed to fetch relayers: {}", e)
174 })?;
175
176 info!(
177 relayer_count = relayers.len(),
178 "found relayers to process for cleanup"
179 );
180
181 let cleanup_results = process_relayers_in_batches(relayers, transaction_repo, now).await;
183
184 let result = report_cleanup_results(cleanup_results).await;
186
187 drop(lock_guard);
190
191 result
192}
193
194async fn process_relayers_in_batches(
204 relayers: Vec<RelayerRepoModel>,
205 transaction_repo: Arc<impl TransactionRepository>,
206 now: DateTime<Utc>,
207) -> Vec<RelayerCleanupResult> {
208 use futures::stream::{self, StreamExt};
209
210 let results: Vec<RelayerCleanupResult> = stream::iter(relayers)
212 .map(|relayer| {
213 let repo_clone = Arc::clone(&transaction_repo);
214 async move { process_single_relayer(relayer, repo_clone, now).await }
215 })
216 .buffer_unordered(MAX_CONCURRENT_RELAYERS)
217 .collect()
218 .await;
219
220 results
221}
222
/// Outcome of cleaning up the transactions of one relayer.
#[derive(Debug)]
struct RelayerCleanupResult {
    /// ID of the relayer this result belongs to.
    relayer_id: String,
    /// Number of transactions deleted for the relayer, including those
    /// deleted before an error stopped the cleanup.
    cleaned_count: usize,
    /// Error message when the cleanup stopped early; `None` on success.
    error: Option<String>,
}
230
231async fn process_single_relayer(
244 relayer: RelayerRepoModel,
245 transaction_repo: Arc<impl TransactionRepository>,
246 now: DateTime<Utc>,
247) -> RelayerCleanupResult {
248 debug!(
249 relayer_id = %relayer.id,
250 "processing cleanup for relayer"
251 );
252
253 let mut total_cleaned = 0usize;
254
255 for status in FINAL_TRANSACTION_STATUSES {
256 match process_status_cleanup(&relayer.id, status, &transaction_repo, now).await {
257 Ok(cleaned) => total_cleaned += cleaned,
258 Err(e) => {
259 error!(
260 error = %e,
261 relayer_id = %relayer.id,
262 status = ?status,
263 "failed to cleanup transactions for status"
264 );
265 return RelayerCleanupResult {
266 relayer_id: relayer.id,
267 cleaned_count: total_cleaned,
268 error: Some(e.to_string()),
269 };
270 }
271 }
272 }
273
274 if total_cleaned > 0 {
275 info!(
276 cleaned_count = total_cleaned,
277 relayer_id = %relayer.id,
278 "cleaned up expired transactions"
279 );
280 }
281
282 RelayerCleanupResult {
283 relayer_id: relayer.id,
284 cleaned_count: total_cleaned,
285 error: None,
286 }
287}
288
289async fn process_status_cleanup(
304 relayer_id: &str,
305 status: &TransactionStatus,
306 transaction_repo: &Arc<impl TransactionRepository>,
307 now: DateTime<Utc>,
308) -> Result<usize> {
309 let mut current_page = 1u32;
310 let mut total_cleaned = 0usize;
311 let mut iterations = 0u32;
312
313 loop {
314 if iterations >= MAX_CLEANUP_ITERATIONS_PER_STATUS {
315 warn!(
316 relayer_id = %relayer_id,
317 status = ?status,
318 iterations,
319 total_cleaned,
320 "reached max cleanup iterations, stopping"
321 );
322 break;
323 }
324 iterations += 1;
325
326 let query = PaginationQuery {
327 page: current_page,
328 per_page: CLEANUP_PAGE_SIZE,
329 };
330
331 let page_result = transaction_repo
332 .find_by_status_paginated(relayer_id, &[status.clone()], query, true)
333 .await
334 .map_err(|e| {
335 eyre::eyre!(
336 "Failed to fetch {:?} transactions for relayer {}: {}",
337 status,
338 relayer_id,
339 e
340 )
341 })?;
342
343 if page_result.items.is_empty() {
344 break;
345 }
346
347 debug!(
348 page = current_page,
349 page_count = page_result.items.len(),
350 total = page_result.total,
351 relayer_id = %relayer_id,
352 status = ?status,
353 "processing page of transactions for cleanup"
354 );
355
356 let cleaned_count =
357 process_transactions_for_cleanup(page_result.items, transaction_repo, relayer_id, now)
358 .await;
359
360 total_cleaned += cleaned_count;
361
362 if cleaned_count == 0 {
363 current_page += 1;
366 }
367 }
371
372 if total_cleaned > 0 {
373 debug!(
374 total_cleaned,
375 relayer_id = %relayer_id,
376 status = ?status,
377 "status cleanup completed"
378 );
379 }
380
381 Ok(total_cleaned)
382}
383
/// Test-only helper: fetches one page of a relayer's transactions whose
/// status is in `FINAL_TRANSACTION_STATUSES`.
///
/// # Errors
/// Wraps any repository error in a message naming the relayer.
#[cfg(test)]
async fn fetch_final_transactions_paginated(
    relayer_id: &str,
    transaction_repo: &Arc<impl TransactionRepository>,
    query: PaginationQuery,
) -> Result<crate::repositories::PaginatedResult<TransactionRepoModel>> {
    let fetched = transaction_repo
        .find_by_status_paginated(relayer_id, FINAL_TRANSACTION_STATUSES, query, true)
        .await;

    match fetched {
        Ok(page) => Ok(page),
        Err(e) => Err(eyre::eyre!(
            "Failed to fetch final transactions for relayer {}: {}",
            relayer_id,
            e
        )),
    }
}
403
404async fn process_transactions_for_cleanup(
419 transactions: Vec<TransactionRepoModel>,
420 transaction_repo: &Arc<impl TransactionRepository>,
421 relayer_id: &str,
422 now: DateTime<Utc>,
423) -> usize {
424 if transactions.is_empty() {
425 return 0;
426 }
427
428 debug!(
429 transaction_count = transactions.len(),
430 relayer_id = %relayer_id,
431 "processing transactions for cleanup"
432 );
433
434 let delete_requests: Vec<TransactionDeleteRequest> = transactions
437 .into_iter()
438 .filter(|tx| {
439 if !FINAL_TRANSACTION_STATUSES.contains(&tx.status) {
441 warn!(
442 tx_id = %tx.id,
443 status = ?tx.status,
444 "skipping transaction not in final state"
445 );
446 return false;
447 }
448 should_delete_transaction(tx, now)
450 })
451 .map(|tx| {
452 let nonce = extract_nonce_from_network_data(&tx.network_data);
454 TransactionDeleteRequest::new(tx.id, tx.relayer_id, nonce)
455 })
456 .collect();
457
458 if delete_requests.is_empty() {
459 debug!(
460 relayer_id = %relayer_id,
461 "no expired transactions found"
462 );
463 return 0;
464 }
465
466 let total_expired = delete_requests.len();
467 debug!(
468 expired_count = total_expired,
469 relayer_id = %relayer_id,
470 "found expired transactions to delete"
471 );
472
473 let mut total_deleted = 0;
475 let mut total_failed = 0;
476
477 for (batch_idx, batch) in delete_requests.chunks(DELETE_BATCH_SIZE).enumerate() {
478 let batch_requests: Vec<TransactionDeleteRequest> = batch.to_vec();
479 let batch_size = batch_requests.len();
480
481 debug!(
482 batch = batch_idx + 1,
483 batch_size = batch_size,
484 relayer_id = %relayer_id,
485 "processing delete batch"
486 );
487
488 match transaction_repo.delete_by_requests(batch_requests).await {
489 Ok(result) => {
490 if !result.failed.is_empty() {
491 for (id, error) in &result.failed {
492 error!(
493 tx_id = %id,
494 error = %error,
495 relayer_id = %relayer_id,
496 "failed to delete expired transaction in batch"
497 );
498 }
499 }
500
501 total_deleted += result.deleted_count;
502 total_failed += result.failed.len();
503 }
504 Err(e) => {
505 error!(
506 error = %e,
507 relayer_id = %relayer_id,
508 batch = batch_idx + 1,
509 batch_size = batch_size,
510 "batch delete failed completely"
511 );
512 total_failed += batch_size;
513 }
514 }
515 }
516
517 debug!(
518 total_deleted,
519 total_failed,
520 total_expired,
521 relayer_id = %relayer_id,
522 "batch delete completed"
523 );
524
525 total_deleted
526}
527
528fn extract_nonce_from_network_data(network_data: &NetworkTransactionData) -> Option<u64> {
531 match network_data {
532 NetworkTransactionData::Evm(evm_data) => evm_data.nonce,
533 _ => None,
534 }
535}
536
537fn should_delete_transaction(transaction: &TransactionRepoModel, now: DateTime<Utc>) -> bool {
546 transaction
547 .delete_at
548 .as_ref()
549 .and_then(|delete_at_str| DateTime::parse_from_rfc3339(delete_at_str).ok())
550 .map(|delete_at| {
551 let is_expired = now >= delete_at.with_timezone(&Utc);
552 if is_expired {
553 debug!(
554 tx_id = %transaction.id,
555 expired_at = %delete_at.to_rfc3339(),
556 "transaction is expired"
557 );
558 }
559 is_expired
560 })
561 .unwrap_or_else(|| {
562 if transaction.delete_at.is_some() {
563 warn!(
564 tx_id = %transaction.id,
565 "transaction has invalid delete_at timestamp"
566 );
567 }
568 false
569 })
570}
571
572async fn report_cleanup_results(cleanup_results: Vec<RelayerCleanupResult>) -> Result<()> {
580 let total_cleaned: usize = cleanup_results.iter().map(|r| r.cleaned_count).sum();
581 let total_errors = cleanup_results.iter().filter(|r| r.error.is_some()).count();
582 let total_relayers = cleanup_results.len();
583
584 for result in &cleanup_results {
586 if let Some(error) = &result.error {
587 error!(
588 relayer_id = %result.relayer_id,
589 error = %error,
590 "failed to cleanup transactions for relayer"
591 );
592 }
593 }
594
595 if total_errors > 0 {
596 warn!(
597 total_errors,
598 total_relayers, total_cleaned, "transaction cleanup completed with errors"
599 );
600
601 Err(eyre::eyre!(
604 "Cleanup completed with {} errors out of {} relayers",
605 total_errors,
606 total_relayers
607 ))
608 } else {
609 info!(
610 total_cleaned,
611 total_relayers, "transaction cleanup completed successfully"
612 );
613 Ok(())
614 }
615}
616
#[cfg(test)]
mod tests {

    use super::*;
    use crate::{
        models::{
            NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy, RelayerRepoModel,
            TransactionRepoModel, TransactionStatus,
        },
        repositories::{InMemoryTransactionRepository, Repository},
        utils::mocks::mockutils::create_mock_transaction,
    };
    use chrono::{Duration, Utc};

    const RELAYER_ID: &str = "test-relayer";

    /// Builds a mock transaction with the given identity, status and expiry.
    fn create_test_transaction(
        id: &str,
        relayer_id: &str,
        status: TransactionStatus,
        delete_at: Option<String>,
    ) -> TransactionRepoModel {
        let mut tx = create_mock_transaction();
        tx.id = id.to_string();
        tx.relayer_id = relayer_id.to_string();
        tx.status = status;
        tx.delete_at = delete_at;
        tx
    }

    /// Creates an empty in-memory transaction repository.
    fn new_repo() -> Arc<InMemoryTransactionRepository> {
        Arc::new(InMemoryTransactionRepository::new())
    }

    /// Builds an EVM relayer fixture with the given id and name.
    fn create_test_relayer(id: &str, name: &str) -> RelayerRepoModel {
        RelayerRepoModel {
            id: id.to_string(),
            name: name.to_string(),
            network: "ethereum".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            signer_id: "test-signer".to_string(),
            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
            address: "0x1234567890123456789012345678901234567890".to_string(),
            notification_id: None,
            system_disabled: false,
            custom_rpc_urls: None,
            ..Default::default()
        }
    }

    /// RFC 3339 timestamp `hours` hours before `now`.
    fn hours_ago(now: DateTime<Utc>, hours: i64) -> String {
        (now - Duration::hours(hours)).to_rfc3339()
    }

    /// RFC 3339 timestamp `hours` hours after `now`.
    fn hours_ahead(now: DateTime<Utc>, hours: i64) -> String {
        (now + Duration::hours(hours)).to_rfc3339()
    }

    /// Whether a transaction with `id` can still be loaded from `repo`.
    async fn is_stored(repo: &InMemoryTransactionRepository, id: &str) -> bool {
        repo.get_by_id(id.to_string()).await.is_ok()
    }

    /// Whether `items` contains a transaction with the given id.
    fn contains_id(items: &[TransactionRepoModel], id: &str) -> bool {
        items.iter().any(|tx| tx.id == id)
    }

    /// Shorthand for building a per-relayer cleanup result.
    fn cleanup_result(
        relayer_id: &str,
        cleaned_count: usize,
        error: Option<&str>,
    ) -> RelayerCleanupResult {
        RelayerCleanupResult {
            relayer_id: relayer_id.to_string(),
            cleaned_count,
            error: error.map(|message| message.to_string()),
        }
    }

    /// A `delete_at` in the past marks the transaction for deletion.
    #[tokio::test]
    async fn test_should_delete_transaction_expired() {
        let now = Utc::now();
        let tx = create_test_transaction(
            "test-tx",
            RELAYER_ID,
            TransactionStatus::Confirmed,
            Some(hours_ago(now, 1)),
        );

        assert!(should_delete_transaction(&tx, now));
    }

    /// A `delete_at` in the future keeps the transaction.
    #[tokio::test]
    async fn test_should_delete_transaction_not_expired() {
        let now = Utc::now();
        let tx = create_test_transaction(
            "test-tx",
            RELAYER_ID,
            TransactionStatus::Confirmed,
            Some(hours_ahead(now, 1)),
        );

        assert!(!should_delete_transaction(&tx, now));
    }

    /// Without `delete_at` the transaction is never deleted.
    #[tokio::test]
    async fn test_should_delete_transaction_no_delete_at() {
        let now = Utc::now();
        let tx =
            create_test_transaction("test-tx", RELAYER_ID, TransactionStatus::Confirmed, None);

        assert!(!should_delete_transaction(&tx, now));
    }

    /// An unparsable `delete_at` keeps the transaction.
    #[tokio::test]
    async fn test_should_delete_transaction_invalid_timestamp() {
        let now = Utc::now();
        let tx = create_test_transaction(
            "test-tx",
            RELAYER_ID,
            TransactionStatus::Confirmed,
            Some("invalid-timestamp".to_string()),
        );

        assert!(!should_delete_transaction(&tx, now));
    }

    /// Only the expired transaction of a mixed set is deleted.
    #[tokio::test]
    async fn test_process_transactions_for_cleanup_parallel() {
        let repo = new_repo();
        let now = Utc::now();

        let expired_tx = create_test_transaction(
            "expired-tx",
            RELAYER_ID,
            TransactionStatus::Confirmed,
            Some(hours_ago(now, 1)),
        );
        let future_tx = create_test_transaction(
            "future-tx",
            RELAYER_ID,
            TransactionStatus::Failed,
            Some(hours_ahead(now, 1)),
        );
        let no_delete_tx = create_test_transaction(
            "no-delete-tx",
            RELAYER_ID,
            TransactionStatus::Canceled,
            None,
        );

        repo.create(expired_tx.clone()).await.unwrap();
        repo.create(future_tx.clone()).await.unwrap();
        repo.create(no_delete_tx.clone()).await.unwrap();

        let cleaned_count = process_transactions_for_cleanup(
            vec![expired_tx, future_tx, no_delete_tx],
            &repo,
            RELAYER_ID,
            now,
        )
        .await;

        assert_eq!(cleaned_count, 1);

        // The expired transaction is gone, the other two are untouched.
        assert!(!is_stored(&repo, "expired-tx").await);
        assert!(is_stored(&repo, "future-tx").await);
        assert!(is_stored(&repo, "no-delete-tx").await);
    }

    /// Batch deletion by id removes every listed transaction.
    #[tokio::test]
    async fn test_batch_delete_expired_transactions() {
        let repo = new_repo();
        let now = Utc::now();
        let expired_delete_at = hours_ago(now, 1);

        let ids: Vec<String> = (0..5).map(|i| format!("expired-tx-{i}")).collect();
        for id in &ids {
            let tx = create_test_transaction(
                id,
                RELAYER_ID,
                TransactionStatus::Confirmed,
                Some(expired_delete_at.clone()),
            );
            repo.create(tx).await.unwrap();
        }

        assert_eq!(repo.count().await.unwrap(), 5);

        let result = repo.delete_by_ids(ids).await.unwrap();

        assert_eq!(result.deleted_count, 5);
        assert!(result.failed.is_empty());
        assert_eq!(repo.count().await.unwrap(), 0);
    }

    /// Unknown ids are reported as failures without blocking valid deletions.
    #[tokio::test]
    async fn test_batch_delete_with_nonexistent_ids() {
        let repo = new_repo();

        let tx = create_test_transaction(
            "existing-tx",
            RELAYER_ID,
            TransactionStatus::Confirmed,
            Some(Utc::now().to_rfc3339()),
        );
        repo.create(tx).await.unwrap();

        let ids = vec![
            "existing-tx".to_string(),
            "nonexistent-1".to_string(),
            "nonexistent-2".to_string(),
        ];
        let result = repo.delete_by_ids(ids).await.unwrap();

        assert_eq!(result.deleted_count, 1);
        assert_eq!(result.failed.len(), 2);
        assert!(!is_stored(&repo, "existing-tx").await);
    }

    /// Expired transactions that are not in a final state are left alone.
    #[tokio::test]
    async fn test_process_transactions_skips_non_final_status() {
        let repo = new_repo();
        let now = Utc::now();

        let pending_tx = create_test_transaction(
            "pending-tx",
            RELAYER_ID,
            TransactionStatus::Pending,
            Some(hours_ago(now, 1)),
        );
        repo.create(pending_tx.clone()).await.unwrap();

        let cleaned_count =
            process_transactions_for_cleanup(vec![pending_tx], &repo, RELAYER_ID, now).await;

        assert_eq!(cleaned_count, 0);
        assert!(is_stored(&repo, "pending-tx").await);
    }

    /// The paginated fetch returns only transactions in a final status.
    #[tokio::test]
    async fn test_fetch_final_transactions_paginated() {
        let repo = new_repo();

        for (id, status) in [
            ("confirmed-tx", TransactionStatus::Confirmed),
            ("pending-tx", TransactionStatus::Pending),
            ("failed-tx", TransactionStatus::Failed),
        ] {
            let tx = create_test_transaction(id, RELAYER_ID, status, None);
            repo.create(tx).await.unwrap();
        }

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = fetch_final_transactions_paginated(RELAYER_ID, &repo, query)
            .await
            .unwrap();

        assert_eq!(result.total, 2);
        assert_eq!(result.items.len(), 2);
        assert!(contains_id(&result.items, "confirmed-tx"));
        assert!(contains_id(&result.items, "failed-tx"));
        assert!(!contains_id(&result.items, "pending-tx"));
    }

    /// All relayers succeeded: the report is `Ok`.
    #[tokio::test]
    async fn test_report_cleanup_results_success() {
        let results = vec![
            cleanup_result("relayer-1", 2, None),
            cleanup_result("relayer-2", 1, None),
        ];

        assert!(report_cleanup_results(results).await.is_ok());
    }

    /// One failed relayer turns the report into an error.
    #[tokio::test]
    async fn test_report_cleanup_results_with_errors() {
        let results = vec![
            cleanup_result("relayer-1", 2, None),
            cleanup_result("relayer-2", 0, Some("Database error")),
        ];

        assert!(report_cleanup_results(results).await.is_err());
    }

    /// A relayer with one expired and one future transaction cleans exactly one.
    #[tokio::test]
    async fn test_process_single_relayer_success() {
        let repo = new_repo();
        let relayer = create_test_relayer("test-relayer", "Test Relayer");
        let now = Utc::now();

        let expired_tx = create_test_transaction(
            "expired-tx",
            &relayer.id,
            TransactionStatus::Confirmed,
            Some(hours_ago(now, 1)),
        );
        let future_tx = create_test_transaction(
            "future-tx",
            &relayer.id,
            TransactionStatus::Failed,
            Some(hours_ahead(now, 1)),
        );

        repo.create(expired_tx).await.unwrap();
        repo.create(future_tx).await.unwrap();

        let result = process_single_relayer(relayer.clone(), repo.clone(), now).await;

        assert_eq!(result.relayer_id, relayer.id);
        assert_eq!(result.cleaned_count, 1);
        assert!(result.error.is_none());
    }

    /// A relayer without transactions yields an empty, error-free result.
    #[tokio::test]
    async fn test_process_single_relayer_no_transactions() {
        let repo = new_repo();
        let relayer = create_test_relayer("empty-relayer", "Empty Relayer");
        let now = Utc::now();

        let result = process_single_relayer(relayer.clone(), repo, now).await;

        assert_eq!(result.relayer_id, relayer.id);
        assert_eq!(result.cleaned_count, 0);
        assert!(result.error.is_none());
    }

    /// An empty input list deletes nothing.
    #[tokio::test]
    async fn test_process_transactions_with_empty_list() {
        let repo = new_repo();
        let now = Utc::now();

        let cleaned_count =
            process_transactions_for_cleanup(Vec::new(), &repo, RELAYER_ID, now).await;

        assert_eq!(cleaned_count, 0);
    }

    /// Nothing is deleted when no transaction has expired.
    #[tokio::test]
    async fn test_process_transactions_with_no_expired() {
        let repo = new_repo();
        let now = Utc::now();

        let transactions = vec![
            create_test_transaction(
                "future-tx-1",
                RELAYER_ID,
                TransactionStatus::Confirmed,
                Some(hours_ahead(now, 1)),
            ),
            create_test_transaction(
                "future-tx-2",
                RELAYER_ID,
                TransactionStatus::Failed,
                Some(hours_ahead(now, 2)),
            ),
            create_test_transaction(
                "no-delete-tx",
                RELAYER_ID,
                TransactionStatus::Canceled,
                None,
            ),
        ];

        let cleaned_count =
            process_transactions_for_cleanup(transactions, &repo, RELAYER_ID, now).await;

        assert_eq!(cleaned_count, 0);
    }

    /// The expiry comparison is inclusive: `delete_at == now` counts as expired.
    #[tokio::test]
    async fn test_should_delete_transaction_exactly_at_expiry_time() {
        let now = Utc::now();
        let tx = create_test_transaction(
            "test-tx",
            RELAYER_ID,
            TransactionStatus::Confirmed,
            Some(now.to_rfc3339()),
        );

        assert!(should_delete_transaction(&tx, now));
    }

    /// Only transactions that exist in the repository count as cleaned.
    #[tokio::test]
    async fn test_parallel_processing_with_mixed_results() {
        let repo = new_repo();
        let now = Utc::now();

        let expired_tx1 = create_test_transaction(
            "expired-tx-1",
            RELAYER_ID,
            TransactionStatus::Confirmed,
            Some(hours_ago(now, 1)),
        );
        let expired_tx2 = create_test_transaction(
            "expired-tx-2",
            RELAYER_ID,
            TransactionStatus::Failed,
            Some(hours_ago(now, 2)),
        );
        let expired_tx3 = create_test_transaction(
            "expired-tx-3",
            RELAYER_ID,
            TransactionStatus::Canceled,
            Some(hours_ago(now, 3)),
        );

        // The third transaction is deliberately never stored, so deleting it fails.
        repo.create(expired_tx1.clone()).await.unwrap();
        repo.create(expired_tx2.clone()).await.unwrap();

        let cleaned_count = process_transactions_for_cleanup(
            vec![expired_tx1, expired_tx2, expired_tx3],
            &repo,
            RELAYER_ID,
            now,
        )
        .await;

        assert_eq!(cleaned_count, 2);
    }

    /// An empty result list is reported as success.
    #[tokio::test]
    async fn test_report_cleanup_results_empty() {
        assert!(report_cleanup_results(Vec::new()).await.is_ok());
    }

    /// All four final statuses are returned; pending and sent are filtered out.
    #[tokio::test]
    async fn test_fetch_final_transactions_paginated_with_mixed_statuses() {
        let repo = new_repo();

        for (id, status) in [
            ("confirmed-tx", TransactionStatus::Confirmed),
            ("failed-tx", TransactionStatus::Failed),
            ("canceled-tx", TransactionStatus::Canceled),
            ("expired-tx", TransactionStatus::Expired),
            ("pending-tx", TransactionStatus::Pending),
            ("sent-tx", TransactionStatus::Sent),
        ] {
            let tx = create_test_transaction(id, RELAYER_ID, status, None);
            repo.create(tx).await.unwrap();
        }

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = fetch_final_transactions_paginated(RELAYER_ID, &repo, query)
            .await
            .unwrap();

        assert_eq!(result.total, 4);
        assert_eq!(result.items.len(), 4);
        for final_id in ["confirmed-tx", "failed-tx", "canceled-tx", "expired-tx"] {
            assert!(contains_id(&result.items, final_id));
        }
        for non_final_id in ["pending-tx", "sent-tx"] {
            assert!(!contains_id(&result.items, non_final_id));
        }
    }

    /// Five transactions with a page size of two split into pages of 2, 2 and 1.
    #[tokio::test]
    async fn test_fetch_final_transactions_paginated_pagination() {
        let repo = new_repo();

        for i in 1..=5 {
            let mut tx = create_test_transaction(
                &format!("tx-{i}"),
                RELAYER_ID,
                TransactionStatus::Confirmed,
                None,
            );
            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
            repo.create(tx).await.unwrap();
        }

        for (page, expected_len) in [(1u32, 2usize), (2, 2), (3, 1)] {
            let query = PaginationQuery { page, per_page: 2 };
            let result = fetch_final_transactions_paginated(RELAYER_ID, &repo, query)
                .await
                .unwrap();

            assert_eq!(result.total, 5);
            assert_eq!(result.items.len(), expected_len);
            assert_eq!(result.page, page);
        }
    }

    /// Status cleanup deletes the expired transaction and keeps the future one.
    #[tokio::test]
    async fn test_process_status_cleanup_deletes_expired() {
        let repo = new_repo();
        let now = Utc::now();

        let expired_tx = create_test_transaction(
            "expired-tx",
            RELAYER_ID,
            TransactionStatus::Confirmed,
            Some(hours_ago(now, 1)),
        );
        let future_tx = create_test_transaction(
            "future-tx",
            RELAYER_ID,
            TransactionStatus::Confirmed,
            Some(hours_ahead(now, 1)),
        );

        repo.create(expired_tx).await.unwrap();
        repo.create(future_tx).await.unwrap();

        let cleaned =
            process_status_cleanup(RELAYER_ID, &TransactionStatus::Confirmed, &repo, now)
                .await
                .unwrap();

        assert_eq!(cleaned, 1);
        assert!(!is_stored(&repo, "expired-tx").await);
        assert!(is_stored(&repo, "future-tx").await);
    }

    /// Status cleanup on an empty repository deletes nothing.
    #[tokio::test]
    async fn test_process_status_cleanup_no_transactions() {
        let repo = new_repo();
        let now = Utc::now();

        let cleaned =
            process_status_cleanup(RELAYER_ID, &TransactionStatus::Confirmed, &repo, now)
                .await
                .unwrap();

        assert_eq!(cleaned, 0);
    }

    /// Cleaning one status does not touch transactions of another status.
    #[tokio::test]
    async fn test_process_status_cleanup_skips_other_statuses() {
        let repo = new_repo();
        let now = Utc::now();

        let failed_tx = create_test_transaction(
            "failed-tx",
            RELAYER_ID,
            TransactionStatus::Failed,
            Some(hours_ago(now, 1)),
        );
        repo.create(failed_tx).await.unwrap();

        let cleaned =
            process_status_cleanup(RELAYER_ID, &TransactionStatus::Confirmed, &repo, now)
                .await
                .unwrap();

        assert_eq!(cleaned, 0);
        assert!(is_stored(&repo, "failed-tx").await);
    }

    /// A relayer cleanup covers every final status.
    #[tokio::test]
    async fn test_process_single_relayer_processes_all_final_statuses() {
        let repo = new_repo();
        let relayer = create_test_relayer("test-relayer", "Test Relayer");
        let now = Utc::now();
        let expired_at = hours_ago(now, 1);

        let final_statuses = [
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Canceled,
            TransactionStatus::Expired,
        ];
        for (i, status) in final_statuses.iter().enumerate() {
            let tx = create_test_transaction(
                &format!("tx-{i}"),
                &relayer.id,
                status.clone(),
                Some(expired_at.clone()),
            );
            repo.create(tx).await.unwrap();
        }

        let result = process_single_relayer(relayer.clone(), repo.clone(), now).await;

        assert_eq!(result.relayer_id, relayer.id);
        assert_eq!(result.cleaned_count, 4);
        assert!(result.error.is_none());
        assert_eq!(repo.count().await.unwrap(), 0);
    }
}