use crate::domain::transaction::common::is_final_state;
use crate::metrics::{
    TRANSACTIONS_BY_STATUS, TRANSACTIONS_CREATED, TRANSACTIONS_FAILED, TRANSACTIONS_SUBMITTED,
    TRANSACTIONS_SUCCESS, TRANSACTION_PROCESSING_TIME,
};
use crate::models::{
    NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
    TransactionStatus, TransactionUpdateRequest,
};
use crate::repositories::redis_base::RedisRepository;
use crate::repositories::{
    BatchDeleteResult, BatchRetrievalResult, PaginatedResult, Repository, TransactionDeleteRequest,
    TransactionRepository,
};
use crate::utils::RedisConnections;
use async_trait::async_trait;
use chrono::Utc;
use redis::{AsyncCommands, Script};
use std::fmt;
use std::sync::Arc;
use tracing::{debug, error, warn};

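// Redis key layout (everything lives under `key_prefix`, abbreviated `{p}`),
// derived from the key-builder helpers below:
//   {p}:relayer:{relayer_id}:tx:{tx_id}              -> transaction JSON
//   {p}:relayer:tx_to_relayer:{tx_id}                -> tx ID -> relayer ID reverse lookup
//   {p}:relayer:{relayer_id}:status:{status}         -> legacy status set
//   {p}:relayer:{relayer_id}:status_sorted:{status}  -> status sorted set (scored by time)
//   {p}:relayer:{relayer_id}:nonce:{nonce}           -> nonce -> tx ID
//   {p}:relayer:{relayer_id}:tx_by_created_at        -> sorted set scored by created_at
//   {p}:relayer_list                                 -> set of known relayer IDs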
const RELAYER_PREFIX: &str = "relayer";
const TX_PREFIX: &str = "tx";
const STATUS_PREFIX: &str = "status";
const STATUS_SORTED_PREFIX: &str = "status_sorted";
const NONCE_PREFIX: &str = "nonce";
const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
const RELAYER_LIST_KEY: &str = "relayer_list";
const TX_BY_CREATED_AT_PREFIX: &str = "tx_by_created_at";

#[derive(Clone)]
pub struct RedisTransactionRepository {
    pub connections: Arc<RedisConnections>,
    pub key_prefix: String,
}

impl RedisRepository for RedisTransactionRepository {}

impl RedisTransactionRepository {
    pub fn new(
        connections: Arc<RedisConnections>,
        key_prefix: String,
    ) -> Result<Self, RepositoryError> {
        if key_prefix.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Redis key prefix cannot be empty".to_string(),
            ));
        }

        Ok(Self {
            connections,
            key_prefix,
        })
    }

    fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
        )
    }

    fn tx_to_relayer_key(&self, tx_id: &str) -> String {
        format!(
            "{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
        )
    }

    fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
        )
    }

    fn relayer_status_sorted_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_SORTED_PREFIX, status
        )
    }

    fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
        format!(
            "{}:{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
        )
    }

    fn relayer_list_key(&self) -> String {
        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
    }

    fn relayer_tx_by_created_at_key(&self, relayer_id: &str) -> String {
        format!(
            "{}:{}:{}:{}",
            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_BY_CREATED_AT_PREFIX
        )
    }

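    /// Converts an RFC 3339 timestamp into a sorted-set score (milliseconds
    /// since the Unix epoch); an unparseable timestamp falls back to 0.0 with
    /// a warning so one bad record cannot abort an index write.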
    fn timestamp_to_score(&self, timestamp: &str) -> f64 {
        chrono::DateTime::parse_from_rfc3339(timestamp)
            .map(|dt| dt.timestamp_millis() as f64)
            .unwrap_or_else(|_| {
                warn!(timestamp = %timestamp, "failed to parse timestamp, using 0");
                0.0
            })
    }

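    /// Chooses the score used in the per-status sorted set: `confirmed_at` for
    /// confirmed transactions, falling back to `created_at` when that field is
    /// missing or for any other status.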
    fn status_sorted_score(&self, tx: &TransactionRepoModel) -> f64 {
        if tx.status == TransactionStatus::Confirmed {
            if let Some(ref confirmed_at) = tx.confirmed_at {
                return self.timestamp_to_score(confirmed_at);
            }
            warn!(tx_id = %tx.id, "Confirmed transaction missing confirmed_at, using created_at");
        }
        self.timestamp_to_score(&tx.created_at)
    }

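    /// Batch-fetches transactions by ID: resolves each ID to its relayer via
    /// the reverse-lookup index, then MGETs the transaction payloads. IDs with
    /// no reverse mapping or no stored payload are reported in `failed_ids`;
    /// payloads that fail to deserialize are logged and skipped.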
    async fn get_transactions_by_ids(
        &self,
        ids: &[String],
    ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
        if ids.is_empty() {
            debug!("no transaction IDs provided for batch fetch");
            return Ok(BatchRetrievalResult {
                results: vec![],
                failed_ids: vec![],
            });
        }

        let mut conn = self
            .get_connection(self.connections.reader(), "batch_fetch_transactions")
            .await?;

        let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();

        debug!(count = %ids.len(), "fetching relayer IDs for transactions");

        let relayer_ids: Vec<Option<String>> = conn
            .mget(&reverse_keys)
            .await
            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;

        let mut tx_keys = Vec::new();
        let mut valid_ids = Vec::new();
        let mut failed_ids = Vec::new();
        for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
            match relayer_id {
                Some(relayer_id) => {
                    tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
                    valid_ids.push(ids[i].clone());
                }
                None => {
                    warn!(tx_id = %ids[i], "no relayer found for transaction");
                    failed_ids.push(ids[i].clone());
                }
            }
        }

        if tx_keys.is_empty() {
            debug!("no valid transactions found for batch fetch");
            return Ok(BatchRetrievalResult {
                results: vec![],
                failed_ids,
            });
        }

        debug!(count = %tx_keys.len(), "batch fetching transaction data");

        let values: Vec<Option<String>> = conn
            .mget(&tx_keys)
            .await
            .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;

        let mut transactions = Vec::new();
        let mut failed_count = 0;
        // Keep accumulating into the same `failed_ids` (no re-declaration), so
        // IDs that had no reverse mapping are not silently dropped from the
        // final result.
        for (i, value) in values.into_iter().enumerate() {
            match value {
                Some(json) => {
                    match self.deserialize_entity::<TransactionRepoModel>(
                        &json,
                        &valid_ids[i],
                        "transaction",
                    ) {
                        Ok(tx) => transactions.push(tx),
                        Err(e) => {
                            failed_count += 1;
                            error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
                        }
                    }
                }
                None => {
                    warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
                    failed_ids.push(valid_ids[i].clone());
                }
            }
        }

        if failed_count > 0 {
            warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
        }

        debug!(count = %transactions.len(), "successfully fetched transactions");
        Ok(BatchRetrievalResult {
            results: transactions,
            failed_ids,
        })
    }

    fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
        match network_data.get_evm_transaction_data() {
            Ok(tx_data) => tx_data.nonce,
            Err(_) => {
                debug!("no EVM transaction data available for nonce extraction");
                None
            }
        }
    }

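    /// Lazily migrates a legacy status set (a plain Redis SET) into the sorted
    /// set used for ordered status queries, returning the number of members in
    /// the sorted set afterwards. A legacy set whose transactions no longer
    /// exist is deleted outright.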
    async fn ensure_status_sorted_set(
        &self,
        relayer_id: &str,
        status: &TransactionStatus,
    ) -> Result<u64, RepositoryError> {
        let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
        let legacy_key = self.relayer_status_key(relayer_id, status);

        let legacy_ids = {
            let mut conn = self
                .get_connection(self.connections.primary(), "ensure_status_sorted_set_check")
                .await?;

            let legacy_count: u64 = conn
                .scard(&legacy_key)
                .await
                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_scard"))?;

            if legacy_count == 0 {
                let sorted_count: u64 = conn
                    .zcard(&sorted_key)
                    .await
                    .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_zcard"))?;
                return Ok(sorted_count);
            }

            debug!(
                relayer_id = %relayer_id,
                status = %status,
                legacy_count = %legacy_count,
                "migrating status set to sorted set"
            );

            let ids: Vec<String> = conn
                .smembers(&legacy_key)
                .await
                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_smembers"))?;

            ids
        };

        if legacy_ids.is_empty() {
            return Ok(0);
        }

        let transactions = self.get_transactions_by_ids(&legacy_ids).await?;

        let mut conn = self
            .get_connection(
                self.connections.primary(),
                "ensure_status_sorted_set_migrate",
            )
            .await?;

        if transactions.results.is_empty() {
            let _: () = conn
                .del(&legacy_key)
                .await
                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_del_stale"))?;
            return Ok(0);
        }

        let mut pipe = redis::pipe();
        pipe.atomic();

        for tx in &transactions.results {
            let score = self.status_sorted_score(tx);
            pipe.zadd(&sorted_key, &tx.id, score);
        }

        pipe.del(&legacy_key);

        pipe.query_async::<()>(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_migrate"))?;

        let migrated_count = transactions.results.len() as u64;
        debug!(
            relayer_id = %relayer_id,
            status = %status,
            migrated_count = %migrated_count,
            "completed migration of status set to sorted set"
        );

        Ok(migrated_count)
    }

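    /// Rewrites all secondary indexes for `tx` in a single atomic pipeline.
    /// When the previous version is supplied, index entries invalidated by a
    /// status or nonce change are removed in the same pipeline.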
    async fn update_indexes(
        &self,
        tx: &TransactionRepoModel,
        old_tx: Option<&TransactionRepoModel>,
    ) -> Result<(), RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.primary(), "update_indexes")
            .await?;
        let mut pipe = redis::pipe();
        pipe.atomic();

        debug!(tx_id = %tx.id, "updating indexes for transaction");

        let relayer_list_key = self.relayer_list_key();
        pipe.sadd(&relayer_list_key, &tx.relayer_id);

        let status_score = self.status_sorted_score(tx);
        let created_at_score = self.timestamp_to_score(&tx.created_at);

        let new_status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, &tx.status);
        pipe.zadd(&new_status_sorted_key, &tx.id, status_score);
        debug!(tx_id = %tx.id, status = %tx.status, score = %status_score, "added transaction to status sorted set");

        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
            pipe.set(&nonce_key, &tx.id);
            debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
        }

        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
        pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
        debug!(tx_id = %tx.id, score = %created_at_score, "added transaction to sorted set by created_at");

        if let Some(old) = old_tx {
            if old.status != tx.status {
                let old_status_sorted_key =
                    self.relayer_status_sorted_key(&old.relayer_id, &old.status);
                pipe.zrem(&old_status_sorted_key, &tx.id);

                let old_status_legacy_key = self.relayer_status_key(&old.relayer_id, &old.status);
                pipe.srem(&old_status_legacy_key, &tx.id);

                debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status indexes for transaction");
            }

            if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
                let new_nonce = self.extract_nonce(&tx.network_data);
                if Some(old_nonce) != new_nonce {
                    let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
                    pipe.del(&old_nonce_key);
                    debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
                }
            }
        }

        pipe.exec_async(&mut conn).await.map_err(|e| {
            error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
            self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
        })?;

        debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
        Ok(())
    }

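    /// Removes every index entry a transaction could occupy. Because the
    /// stored status may be stale, this sweeps all status variants rather than
    /// trusting `tx.status` alone.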
    async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.primary(), "remove_all_indexes")
            .await?;
        let mut pipe = redis::pipe();
        pipe.atomic();

        debug!(tx_id = %tx.id, "removing all indexes for transaction");

        for status in &[
            TransactionStatus::Canceled,
            TransactionStatus::Pending,
            TransactionStatus::Sent,
            TransactionStatus::Submitted,
            TransactionStatus::Mined,
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Expired,
        ] {
            let status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, status);
            pipe.zrem(&status_sorted_key, &tx.id);

            let status_legacy_key = self.relayer_status_key(&tx.relayer_id, status);
            pipe.srem(&status_legacy_key, &tx.id);
        }

        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
            pipe.del(&nonce_key);
            debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
        }

        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
        pipe.zrem(&relayer_sorted_key, &tx.id);
        debug!(tx_id = %tx.id, "removing transaction from sorted set by created_at");

        let reverse_key = self.tx_to_relayer_key(&tx.id);
        pipe.del(&reverse_key);

        pipe.exec_async(&mut conn).await.map_err(|e| {
            error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
            self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
        })?;

        debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
        Ok(())
    }
}

impl fmt::Debug for RedisTransactionRepository {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RedisTransactionRepository")
            .field("connections", &"<RedisConnections>")
            .field("key_prefix", &self.key_prefix)
            .finish()
    }
}

#[async_trait]
impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
    async fn create(
        &self,
        entity: TransactionRepoModel,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        if entity.id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        let key = self.tx_key(&entity.relayer_id, &entity.id);
        let reverse_key = self.tx_to_relayer_key(&entity.id);
        let mut conn = self
            .get_connection(self.connections.primary(), "create")
            .await?;

        debug!(tx_id = %entity.id, "creating transaction");

        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;

        let existing: Option<String> = conn
            .get(&reverse_key)
            .await
            .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;

        if existing.is_some() {
            return Err(RepositoryError::ConstraintViolation(format!(
                "Transaction with ID {} already exists",
                entity.id
            )));
        }

        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.set(&key, &value);
        pipe.set(&reverse_key, &entity.relayer_id);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "create_transaction"))?;

        if let Err(e) = self.update_indexes(&entity, None).await {
            error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
            return Err(e);
        }

        let network_type = format!("{:?}", entity.network_type).to_lowercase();
        let relayer_id = entity.relayer_id.as_str();
        TRANSACTIONS_CREATED
            .with_label_values(&[relayer_id, &network_type])
            .inc();

        let status = &entity.status;
        let status_str = format!("{status:?}").to_lowercase();
        TRANSACTIONS_BY_STATUS
            .with_label_values(&[relayer_id, &network_type, &status_str])
            .inc();

        debug!(tx_id = %entity.id, "successfully created transaction");
        Ok(entity)
    }

    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        let mut conn = self
            .get_connection(self.connections.reader(), "get_by_id")
            .await?;

        debug!(tx_id = %id, "fetching transaction");

        let reverse_key = self.tx_to_relayer_key(&id);
        let relayer_id: Option<String> = conn
            .get(&reverse_key)
            .await
            .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;

        let relayer_id = match relayer_id {
            Some(relayer_id) => relayer_id,
            None => {
                debug!(tx_id = %id, "transaction not found (no reverse lookup)");
                return Err(RepositoryError::NotFound(format!(
                    "Transaction with ID {id} not found"
                )));
            }
        };

        let key = self.tx_key(&relayer_id, &id);
        let value: Option<String> = conn
            .get(&key)
            .await
            .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;

        match value {
            Some(json) => {
                let tx =
                    self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
                debug!(tx_id = %id, "successfully fetched transaction");
                Ok(tx)
            }
            None => {
                debug!(tx_id = %id, "transaction not found");
                Err(RepositoryError::NotFound(format!(
                    "Transaction with ID {id} not found"
                )))
            }
        }
    }

    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.reader(), "list_all")
            .await?;

        debug!("fetching all transactions sorted by created_at (newest first)");

        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;

        debug!(count = %relayer_ids.len(), "found relayers");

        let mut all_tx_ids = Vec::new();
        for relayer_id in relayer_ids {
            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
                .arg(&relayer_sorted_key)
                .arg(0)
                .arg(-1)
                .arg("REV")
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "list_all_relayer_sorted"))?;

            all_tx_ids.extend(tx_ids);
        }

        drop(conn);

        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
        let mut all_transactions = batch_result.results;

        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));

        debug!(count = %all_transactions.len(), "found transactions");
        Ok(all_transactions)
    }

    async fn list_paginated(
        &self,
        query: PaginationQuery,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        if query.per_page == 0 {
            return Err(RepositoryError::InvalidData(
                "per_page must be greater than 0".to_string(),
            ));
        }

        let mut conn = self
            .get_connection(self.connections.reader(), "list_paginated")
            .await?;

        debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions sorted by created_at (newest first)");

        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;

        let mut all_tx_ids = Vec::new();
        for relayer_id in relayer_ids {
            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
                .arg(&relayer_sorted_key)
                .arg(0)
                .arg(-1)
                .arg("REV")
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_sorted"))?;

            all_tx_ids.extend(tx_ids);
        }

        drop(conn);

        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
        let mut all_transactions = batch_result.results;

        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));

        let total = all_transactions.len() as u64;
        // saturating_sub guards against a page value of 0 underflowing,
        // matching the other paginated queries in this file.
        let start = ((query.page.saturating_sub(1)) * query.per_page) as usize;
        let end = (start + query.per_page as usize).min(all_transactions.len());

        if start >= all_transactions.len() {
            debug!(page = %query.page, total = %total, "page is beyond available data");
            return Ok(PaginatedResult {
                items: vec![],
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let items = all_transactions[start..end].to_vec();

        debug!(count = %items.len(), page = %query.page, "successfully fetched transactions for page");

        Ok(PaginatedResult {
            items,
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }

    async fn update(
        &self,
        id: String,
        entity: TransactionRepoModel,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        debug!(tx_id = %id, "updating transaction");

        let old_tx = self.get_by_id(id.clone()).await?;

        let key = self.tx_key(&entity.relayer_id, &id);
        let mut conn = self
            .get_connection(self.connections.primary(), "update")
            .await?;

        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;

        let _: () = conn
            .set(&key, value)
            .await
            .map_err(|e| self.map_redis_error(e, "update_transaction"))?;

        self.update_indexes(&entity, Some(&old_tx)).await?;

        debug!(tx_id = %id, "successfully updated transaction");
        Ok(entity)
    }

    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
        if id.is_empty() {
            return Err(RepositoryError::InvalidData(
                "Transaction ID cannot be empty".to_string(),
            ));
        }

        debug!(tx_id = %id, "deleting transaction");

        let tx = self.get_by_id(id.clone()).await?;

        let key = self.tx_key(&tx.relayer_id, &id);
        let reverse_key = self.tx_to_relayer_key(&id);
        let mut conn = self
            .get_connection(self.connections.primary(), "delete_by_id")
            .await?;

        let mut pipe = redis::pipe();
        pipe.atomic();
        pipe.del(&key);
        pipe.del(&reverse_key);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;

        if let Err(e) = self.remove_all_indexes(&tx).await {
            error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
        }

        debug!(tx_id = %id, "successfully deleted transaction");
        Ok(())
    }

    async fn count(&self) -> Result<usize, RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.reader(), "count")
            .await?;

        debug!("counting transactions");

        let relayer_list_key = self.relayer_list_key();
        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;

        let mut total_count = 0usize;
        for relayer_id in relayer_ids {
            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
            let count: usize = conn
                .zcard(&relayer_sorted_key)
                .await
                .map_err(|e| self.map_redis_error(e, "count_relayer_transactions"))?;
            total_count += count;
        }

        debug!(count = %total_count, "transaction count");
        Ok(total_count)
    }

    async fn has_entries(&self) -> Result<bool, RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.reader(), "has_entries")
            .await?;
        let relayer_list_key = self.relayer_list_key();

        debug!("checking if transaction entries exist");

        let exists: bool = conn
            .exists(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;

        debug!(exists = %exists, "transaction entries exist");
        Ok(exists)
    }

    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.primary(), "drop_all_entries")
            .await?;
        let relayer_list_key = self.relayer_list_key();

        debug!("dropping all transaction entries");

        let relayer_ids: Vec<String> = conn
            .smembers(&relayer_list_key)
            .await
            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;

        if relayer_ids.is_empty() {
            debug!("no transaction entries to drop");
            return Ok(());
        }

        let mut pipe = redis::pipe();
        pipe.atomic();

        for relayer_id in &relayer_ids {
            let pattern = format!(
                "{}:{}:{}:{}:*",
                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
            );
            let mut cursor = 0;
            let mut tx_ids = Vec::new();

            loop {
                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                    .cursor_arg(cursor)
                    .arg("MATCH")
                    .arg(&pattern)
                    .query_async(&mut conn)
                    .await
                    .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;

                for key in keys {
                    pipe.del(&key);
                    if let Some(tx_id) = key.split(':').next_back() {
                        tx_ids.push(tx_id.to_string());
                    }
                }

                cursor = next_cursor;
                if cursor == 0 {
                    break;
                }
            }

            for tx_id in tx_ids {
                let reverse_key = self.tx_to_relayer_key(&tx_id);
                pipe.del(&reverse_key);

                for status in &[
                    TransactionStatus::Canceled,
                    TransactionStatus::Pending,
                    TransactionStatus::Sent,
                    TransactionStatus::Submitted,
                    TransactionStatus::Mined,
                    TransactionStatus::Confirmed,
                    TransactionStatus::Failed,
                    TransactionStatus::Expired,
                ] {
                    let status_sorted_key = self.relayer_status_sorted_key(relayer_id, status);
                    pipe.zrem(&status_sorted_key, &tx_id);

                    let status_key = self.relayer_status_key(relayer_id, status);
                    pipe.srem(&status_key, &tx_id);
                }
            }

            // Note: per-nonce index keys are not swept here; they are only
            // removed when individual transactions are deleted.
            let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
            pipe.del(&relayer_sorted_key);
        }

        pipe.del(&relayer_list_key);

        pipe.exec_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;

        debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
        Ok(())
    }
}

#[async_trait]
impl TransactionRepository for RedisTransactionRepository {
    async fn find_by_relayer_id(
        &self,
        relayer_id: &str,
        query: PaginationQuery,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.reader(), "find_by_relayer_id")
            .await?;

        debug!(relayer_id = %relayer_id, page = %query.page, per_page = %query.per_page, "fetching transactions for relayer sorted by created_at (newest first)");

        let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);

        let sorted_set_count: u64 = conn
            .zcard(&relayer_sorted_key)
            .await
            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_count"))?;

        if sorted_set_count == 0 {
            debug!(relayer_id = %relayer_id, "no transactions found for relayer (sorted set is empty)");
            return Ok(PaginatedResult {
                items: vec![],
                total: 0,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let total = sorted_set_count;

        // saturating_sub guards against a page value of 0 underflowing.
        let start = ((query.page.saturating_sub(1)) * query.per_page) as isize;
        let end = start + query.per_page as isize - 1;

        if start as u64 >= total {
            debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
            return Ok(PaginatedResult {
                items: vec![],
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let page_ids: Vec<String> = redis::cmd("ZRANGE")
            .arg(&relayer_sorted_key)
            .arg(start)
            .arg(end)
            .arg("REV")
            .query_async(&mut conn)
            .await
            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_sorted"))?;

        drop(conn);

        let items = self.get_transactions_by_ids(&page_ids).await?;

        debug!(relayer_id = %relayer_id, count = %items.results.len(), page = %query.page, "successfully fetched transactions for relayer");

        Ok(PaginatedResult {
            items: items.results,
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }

    async fn find_by_status(
        &self,
        relayer_id: &str,
        statuses: &[TransactionStatus],
    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
        for status in statuses {
            self.ensure_status_sorted_set(relayer_id, status).await?;
        }

        let mut conn = self
            .get_connection(self.connections.reader(), "find_by_status")
            .await?;

        let mut all_ids: Vec<String> = Vec::new();
        for status in statuses {
            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
            let ids: Vec<String> = redis::cmd("ZRANGE")
                .arg(&sorted_key)
                .arg(0)
                .arg(-1)
                .arg("REV")
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status"))?;

            all_ids.extend(ids);
        }

        drop(conn);

        if all_ids.is_empty() {
            return Ok(vec![]);
        }

        all_ids.sort();
        all_ids.dedup();

        let mut transactions = self.get_transactions_by_ids(&all_ids).await?;

        transactions
            .results
            .sort_by(|a, b| b.created_at.cmp(&a.created_at));

        Ok(transactions.results)
    }

    async fn find_by_status_paginated(
        &self,
        relayer_id: &str,
        statuses: &[TransactionStatus],
        query: PaginationQuery,
        oldest_first: bool,
    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
        for status in statuses {
            self.ensure_status_sorted_set(relayer_id, status).await?;
        }

        let mut conn = self
            .get_connection(self.connections.reader(), "find_by_status_paginated")
            .await?;

        if statuses.len() == 1 {
            let sorted_key = self.relayer_status_sorted_key(relayer_id, &statuses[0]);

            let total: u64 = conn
                .zcard(&sorted_key)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_count"))?;

            if total == 0 {
                return Ok(PaginatedResult {
                    items: vec![],
                    total: 0,
                    page: query.page,
                    per_page: query.per_page,
                });
            }

            let start = ((query.page.saturating_sub(1)) * query.per_page) as isize;
            let end = start + query.per_page as isize - 1;

            let mut cmd = redis::cmd("ZRANGE");
            cmd.arg(&sorted_key).arg(start).arg(end);
            if !oldest_first {
                cmd.arg("REV");
            }
            let page_ids: Vec<String> = cmd
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated"))?;

            drop(conn);

            let transactions = self.get_transactions_by_ids(&page_ids).await?;

            debug!(
                relayer_id = %relayer_id,
                status = %statuses[0],
                total = %total,
                page = %query.page,
                page_size = %transactions.results.len(),
                "fetched paginated transactions by single status"
            );

            return Ok(PaginatedResult {
                items: transactions.results,
                total,
                page: query.page,
                per_page: query.per_page,
            });
        }

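        // Multi-status path: pull every (id, score) pair from each status
        // sorted set, merge them in memory keeping one score per ID (min for
        // oldest-first, max for newest-first), then sort and paginate the
        // merged list.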
        let mut all_ids: Vec<(String, f64)> = Vec::new();
        for status in statuses {
            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);

            let ids_with_scores: Vec<(String, f64)> = redis::cmd("ZRANGE")
                .arg(&sorted_key)
                .arg(0)
                .arg(-1)
                .arg("WITHSCORES")
                .query_async(&mut conn)
                .await
                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_multi"))?;

            all_ids.extend(ids_with_scores);
        }

        drop(conn);

        let mut id_map: std::collections::HashMap<String, f64> = std::collections::HashMap::new();
        for (id, score) in all_ids {
            id_map
                .entry(id)
                .and_modify(|s| {
                    if oldest_first {
                        if score < *s {
                            *s = score
                        }
                    } else if score > *s {
                        *s = score
                    }
                })
                .or_insert(score);
        }

        let mut sorted_ids: Vec<(String, f64)> = id_map.into_iter().collect();
        if oldest_first {
            sorted_ids.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
        } else {
            sorted_ids.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        }

        let total = sorted_ids.len() as u64;

        if total == 0 {
            return Ok(PaginatedResult {
                items: vec![],
                total: 0,
                page: query.page,
                per_page: query.per_page,
            });
        }

        let start = ((query.page.saturating_sub(1)) * query.per_page) as usize;
        let page_ids: Vec<String> = sorted_ids
            .into_iter()
            .skip(start)
            .take(query.per_page as usize)
            .map(|(id, _)| id)
            .collect();

        let transactions = self.get_transactions_by_ids(&page_ids).await?;

        debug!(
            relayer_id = %relayer_id,
            total = %total,
            page = %query.page,
            page_size = %transactions.results.len(),
            "fetched paginated transactions by status"
        );

        Ok(PaginatedResult {
            items: transactions.results,
            total,
            page: query.page,
            per_page: query.per_page,
        })
    }

    async fn find_by_nonce(
        &self,
        relayer_id: &str,
        nonce: u64,
    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.reader(), "find_by_nonce")
            .await?;
        let nonce_key = self.relayer_nonce_key(relayer_id, nonce);

        let tx_id: Option<String> = conn
            .get(nonce_key)
            .await
            .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;

        match tx_id {
            Some(tx_id) => match self.get_by_id(tx_id.clone()).await {
                Ok(tx) => Ok(Some(tx)),
                Err(RepositoryError::NotFound(_)) => {
                    warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
                    Ok(None)
                }
                Err(e) => Err(e),
            },
            None => Ok(None),
        }
    }

    async fn update_status(
        &self,
        tx_id: String,
        status: TransactionStatus,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            status: Some(status),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }

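    /// Optimistic concurrency: the transaction is read, the partial update is
    /// applied in memory, and the new value is swapped in with a
    /// compare-and-set Lua script. A CAS miss (another writer got there first)
    /// rebases the update on the latest stored value and retries with backoff,
    /// up to `MAX_RETRIES`; index updates are retried the same way.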
    async fn partial_update(
        &self,
        tx_id: String,
        update: TransactionUpdateRequest,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        const MAX_RETRIES: u32 = 3;
        const BACKOFF_MS: u64 = 100;

        let mut original_tx = self.get_by_id(tx_id.clone()).await?;
        let mut updated_tx = original_tx.clone();
        updated_tx.apply_partial_update(update.clone());

        let key = self.tx_key(&updated_tx.relayer_id, &tx_id);
        let mut original_value = self.serialize_entity(&original_tx, |t| &t.id, "transaction")?;
        let mut updated_value = self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?;
        let mut data_updated = false;

        let mut last_error = None;

        for attempt in 0..MAX_RETRIES {
            let mut conn = match self
                .get_connection(self.connections.primary(), "partial_update")
                .await
            {
                Ok(conn) => conn,
                Err(e) => {
                    last_error = Some(e);
                    if attempt < MAX_RETRIES - 1 {
                        tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
                        continue;
                    }
                    return Err(last_error.unwrap());
                }
            };

            if !data_updated {
                let cas_script = Script::new(
                    r#"
                    local current = redis.call('GET', KEYS[1])
                    if not current then
                        return -1
                    end
                    if current == ARGV[1] then
                        redis.call('SET', KEYS[1], ARGV[2])
                        return 1
                    end
                    return 0
                    "#,
                );

                let cas_result: i32 = match cas_script
                    .key(&key)
                    .arg(&original_value)
                    .arg(&updated_value)
                    .invoke_async(&mut conn)
                    .await
                {
                    Ok(result) => result,
                    Err(e) => {
                        if attempt < MAX_RETRIES - 1 {
                            warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed CAS transaction update, retrying");
                            last_error = Some(self.map_redis_error(e, "partial_update_cas"));
                            tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS))
                                .await;
                            continue;
                        }
                        return Err(self.map_redis_error(e, "partial_update_cas"));
                    }
                };

                if cas_result == -1 {
                    return Err(RepositoryError::NotFound(format!(
                        "Transaction with ID {tx_id} not found"
                    )));
                }

                if cas_result == 0 {
                    if attempt < MAX_RETRIES - 1 {
                        warn!(tx_id = %tx_id, attempt = %attempt, "concurrent transaction update detected, rebasing retry");
                        original_tx = self.get_by_id(tx_id.clone()).await?;
                        updated_tx = original_tx.clone();
                        updated_tx.apply_partial_update(update.clone());
                        original_value =
                            self.serialize_entity(&original_tx, |t| &t.id, "transaction")?;
                        updated_value =
                            self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?;
                        tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
                        continue;
                    }
                    return Err(RepositoryError::TransactionFailure(format!(
                        "Concurrent update conflict for transaction {tx_id}"
                    )));
                }

                data_updated = true;
            }

            match self.update_indexes(&updated_tx, Some(&original_tx)).await {
                Ok(_) => {
                    debug!(tx_id = %tx_id, attempt = %attempt, "successfully updated transaction");

                    if let Some(new_status) = &update.status {
                        let network_type = format!("{:?}", updated_tx.network_type).to_lowercase();
                        let relayer_id = updated_tx.relayer_id.as_str();

                        if original_tx.status != TransactionStatus::Submitted
                            && *new_status == TransactionStatus::Submitted
                        {
                            TRANSACTIONS_SUBMITTED
                                .with_label_values(&[relayer_id, &network_type])
                                .inc();

                            if let Ok(created_time) =
                                chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
                            {
                                let processing_seconds =
                                    (Utc::now() - created_time.with_timezone(&Utc)).num_seconds()
                                        as f64;
                                TRANSACTION_PROCESSING_TIME
                                    .with_label_values(&[
                                        relayer_id,
                                        &network_type,
                                        "creation_to_submission",
                                    ])
                                    .observe(processing_seconds);
                            }
                        }

                        if original_tx.status != *new_status {
                            let old_status = &original_tx.status;
                            let old_status_str = format!("{old_status:?}").to_lowercase();
                            let old_status_gauge = TRANSACTIONS_BY_STATUS.with_label_values(&[
                                relayer_id,
                                &network_type,
                                &old_status_str,
                            ]);
                            let clamped_value = (old_status_gauge.get() - 1.0).max(0.0);
                            old_status_gauge.set(clamped_value);

                            let new_status_str = format!("{new_status:?}").to_lowercase();
                            TRANSACTIONS_BY_STATUS
                                .with_label_values(&[relayer_id, &network_type, &new_status_str])
                                .inc();
                        }

                        let was_final = is_final_state(&original_tx.status);
                        let is_final = is_final_state(new_status);

                        if !was_final && is_final {
                            match new_status {
                                TransactionStatus::Confirmed => {
                                    TRANSACTIONS_SUCCESS
                                        .with_label_values(&[relayer_id, &network_type])
                                        .inc();

                                    if let (Some(sent_at_str), Some(confirmed_at_str)) =
                                        (&updated_tx.sent_at, &updated_tx.confirmed_at)
                                    {
                                        if let (Ok(sent_time), Ok(confirmed_time)) = (
                                            chrono::DateTime::parse_from_rfc3339(sent_at_str),
                                            chrono::DateTime::parse_from_rfc3339(confirmed_at_str),
                                        ) {
                                            let processing_seconds = (confirmed_time
                                                .with_timezone(&Utc)
                                                - sent_time.with_timezone(&Utc))
                                            .num_seconds()
                                                as f64;
                                            TRANSACTION_PROCESSING_TIME
                                                .with_label_values(&[
                                                    relayer_id,
                                                    &network_type,
                                                    "submission_to_confirmation",
                                                ])
                                                .observe(processing_seconds);
                                        }
                                    }

                                    if let Ok(created_time) =
                                        chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
                                    {
                                        if let Some(confirmed_at_str) = &updated_tx.confirmed_at {
                                            if let Ok(confirmed_time) =
                                                chrono::DateTime::parse_from_rfc3339(
                                                    confirmed_at_str,
                                                )
                                            {
                                                let processing_seconds = (confirmed_time
                                                    .with_timezone(&Utc)
                                                    - created_time.with_timezone(&Utc))
                                                .num_seconds()
                                                    as f64;
                                                TRANSACTION_PROCESSING_TIME
                                                    .with_label_values(&[
                                                        relayer_id,
                                                        &network_type,
                                                        "creation_to_confirmation",
                                                    ])
                                                    .observe(processing_seconds);
                                            }
                                        }
                                    }
                                }
                                TransactionStatus::Failed => {
                                    let failure_reason = updated_tx
                                        .status_reason
                                        .as_deref()
                                        .map(|reason| {
                                            if reason.starts_with("Submission failed:") {
                                                "submission_failed"
                                            } else if reason.starts_with("Preparation failed:") {
                                                "preparation_failed"
                                            } else {
                                                "failed"
                                            }
                                        })
                                        .unwrap_or("failed");
                                    TRANSACTIONS_FAILED
                                        .with_label_values(&[
                                            relayer_id,
                                            &network_type,
                                            failure_reason,
                                        ])
                                        .inc();
                                }
                                TransactionStatus::Expired => {
                                    TRANSACTIONS_FAILED
                                        .with_label_values(&[relayer_id, &network_type, "expired"])
                                        .inc();
                                }
                                TransactionStatus::Canceled => {
                                    TRANSACTIONS_FAILED
                                        .with_label_values(&[relayer_id, &network_type, "canceled"])
                                        .inc();
                                }
                                _ => {}
                            }
                        }
                    }
                    return Ok(updated_tx);
                }
                Err(e) if attempt < MAX_RETRIES - 1 => {
                    warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to update indexes, retrying");
                    last_error = Some(e);
                    tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
                    continue;
                }
                Err(e) => return Err(e),
            }
        }

        Err(last_error.unwrap_or_else(|| {
            RepositoryError::UnexpectedError("partial_update exhausted retries".to_string())
        }))
    }

    async fn update_network_data(
        &self,
        tx_id: String,
        network_data: NetworkTransactionData,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            network_data: Some(network_data),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }

    async fn set_sent_at(
        &self,
        tx_id: String,
        sent_at: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            sent_at: Some(sent_at),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }

    async fn set_confirmed_at(
        &self,
        tx_id: String,
        confirmed_at: String,
    ) -> Result<TransactionRepoModel, RepositoryError> {
        let update = TransactionUpdateRequest {
            confirmed_at: Some(confirmed_at),
            ..Default::default()
        };
        self.partial_update(tx_id, update).await
    }

    async fn count_by_status(
        &self,
        relayer_id: &str,
        statuses: &[TransactionStatus],
    ) -> Result<u64, RepositoryError> {
        let mut conn = self
            .get_connection(self.connections.reader(), "count_by_status")
            .await?;
        let mut total_count: u64 = 0;

        for status in statuses {
            self.ensure_status_sorted_set(relayer_id, status).await?;

            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
            let count: u64 = conn
                .zcard(&sorted_key)
                .await
                .map_err(|e| self.map_redis_error(e, "count_by_status"))?;
            total_count += count;
        }

        debug!(relayer_id = %relayer_id, count = %total_count, "counted transactions by status");
        Ok(total_count)
    }

    async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
        if ids.is_empty() {
            debug!("no transaction IDs provided for batch delete");
            return Ok(BatchDeleteResult::default());
        }

        debug!(count = %ids.len(), "batch deleting transactions by IDs (with fetch)");

        let batch_result = self.get_transactions_by_ids(&ids).await?;

        let requests: Vec<TransactionDeleteRequest> = batch_result
            .results
            .iter()
            .map(|tx| TransactionDeleteRequest {
                id: tx.id.clone(),
                relayer_id: tx.relayer_id.clone(),
                nonce: self.extract_nonce(&tx.network_data),
            })
            .collect();

        let mut result = self.delete_by_requests(requests).await?;

        for id in batch_result.failed_ids {
            result
                .failed
                .push((id.clone(), format!("Transaction with ID {id} not found")));
        }

        Ok(result)
    }

    async fn delete_by_requests(
        &self,
        requests: Vec<TransactionDeleteRequest>,
    ) -> Result<BatchDeleteResult, RepositoryError> {
        if requests.is_empty() {
            debug!("no delete requests provided for batch delete");
            return Ok(BatchDeleteResult::default());
        }

        debug!(count = %requests.len(), "batch deleting transactions by requests (no fetch)");
        let mut conn = self
            .get_connection(self.connections.primary(), "batch_delete_no_fetch")
            .await?;
        let mut pipe = redis::pipe();
        pipe.atomic();

        let all_statuses = [
            TransactionStatus::Canceled,
            TransactionStatus::Pending,
            TransactionStatus::Sent,
            TransactionStatus::Submitted,
            TransactionStatus::Mined,
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Expired,
        ];

        for req in &requests {
            let tx_key = self.tx_key(&req.relayer_id, &req.id);
            pipe.del(&tx_key);

            let reverse_key = self.tx_to_relayer_key(&req.id);
            pipe.del(&reverse_key);

            for status in &all_statuses {
                let status_sorted_key = self.relayer_status_sorted_key(&req.relayer_id, status);
                pipe.zrem(&status_sorted_key, &req.id);

                let status_legacy_key = self.relayer_status_key(&req.relayer_id, status);
                pipe.srem(&status_legacy_key, &req.id);
            }

            if let Some(nonce) = req.nonce {
                let nonce_key = self.relayer_nonce_key(&req.relayer_id, nonce);
                pipe.del(&nonce_key);
            }

            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&req.relayer_id);
            pipe.zrem(&relayer_sorted_key, &req.id);
        }

        match pipe.exec_async(&mut conn).await {
            Ok(_) => {
                let deleted_count = requests.len();
                debug!(
                    deleted_count = %deleted_count,
                    "batch delete completed"
                );
                Ok(BatchDeleteResult {
                    deleted_count,
                    failed: vec![],
                })
            }
            Err(e) => {
                error!(error = %e, "batch delete pipeline failed");
                let failed: Vec<(String, String)> = requests
                    .iter()
                    .map(|req| (req.id.clone(), format!("Redis pipeline error: {e}")))
                    .collect();
                Ok(BatchDeleteResult {
                    deleted_count: 0,
                    failed,
                })
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
    use alloy::primitives::U256;
    use deadpool_redis::{Config, Runtime};
    use lazy_static::lazy_static;
    use std::str::FromStr;
    use tokio;
    use uuid::Uuid;

    use tokio::sync::Mutex;

    lazy_static! {
        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
    }

    fn create_test_transaction(id: &str) -> TransactionRepoModel {
        TransactionRepoModel {
            id: id.to_string(),
            relayer_id: "relayer-1".to_string(),
            status: TransactionStatus::Pending,
            status_reason: None,
            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
            valid_until: None,
            delete_at: None,
            network_type: NetworkType::Evm,
            priced_at: None,
            hashes: vec![],
            network_data: NetworkTransactionData::Evm(EvmTransactionData {
                gas_price: Some(1000000000),
                gas_limit: Some(21000),
                nonce: Some(1),
                value: U256::from_str("1000000000000000000").unwrap(),
                data: Some("0x".to_string()),
                from: "0xSender".to_string(),
                to: Some("0xRecipient".to_string()),
                chain_id: 1,
                signature: None,
                hash: Some(format!("0x{id}")),
                speed: Some(Speed::Fast),
                max_fee_per_gas: None,
                max_priority_fee_per_gas: None,
                raw: None,
            }),
            noop_count: None,
            is_canceled: Some(false),
            metadata: None,
        }
    }

    fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
        let mut tx = create_test_transaction(id);
        tx.relayer_id = relayer_id.to_string();
        tx
    }

    fn create_test_transaction_with_status(
        id: &str,
        relayer_id: &str,
        status: TransactionStatus,
    ) -> TransactionRepoModel {
        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
        tx.status = status;
        tx
    }

    fn create_test_transaction_with_nonce(
        id: &str,
        nonce: u64,
        relayer_id: &str,
    ) -> TransactionRepoModel {
        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
        if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
            evm_data.nonce = Some(nonce);
        }
        tx
    }

    async fn setup_test_repo() -> RedisTransactionRepository {
        let redis_url = std::env::var("REDIS_TEST_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());

        let cfg = Config::from_url(&redis_url);
        let pool = Arc::new(
            cfg.builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build Redis pool"),
        );

        let connections = Arc::new(RedisConnections::new_single_pool(pool));

        let random_id = Uuid::new_v4().to_string();
        let key_prefix = format!("test_prefix:{random_id}");

        RedisTransactionRepository::new(connections, key_prefix)
            .expect("Failed to create RedisTransactionRepository")
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_new_repository_creation() {
        let repo = setup_test_repo().await;
        assert!(repo.key_prefix.contains("test_prefix"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_new_repository_empty_prefix_fails() {
        let redis_url = std::env::var("REDIS_TEST_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
        let cfg = Config::from_url(&redis_url);
        let pool = Arc::new(
            cfg.builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build Redis pool"),
        );
        let connections = Arc::new(RedisConnections::new_single_pool(pool));

        let result = RedisTransactionRepository::new(connections, "".to_string());
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_key_generation() {
        let repo = setup_test_repo().await;

        assert!(repo
            .tx_key("relayer-1", "test-id")
            .contains(":relayer:relayer-1:tx:test-id"));
        assert!(repo
            .tx_to_relayer_key("test-id")
            .contains(":relayer:tx_to_relayer:test-id"));
        assert!(repo.relayer_list_key().contains(":relayer_list"));
        assert!(repo
            .relayer_status_key("relayer-1", &TransactionStatus::Pending)
            .contains(":relayer:relayer-1:status:Pending"));
        assert!(repo
            .relayer_nonce_key("relayer-1", 42)
            .contains(":relayer:relayer-1:nonce:42"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_serialize_deserialize_transaction() {
        let repo = setup_test_repo().await;
        let tx = create_test_transaction("test-1");

        let serialized = repo
            .serialize_entity(&tx, |t| &t.id, "transaction")
            .expect("Serialization should succeed");
        let deserialized: TransactionRepoModel = repo
            .deserialize_entity(&serialized, "test-1", "transaction")
            .expect("Deserialization should succeed");

        assert_eq!(tx.id, deserialized.id);
        assert_eq!(tx.relayer_id, deserialized.relayer_id);
        assert_eq!(tx.status, deserialized.status);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_extract_nonce() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let relayer_id = Uuid::new_v4().to_string();
        let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);

        let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
        assert_eq!(nonce, Some(42));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_create_transaction() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        let result = repo.create(tx.clone()).await.unwrap();
        assert_eq!(result.id, tx.id);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_transaction() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx.clone()).await.unwrap();
        let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
        assert_eq!(stored.id, tx.id);
        assert_eq!(stored.relayer_id, tx.relayer_id);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_transaction() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let mut tx = create_test_transaction(&random_id);

        repo.create(tx.clone()).await.unwrap();
        tx.status = TransactionStatus::Confirmed;

        let updated = repo.update(random_id.to_string(), tx).await.unwrap();
        assert!(matches!(updated.status, TransactionStatus::Confirmed));
    }

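    // Sketch: sequential status updates through `update_status` exercise the
    // CAS path in `partial_update`; each write rebases on the value stored by
    // the previous one. Assumes a reachable Redis, like the other tests here.
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_sequential_cas() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        repo.create(create_test_transaction(&random_id))
            .await
            .unwrap();

        let updated = repo
            .update_status(random_id.clone(), TransactionStatus::Sent)
            .await
            .unwrap();
        assert!(matches!(updated.status, TransactionStatus::Sent));

        let updated = repo
            .update_status(random_id.clone(), TransactionStatus::Submitted)
            .await
            .unwrap();
        assert!(matches!(updated.status, TransactionStatus::Submitted));
    }
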
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_transaction() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();
        repo.delete_by_id(random_id.to_string()).await.unwrap();

        let result = repo.get_by_id(random_id.to_string()).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_list_all_transactions() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let random_id2 = Uuid::new_v4().to_string();

        let tx1 = create_test_transaction(&random_id);
        let tx2 = create_test_transaction(&random_id2);

        repo.create(tx1).await.unwrap();
        repo.create(tx2).await.unwrap();

        let transactions = repo.list_all().await.unwrap();
        assert!(transactions.len() >= 2);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_count_transactions() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        let count = repo.count().await.unwrap();
        repo.create(tx).await.unwrap();
        assert!(repo.count().await.unwrap() > count);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_nonexistent_transaction() {
        let repo = setup_test_repo().await;
        let result = repo.get_by_id("nonexistent".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_duplicate_transaction_creation() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();

        let tx = create_test_transaction(&random_id);

        repo.create(tx.clone()).await.unwrap();
        let result = repo.create(tx).await;

        assert!(matches!(
            result,
            Err(RepositoryError::ConstraintViolation(_))
        ));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_nonexistent_transaction() {
        let repo = setup_test_repo().await;
        let tx = create_test_transaction("test-1");

        let result = repo.update("nonexistent".to_string(), tx).await;
        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_list_paginated() {
        let repo = setup_test_repo().await;

        for _ in 1..=10 {
            let random_id = Uuid::new_v4().to_string();
            let tx = create_test_transaction(&random_id);
            repo.create(tx).await.unwrap();
        }

        let query = PaginationQuery {
            page: 1,
            per_page: 3,
        };
        let result = repo.list_paginated(query).await.unwrap();
        assert_eq!(result.items.len(), 3);
        assert!(result.total >= 10);
        assert_eq!(result.page, 1);
        assert_eq!(result.per_page, 3);

        let query = PaginationQuery {
            page: 1000,
            per_page: 3,
        };
        let result = repo.list_paginated(query).await.unwrap();
        assert_eq!(result.items.len(), 0);
    }

2084 #[tokio::test]
2085 #[ignore = "Requires active Redis instance"]
2086 async fn test_find_by_relayer_id() {
2087 let repo = setup_test_repo().await;
2088 let random_id = Uuid::new_v4().to_string();
2089 let random_id2 = Uuid::new_v4().to_string();
2090 let random_id3 = Uuid::new_v4().to_string();
2091
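        // Two transactions on relayer-1 and one on relayer-2; results must stay scoped per relayer.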
        let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
        let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
        let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");

        repo.create(tx1).await.unwrap();
        repo.create(tx2).await.unwrap();
        repo.create(tx3).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = repo
            .find_by_relayer_id("relayer-1", query.clone())
            .await
            .unwrap();
        assert!(result.total >= 2);
        assert!(result.items.len() >= 2);
        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));

        let result = repo
            .find_by_relayer_id("relayer-2", query.clone())
            .await
            .unwrap();
        assert!(result.total >= 1);
        assert!(!result.items.is_empty());
        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));

        let result = repo
            .find_by_relayer_id("non-existent", query.clone())
            .await
            .unwrap();
        assert_eq!(result.total, 0);
        assert_eq!(result.items.len(), 0);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
        let repo = setup_test_repo().await;
        let relayer_id = Uuid::new_v4().to_string();

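        // Timestamps make test-1 the oldest and test-3 the newest; they are created
        // out of order (tx2, tx1, tx3) to prove results sort by created_at, not insertion order.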
        let mut tx1 = create_test_transaction_with_relayer("test-1", &relayer_id);
        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string();
        let mut tx2 = create_test_transaction_with_relayer("test-2", &relayer_id);
        tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string();
        let mut tx3 = create_test_transaction_with_relayer("test-3", &relayer_id);
        tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string();

        repo.create(tx2.clone()).await.unwrap();
        repo.create(tx1.clone()).await.unwrap();
        repo.create(tx3.clone()).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();

        assert_eq!(result.total, 3);
        assert_eq!(result.items.len(), 3);

        assert_eq!(
            result.items[0].id, "test-3",
            "First item should be newest (test-3)"
        );
        assert_eq!(
            result.items[0].created_at,
            "2025-01-27T14:00:00.000000+00:00"
        );

        assert_eq!(
            result.items[1].id, "test-2",
            "Second item should be middle (test-2)"
        );
        assert_eq!(
            result.items[1].created_at,
            "2025-01-27T12:00:00.000000+00:00"
        );

        assert_eq!(
            result.items[2].id, "test-1",
            "Third item should be oldest (test-1)"
        );
        assert_eq!(
            result.items[2].created_at,
            "2025-01-27T10:00:00.000000+00:00"
        );
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_relayer_id_migration_from_old_index() {
        let repo = setup_test_repo().await;
        let relayer_id = Uuid::new_v4().to_string();

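        // Simulate pre-index data: these transactions are written straight to Redis
        // below, bypassing create(), so the created_at sorted set never sees them.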
        let mut tx1 = create_test_transaction_with_relayer("migrate-test-1", &relayer_id);
        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string();
        let mut tx2 = create_test_transaction_with_relayer("migrate-test-2", &relayer_id);
        tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string();
        let mut tx3 = create_test_transaction_with_relayer("migrate-test-3", &relayer_id);
        tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string();

        let mut conn = repo.connections.primary().get().await.unwrap();
        let relayer_list_key = repo.relayer_list_key();
        let _: () = conn.sadd(&relayer_list_key, &relayer_id).await.unwrap();

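        // Old-format write path: main record, reverse lookup, and status set only;
        // note the absence of any created_at sorted-set entry.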
        for tx in &[&tx1, &tx2, &tx3] {
            let key = repo.tx_key(&tx.relayer_id, &tx.id);
            let reverse_key = repo.tx_to_relayer_key(&tx.id);
            let value = repo.serialize_entity(tx, |t| &t.id, "transaction").unwrap();

            let mut pipe = redis::pipe();
            pipe.atomic();
            pipe.set(&key, &value);
            pipe.set(&reverse_key, &tx.relayer_id);

            let status_key = repo.relayer_status_key(&tx.relayer_id, &tx.status);
            pipe.sadd(&status_key, &tx.id);

            pipe.exec_async(&mut conn).await.unwrap();
        }

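        // Before any read, the created_at sorted set for this relayer must not exist.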
        let relayer_sorted_key = repo.relayer_tx_by_created_at_key(&relayer_id);
        let count: u64 = conn.zcard(&relayer_sorted_key).await.unwrap();
        assert_eq!(count, 0, "Sorted set should be empty for old transactions");

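        // The first find_by_relayer_id call should rebuild the sorted-set index
        // from the legacy data, per the migration assertions below.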
        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = repo
            .find_by_relayer_id(&relayer_id, query.clone())
            .await
            .unwrap();

        let count_after: u64 = conn.zcard(&relayer_sorted_key).await.unwrap();
        assert_eq!(
            count_after, 3,
            "Sorted set should be populated after migration"
        );

        assert_eq!(result.total, 3);
        assert_eq!(result.items.len(), 3);

        assert_eq!(
            result.items[0].id, "migrate-test-3",
            "First item should be newest after migration"
        );
        assert_eq!(
            result.items[0].created_at,
            "2025-01-27T14:00:00.000000+00:00"
        );

        assert_eq!(
            result.items[1].id, "migrate-test-2",
            "Second item should be middle after migration"
        );
        assert_eq!(
            result.items[1].created_at,
            "2025-01-27T12:00:00.000000+00:00"
        );

        assert_eq!(
            result.items[2].id, "migrate-test-1",
            "Third item should be oldest after migration"
        );
        assert_eq!(
            result.items[2].created_at,
            "2025-01-27T10:00:00.000000+00:00"
        );

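        // A second read is served from the now-populated index and must return the same ordering.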
        let result2 = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
        assert_eq!(result2.total, 3);
        assert_eq!(result2.items.len(), 3);
        assert_eq!(result.items[0].id, result2.items[0].id);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_status() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let random_id2 = Uuid::new_v4().to_string();
        let random_id3 = Uuid::new_v4().to_string();
        let relayer_id = Uuid::new_v4().to_string();
        let tx1 = create_test_transaction_with_status(
            &random_id,
            &relayer_id,
            TransactionStatus::Pending,
        );
        let tx2 =
            create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
        let tx3 = create_test_transaction_with_status(
            &random_id3,
            &relayer_id,
            TransactionStatus::Confirmed,
        );

        repo.create(tx1).await.unwrap();
        repo.create(tx2).await.unwrap();
        repo.create(tx3).await.unwrap();

        let result = repo
            .find_by_status(&relayer_id, &[TransactionStatus::Pending])
            .await
            .unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].status, TransactionStatus::Pending);

        let result = repo
            .find_by_status(
                &relayer_id,
                &[TransactionStatus::Pending, TransactionStatus::Sent],
            )
            .await
            .unwrap();
        assert_eq!(result.len(), 2);

        let result = repo
            .find_by_status(&relayer_id, &[TransactionStatus::Failed])
            .await
            .unwrap();
        assert_eq!(result.len(), 0);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_status_paginated() {
        let repo = setup_test_repo().await;
        let relayer_id = Uuid::new_v4().to_string();

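        // Five Pending and two Confirmed transactions, each with a distinct
        // created_at, make every page boundary deterministic.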
        for i in 1..=5 {
            let tx_id = Uuid::new_v4().to_string();
            let mut tx = create_test_transaction_with_status(
                &tx_id,
                &relayer_id,
                TransactionStatus::Pending,
            );
            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
            repo.create(tx).await.unwrap();
        }

        for i in 6..=7 {
            let tx_id = Uuid::new_v4().to_string();
            let mut tx = create_test_transaction_with_status(
                &tx_id,
                &relayer_id,
                TransactionStatus::Confirmed,
            );
            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
            repo.create(tx).await.unwrap();
        }

        let query = PaginationQuery {
            page: 1,
            per_page: 2,
        };
        let result = repo
            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 2);
        assert_eq!(result.page, 1);
        assert_eq!(result.per_page, 2);

        let query = PaginationQuery {
            page: 2,
            per_page: 2,
        };
        let result = repo
            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 2);
        assert_eq!(result.page, 2);

        let query = PaginationQuery {
            page: 3,
            per_page: 2,
        };
        let result = repo
            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 1);

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = repo
            .find_by_status_paginated(
                &relayer_id,
                &[TransactionStatus::Pending, TransactionStatus::Confirmed],
                query,
                false,
            )
            .await
            .unwrap();

        assert_eq!(result.total, 7);
        assert_eq!(result.items.len(), 7);

        let query = PaginationQuery {
            page: 1,
            per_page: 10,
        };
        let result = repo
            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Failed], query, false)
            .await
            .unwrap();

        assert_eq!(result.total, 0);
        assert_eq!(result.items.len(), 0);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_status_paginated_oldest_first() {
        let repo = setup_test_repo().await;
        let relayer_id = Uuid::new_v4().to_string();

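        // created_at grows with i, so tx1 is the oldest and tx5 the newest.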
        for i in 1..=5 {
            let tx_id = format!("tx{}-{}", i, Uuid::new_v4());
            let mut tx = create_test_transaction(&tx_id);
            tx.relayer_id = relayer_id.clone();
            tx.status = TransactionStatus::Pending;
            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
            repo.create(tx).await.unwrap();
        }

        let query = PaginationQuery {
            page: 1,
            per_page: 3,
        };
        let result = repo
            .find_by_status_paginated(
                &relayer_id,
                &[TransactionStatus::Pending],
                query.clone(),
                true,
            )
            .await
            .unwrap();

        assert_eq!(result.total, 5);
        assert_eq!(result.items.len(), 3);
        assert!(
            result.items[0].created_at < result.items[1].created_at,
            "First item should be older than second"
        );
        assert!(
            result.items[1].created_at < result.items[2].created_at,
            "Second item should be older than third"
        );

        let result_newest = repo
            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
            .await
            .unwrap();

        assert_eq!(result_newest.items.len(), 3);
        assert!(
            result_newest.items[0].created_at > result_newest.items[1].created_at,
            "First item should be newer than second"
        );
        assert!(
            result_newest.items[1].created_at > result_newest.items[2].created_at,
            "Second item should be newer than third"
        );
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_status_paginated_oldest_first_single_item() {
        let repo = setup_test_repo().await;
        let relayer_id = Uuid::new_v4().to_string();

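        // Known timestamps pin down exactly which generated id is oldest and which is newest.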
        let timestamps = [
            "2025-01-27T08:00:00.000000+00:00",
            "2025-01-27T10:00:00.000000+00:00",
            "2025-01-27T12:00:00.000000+00:00",
        ];

        let mut oldest_id = String::new();
        let mut newest_id = String::new();

        for (i, timestamp) in timestamps.iter().enumerate() {
            let tx_id = format!("tx-{}-{}", i, Uuid::new_v4());
            if i == 0 {
                oldest_id = tx_id.clone();
            }
            if i == 2 {
                newest_id = tx_id.clone();
            }
            let mut tx = create_test_transaction(&tx_id);
            tx.relayer_id = relayer_id.clone();
            tx.status = TransactionStatus::Pending;
            tx.created_at = timestamp.to_string();
            repo.create(tx).await.unwrap();
        }

        let query = PaginationQuery {
            page: 1,
            per_page: 1,
        };
        let result = repo
            .find_by_status_paginated(
                &relayer_id,
                &[TransactionStatus::Pending],
                query.clone(),
                true,
            )
            .await
            .unwrap();

        assert_eq!(result.total, 3);
        assert_eq!(result.items.len(), 1);
        assert_eq!(
            result.items[0].id, oldest_id,
            "With oldest_first=true and per_page=1, should return the oldest transaction"
        );

        let result = repo
            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
            .await
            .unwrap();

        assert_eq!(result.items.len(), 1);
        assert_eq!(
            result.items[0].id, newest_id,
            "With oldest_first=false and per_page=1, should return the newest transaction"
        );
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_by_nonce() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let random_id2 = Uuid::new_v4().to_string();
        let relayer_id = Uuid::new_v4().to_string();

        let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
        let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);

        repo.create(tx1.clone()).await.unwrap();
        repo.create(tx2).await.unwrap();

        let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().id, random_id);

        let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
        assert!(result.is_none());

        let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();
        let updated = repo
            .update_status(random_id.to_string(), TransactionStatus::Confirmed)
            .await
            .unwrap();
        assert_eq!(updated.status, TransactionStatus::Confirmed);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_partial_update() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();

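        // Only the fields set here may change; all other fields are None and must remain untouched.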
        let update = TransactionUpdateRequest {
            status: Some(TransactionStatus::Sent),
            status_reason: Some("Transaction sent".to_string()),
            sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
            confirmed_at: None,
            network_data: None,
            hashes: None,
            is_canceled: None,
            priced_at: None,
            noop_count: None,
            delete_at: None,
            metadata: None,
        };

        let updated = repo
            .partial_update(random_id.to_string(), update)
            .await
            .unwrap();
        assert_eq!(updated.status, TransactionStatus::Sent);
        assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
        assert_eq!(
            updated.sent_at,
            Some("2025-01-27T16:00:00.000000+00:00".to_string())
        );
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_set_sent_at() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();
        let updated = repo
            .set_sent_at(
                random_id.to_string(),
                "2025-01-27T16:00:00.000000+00:00".to_string(),
            )
            .await
            .unwrap();
        assert_eq!(
            updated.sent_at,
            Some("2025-01-27T16:00:00.000000+00:00".to_string())
        );
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_set_confirmed_at() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();
        let updated = repo
            .set_confirmed_at(
                random_id.to_string(),
                "2025-01-27T16:00:00.000000+00:00".to_string(),
            )
            .await
            .unwrap();
        assert_eq!(
            updated.confirmed_at,
            Some("2025-01-27T16:00:00.000000+00:00".to_string())
        );
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_network_data() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction(&random_id);

        repo.create(tx).await.unwrap();

        let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
            gas_price: Some(2000000000),
            gas_limit: Some(42000),
            nonce: Some(2),
            value: U256::from_str("2000000000000000000").unwrap(),
            data: Some("0x1234".to_string()),
            from: "0xNewSender".to_string(),
            to: Some("0xNewRecipient".to_string()),
            chain_id: 1,
            signature: None,
            hash: Some("0xnewhash".to_string()),
            speed: Some(Speed::SafeLow),
            max_fee_per_gas: None,
            max_priority_fee_per_gas: None,
            raw: None,
        });

        let updated = repo
            .update_network_data(random_id.to_string(), new_network_data.clone())
            .await
            .unwrap();
        assert_eq!(
            updated
                .network_data
                .get_evm_transaction_data()
                .unwrap()
                .hash,
            new_network_data.get_evm_transaction_data().unwrap().hash
        );
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_debug_implementation() {
        let repo = setup_test_repo().await;
        let debug_str = format!("{repo:?}");
        assert!(debug_str.contains("RedisTransactionRepository"));
        assert!(debug_str.contains("test_prefix"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_error_handling_empty_id() {
        let repo = setup_test_repo().await;

        let result = repo.get_by_id("".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));

        let result = repo
            .update("".to_string(), create_test_transaction("test"))
            .await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));

        let result = repo.delete_by_id("".to_string()).await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_pagination_validation() {
        let repo = setup_test_repo().await;

        let query = PaginationQuery {
            page: 1,
            per_page: 0,
        };
        let result = repo.list_paginated(query).await;
        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_index_consistency() {
        let repo = setup_test_repo().await;
        let random_id = Uuid::new_v4().to_string();
        let relayer_id = Uuid::new_v4().to_string();
        let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);

        repo.create(tx.clone()).await.unwrap();

        let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
        assert!(found.is_some());

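        // Change the EVM nonce from 42 to 43; update() must drop the old nonce
        // index entry and add the new one, as the lookups below verify.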
        let mut updated_tx = tx.clone();
        if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
            evm_data.nonce = Some(43);
        }

        repo.update(random_id.to_string(), updated_tx)
            .await
            .unwrap();

        let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
        assert!(old_nonce_result.is_none());

        let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
        assert!(new_nonce_result.is_some());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_has_entries() {
        let repo = setup_test_repo().await;
        assert!(!repo.has_entries().await.unwrap());

        let tx_id = uuid::Uuid::new_v4().to_string();
        let tx = create_test_transaction(&tx_id);
        repo.create(tx.clone()).await.unwrap();

        assert!(repo.has_entries().await.unwrap());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_drop_all_entries() {
        let repo = setup_test_repo().await;
        let tx_id = uuid::Uuid::new_v4().to_string();
        let tx = create_test_transaction(&tx_id);
        repo.create(tx.clone()).await.unwrap();
        assert!(repo.has_entries().await.unwrap());

        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_sets_delete_at_for_final_statuses() {
        let _lock = ENV_MUTEX.lock().await;

        use chrono::{DateTime, Duration, Utc};
        use std::env;

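        // Pin the expiration window so the expected delete_at offset (6 hours) is known.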
        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");

        let repo = setup_test_repo().await;

        let final_statuses = [
            TransactionStatus::Canceled,
            TransactionStatus::Confirmed,
            TransactionStatus::Failed,
            TransactionStatus::Expired,
        ];

        for (i, status) in final_statuses.iter().enumerate() {
            let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
            let mut tx = create_test_transaction(&tx_id);

            tx.delete_at = None;
            tx.status = TransactionStatus::Pending;

            repo.create(tx).await.unwrap();

            let before_update = Utc::now();

            let updated = repo
                .update_status(tx_id.clone(), status.clone())
                .await
                .unwrap();

            assert!(
                updated.delete_at.is_some(),
                "delete_at should be set for status: {status:?}"
            );

            let delete_at_str = updated.delete_at.unwrap();
            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
                .expect("delete_at should be valid RFC3339")
                .with_timezone(&Utc);

            let duration_from_before = delete_at.signed_duration_since(before_update);
            let expected_duration = Duration::hours(6);
            let tolerance = Duration::minutes(5);

            assert!(
                duration_from_before >= expected_duration - tolerance
                    && duration_from_before <= expected_duration + tolerance,
                "delete_at should be approximately 6 hours from now for status: {status:?}. Duration: {duration_from_before:?}"
            );
        }

        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
        let _lock = ENV_MUTEX.lock().await;

        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");

        let repo = setup_test_repo().await;

        let non_final_statuses = [
            TransactionStatus::Pending,
            TransactionStatus::Sent,
            TransactionStatus::Submitted,
            TransactionStatus::Mined,
        ];

        for (i, status) in non_final_statuses.iter().enumerate() {
            let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
            let mut tx = create_test_transaction(&tx_id);
            tx.delete_at = None;
            tx.status = TransactionStatus::Pending;

            repo.create(tx).await.unwrap();

            let updated = repo
                .update_status(tx_id.clone(), status.clone())
                .await
                .unwrap();

            assert!(
                updated.delete_at.is_none(),
                "delete_at should NOT be set for status: {status:?}"
            );
        }

        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_partial_update_sets_delete_at_for_final_statuses() {
        let _lock = ENV_MUTEX.lock().await;

        use chrono::{DateTime, Duration, Utc};
        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");

        let repo = setup_test_repo().await;
        let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
        let mut tx = create_test_transaction(&tx_id);
        tx.delete_at = None;
        tx.status = TransactionStatus::Pending;

        repo.create(tx).await.unwrap();

        let before_update = Utc::now();

        let update = TransactionUpdateRequest {
            status: Some(TransactionStatus::Confirmed),
            status_reason: Some("Transaction completed".to_string()),
            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
            ..Default::default()
        };

        let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();

        assert!(
            updated.delete_at.is_some(),
            "delete_at should be set when updating to Confirmed status"
        );

        let delete_at_str = updated.delete_at.unwrap();
        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
            .expect("delete_at should be valid RFC3339")
            .with_timezone(&Utc);

        let duration_from_before = delete_at.signed_duration_since(before_update);
        let expected_duration = Duration::hours(8);
        let tolerance = Duration::minutes(5);

        assert!(
            duration_from_before >= expected_duration - tolerance
                && duration_from_before <= expected_duration + tolerance,
            "delete_at should be approximately 8 hours from now. Duration: {duration_from_before:?}"
        );

        assert_eq!(updated.status, TransactionStatus::Confirmed);
        assert_eq!(
            updated.status_reason,
            Some("Transaction completed".to_string())
        );
        assert_eq!(
            updated.confirmed_at,
            Some("2023-01-01T12:05:00Z".to_string())
        );

        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_update_status_preserves_existing_delete_at() {
        let _lock = ENV_MUTEX.lock().await;

        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");

        let repo = setup_test_repo().await;
        let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
        let mut tx = create_test_transaction(&tx_id);

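        // Pre-seed delete_at; moving to a final status must not overwrite it.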
        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
        tx.delete_at = Some(existing_delete_at.clone());
        tx.status = TransactionStatus::Pending;

        repo.create(tx).await.unwrap();

        let updated = repo
            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
            .await
            .unwrap();

        assert_eq!(
            updated.delete_at,
            Some(existing_delete_at),
            "Existing delete_at should be preserved when updating to final status"
        );

        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_partial_update_without_status_change_preserves_delete_at() {
        let _lock = ENV_MUTEX.lock().await;

        use std::env;

        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");

        let repo = setup_test_repo().await;
        let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
        let mut tx = create_test_transaction(&tx_id);
        tx.delete_at = None;
        tx.status = TransactionStatus::Pending;

        repo.create(tx).await.unwrap();

        let updated1 = repo
            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
            .await
            .unwrap();

        assert!(updated1.delete_at.is_some());
        let original_delete_at = updated1.delete_at.clone();

        let update = TransactionUpdateRequest {
            status: None,
            status_reason: Some("Updated reason".to_string()),
            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
            ..Default::default()
        };

        let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();

        assert_eq!(
            updated2.delete_at, original_delete_at,
            "delete_at should be preserved when status is not updated"
        );

        assert_eq!(updated2.status, TransactionStatus::Confirmed);
        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
        assert_eq!(
            updated2.confirmed_at,
            Some("2023-01-01T12:10:00Z".to_string())
        );

        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
    }

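    // delete_by_ids: batch deletion behavior.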
    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_empty_list() {
        let repo = setup_test_repo().await;
        let tx_id = format!("test-empty-{}", Uuid::new_v4());

        let tx = create_test_transaction(&tx_id);
        repo.create(tx).await.unwrap();

        let result = repo.delete_by_ids(vec![]).await.unwrap();

        assert_eq!(result.deleted_count, 0);
        assert!(result.failed.is_empty());

        assert!(repo.get_by_id(tx_id).await.is_ok());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_single_transaction() {
        let repo = setup_test_repo().await;
        let tx_id = format!("test-single-{}", Uuid::new_v4());

        let tx = create_test_transaction(&tx_id);
        repo.create(tx).await.unwrap();

        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();

        assert_eq!(result.deleted_count, 1);
        assert!(result.failed.is_empty());

        assert!(repo.get_by_id(tx_id).await.is_err());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_multiple_transactions() {
        let repo = setup_test_repo().await;
        let base_id = Uuid::new_v4();

        let mut created_ids = Vec::new();
        for i in 1..=5 {
            let tx_id = format!("test-multi-{base_id}-{i}");
            let tx = create_test_transaction(&tx_id);
            repo.create(tx).await.unwrap();
            created_ids.push(tx_id);
        }

        let ids_to_delete = vec![
            created_ids[0].clone(),
            created_ids[2].clone(),
            created_ids[4].clone(),
        ];
        let result = repo.delete_by_ids(ids_to_delete).await.unwrap();

        assert_eq!(result.deleted_count, 3);
        assert!(result.failed.is_empty());

        assert!(repo.get_by_id(created_ids[0].clone()).await.is_err());
        assert!(repo.get_by_id(created_ids[1].clone()).await.is_ok());
        assert!(repo.get_by_id(created_ids[2].clone()).await.is_err());
        assert!(repo.get_by_id(created_ids[3].clone()).await.is_ok());
        assert!(repo.get_by_id(created_ids[4].clone()).await.is_err());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_nonexistent_transactions() {
        let repo = setup_test_repo().await;
        let base_id = Uuid::new_v4();

        let ids_to_delete = vec![
            format!("nonexistent-{}-1", base_id),
            format!("nonexistent-{}-2", base_id),
        ];
        let result = repo.delete_by_ids(ids_to_delete.clone()).await.unwrap();

        assert_eq!(result.deleted_count, 0);
        assert_eq!(result.failed.len(), 2);

        let failed_ids: Vec<&String> = result.failed.iter().map(|(id, _)| id).collect();
        assert!(failed_ids.contains(&&ids_to_delete[0]));
        assert!(failed_ids.contains(&&ids_to_delete[1]));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_mixed_existing_and_nonexistent() {
        let repo = setup_test_repo().await;
        let base_id = Uuid::new_v4();

        let existing_ids: Vec<String> = (1..=3)
            .map(|i| format!("test-mixed-existing-{base_id}-{i}"))
            .collect();

        for id in &existing_ids {
            let tx = create_test_transaction(id);
            repo.create(tx).await.unwrap();
        }

        let nonexistent_ids: Vec<String> = (1..=2)
            .map(|i| format!("test-mixed-nonexistent-{base_id}-{i}"))
            .collect();

        let ids_to_delete = vec![
            existing_ids[0].clone(),
            nonexistent_ids[0].clone(),
            existing_ids[1].clone(),
            nonexistent_ids[1].clone(),
        ];
        let result = repo.delete_by_ids(ids_to_delete).await.unwrap();

        assert_eq!(result.deleted_count, 2);
        assert_eq!(result.failed.len(), 2);

        assert!(repo.get_by_id(existing_ids[0].clone()).await.is_err());
        assert!(repo.get_by_id(existing_ids[1].clone()).await.is_err());

        assert!(repo.get_by_id(existing_ids[2].clone()).await.is_ok());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_removes_all_indexes() {
        let repo = setup_test_repo().await;
        let relayer_id = format!("relayer-{}", Uuid::new_v4());
        let tx_id = format!("test-indexes-{}", Uuid::new_v4());

        let mut tx = create_test_transaction(&tx_id);
        tx.relayer_id = relayer_id.clone();
        tx.status = TransactionStatus::Confirmed;
        repo.create(tx).await.unwrap();

        let found = repo
            .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
            .await
            .unwrap();
        assert!(found.iter().any(|t| t.id == tx_id));

        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
        assert_eq!(result.deleted_count, 1);

        let found_after = repo
            .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
            .await
            .unwrap();
        assert!(!found_after.iter().any(|t| t.id == tx_id));

        assert!(repo.get_by_id(tx_id).await.is_err());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_removes_nonce_index() {
        let repo = setup_test_repo().await;
        let relayer_id = format!("relayer-{}", Uuid::new_v4());
        let tx_id = format!("test-nonce-{}", Uuid::new_v4());
        let nonce = 12345u64;

        let tx = create_test_transaction_with_nonce(&tx_id, nonce, &relayer_id);
        repo.create(tx).await.unwrap();

        let found = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().id, tx_id);

        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
        assert_eq!(result.deleted_count, 1);

        let found_after = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
        assert!(found_after.is_none());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_large_batch() {
        let repo = setup_test_repo().await;
        let base_id = Uuid::new_v4();

        let count = 50;
        let mut created_ids = Vec::new();

        for i in 0..count {
            let tx_id = format!("test-large-{base_id}-{i}");
            let tx = create_test_transaction(&tx_id);
            repo.create(tx).await.unwrap();
            created_ids.push(tx_id);
        }

        let result = repo.delete_by_ids(created_ids.clone()).await.unwrap();

        assert_eq!(result.deleted_count, count);
        assert!(result.failed.is_empty());

        for id in created_ids {
            assert!(repo.get_by_id(id).await.is_err());
        }
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_delete_by_ids_preserves_other_relayer_transactions() {
        let repo = setup_test_repo().await;
        let relayer_1 = format!("relayer-1-{}", Uuid::new_v4());
        let relayer_2 = format!("relayer-2-{}", Uuid::new_v4());
        let tx_id_1 = format!("tx-relayer-1-{}", Uuid::new_v4());
        let tx_id_2 = format!("tx-relayer-2-{}", Uuid::new_v4());

        let tx1 = create_test_transaction_with_relayer(&tx_id_1, &relayer_1);
        let tx2 = create_test_transaction_with_relayer(&tx_id_2, &relayer_2);

        repo.create(tx1).await.unwrap();
        repo.create(tx2).await.unwrap();

        let result = repo.delete_by_ids(vec![tx_id_1.clone()]).await.unwrap();

        assert_eq!(result.deleted_count, 1);

        assert!(repo.get_by_id(tx_id_1).await.is_err());

        let remaining = repo.get_by_id(tx_id_2).await.unwrap();
        assert_eq!(remaining.relayer_id, relayer_2);
    }
}