openzeppelin_relayer/repositories/transaction/
transaction_redis.rs

1//! Redis-backed implementation of the TransactionRepository.
2
3use crate::domain::transaction::common::is_final_state;
4use crate::metrics::{
5    TRANSACTIONS_BY_STATUS, TRANSACTIONS_CREATED, TRANSACTIONS_FAILED, TRANSACTIONS_SUBMITTED,
6    TRANSACTIONS_SUCCESS, TRANSACTION_PROCESSING_TIME,
7};
8use crate::models::{
9    NetworkTransactionData, PaginationQuery, RepositoryError, TransactionRepoModel,
10    TransactionStatus, TransactionUpdateRequest,
11};
12use crate::repositories::redis_base::RedisRepository;
13use crate::repositories::{
14    BatchDeleteResult, BatchRetrievalResult, PaginatedResult, Repository, TransactionDeleteRequest,
15    TransactionRepository,
16};
17use crate::utils::RedisConnections;
18use async_trait::async_trait;
19use chrono::Utc;
20use redis::{AsyncCommands, Script};
21use std::fmt;
22use std::sync::Arc;
23use tracing::{debug, error, warn};
24
25const RELAYER_PREFIX: &str = "relayer";
26const TX_PREFIX: &str = "tx";
27const STATUS_PREFIX: &str = "status";
28const STATUS_SORTED_PREFIX: &str = "status_sorted";
29const NONCE_PREFIX: &str = "nonce";
30const TX_TO_RELAYER_PREFIX: &str = "tx_to_relayer";
31const RELAYER_LIST_KEY: &str = "relayer_list";
32const TX_BY_CREATED_AT_PREFIX: &str = "tx_by_created_at";
33
34#[derive(Clone)]
35pub struct RedisTransactionRepository {
36    pub connections: Arc<RedisConnections>,
37    pub key_prefix: String,
38}
39
40impl RedisRepository for RedisTransactionRepository {}
41
42impl RedisTransactionRepository {
43    pub fn new(
44        connections: Arc<RedisConnections>,
45        key_prefix: String,
46    ) -> Result<Self, RepositoryError> {
47        if key_prefix.is_empty() {
48            return Err(RepositoryError::InvalidData(
49                "Redis key prefix cannot be empty".to_string(),
50            ));
51        }
52
53        Ok(Self {
54            connections,
55            key_prefix,
56        })
57    }
58
59    /// Generate key for transaction data: relayer:{relayer_id}:tx:{tx_id}
60    fn tx_key(&self, relayer_id: &str, tx_id: &str) -> String {
61        format!(
62            "{}:{}:{}:{}:{}",
63            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX, tx_id
64        )
65    }
66
67    /// Generate key for reverse lookup: tx_to_relayer:{tx_id}
68    fn tx_to_relayer_key(&self, tx_id: &str) -> String {
69        format!(
70            "{}:{}:{}:{}",
71            self.key_prefix, RELAYER_PREFIX, TX_TO_RELAYER_PREFIX, tx_id
72        )
73    }
74
75    /// Generate key for relayer status index (legacy SET): relayer:{relayer_id}:status:{status}
76    fn relayer_status_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
77        format!(
78            "{}:{}:{}:{}:{}",
79            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_PREFIX, status
80        )
81    }
82
83    /// Generate key for relayer status sorted index (SORTED SET): relayer:{relayer_id}:status_sorted:{status}
84    /// Score is created_at timestamp in milliseconds for efficient ordering.
85    fn relayer_status_sorted_key(&self, relayer_id: &str, status: &TransactionStatus) -> String {
86        format!(
87            "{}:{}:{}:{}:{}",
88            self.key_prefix, RELAYER_PREFIX, relayer_id, STATUS_SORTED_PREFIX, status
89        )
90    }
91
92    /// Generate key for relayer nonce index: relayer:{relayer_id}:nonce:{nonce}
93    fn relayer_nonce_key(&self, relayer_id: &str, nonce: u64) -> String {
94        format!(
95            "{}:{}:{}:{}:{}",
96            self.key_prefix, RELAYER_PREFIX, relayer_id, NONCE_PREFIX, nonce
97        )
98    }
99
100    /// Generate key for relayer list: relayer_list (set of all relayer IDs)
101    fn relayer_list_key(&self) -> String {
102        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
103    }
104
105    /// Generate key for relayer's sorted set by created_at: relayer:{relayer_id}:tx_by_created_at
106    fn relayer_tx_by_created_at_key(&self, relayer_id: &str) -> String {
107        format!(
108            "{}:{}:{}:{}",
109            self.key_prefix, RELAYER_PREFIX, relayer_id, TX_BY_CREATED_AT_PREFIX
110        )
111    }
112
113    /// Parse timestamp string to score for sorted set (milliseconds since epoch)
114    fn timestamp_to_score(&self, timestamp: &str) -> f64 {
115        chrono::DateTime::parse_from_rfc3339(timestamp)
116            .map(|dt| dt.timestamp_millis() as f64)
117            .unwrap_or_else(|_| {
118                warn!(timestamp = %timestamp, "failed to parse timestamp, using 0");
119                0.0
120            })
121    }
122
123    /// Compute the appropriate score for a transaction's status sorted set.
124    /// - For Confirmed status: use confirmed_at (on-chain confirmation order)
125    /// - For all other statuses: use created_at (queue/processing order)
126    fn status_sorted_score(&self, tx: &TransactionRepoModel) -> f64 {
127        if tx.status == TransactionStatus::Confirmed {
128            // For Confirmed, prefer confirmed_at for accurate on-chain ordering
129            if let Some(ref confirmed_at) = tx.confirmed_at {
130                return self.timestamp_to_score(confirmed_at);
131            }
132            // Fallback to created_at if confirmed_at not set (shouldn't happen)
133            warn!(tx_id = %tx.id, "Confirmed transaction missing confirmed_at, using created_at");
134        }
135        self.timestamp_to_score(&tx.created_at)
136    }
137
138    /// Batch fetch transactions by IDs using reverse lookup
139    async fn get_transactions_by_ids(
140        &self,
141        ids: &[String],
142    ) -> Result<BatchRetrievalResult<TransactionRepoModel>, RepositoryError> {
143        if ids.is_empty() {
144            debug!("no transaction IDs provided for batch fetch");
145            return Ok(BatchRetrievalResult {
146                results: vec![],
147                failed_ids: vec![],
148            });
149        }
150
151        let mut conn = self
152            .get_connection(self.connections.reader(), "batch_fetch_transactions")
153            .await?;
154
155        let reverse_keys: Vec<String> = ids.iter().map(|id| self.tx_to_relayer_key(id)).collect();
156
157        debug!(count = %ids.len(), "fetching relayer IDs for transactions");
158
159        let relayer_ids: Vec<Option<String>> = conn
160            .mget(&reverse_keys)
161            .await
162            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayer_ids"))?;
163
164        let mut tx_keys = Vec::new();
165        let mut valid_ids = Vec::new();
166        let mut failed_ids = Vec::new();
167        for (i, relayer_id) in relayer_ids.into_iter().enumerate() {
168            match relayer_id {
169                Some(relayer_id) => {
170                    tx_keys.push(self.tx_key(&relayer_id, &ids[i]));
171                    valid_ids.push(ids[i].clone());
172                }
173                None => {
174                    warn!(tx_id = %ids[i], "no relayer found for transaction");
175                    failed_ids.push(ids[i].clone());
176                }
177            }
178        }
179
180        if tx_keys.is_empty() {
181            debug!("no valid transactions found for batch fetch");
182            return Ok(BatchRetrievalResult {
183                results: vec![],
184                failed_ids,
185            });
186        }
187
188        debug!(count = %tx_keys.len(), "batch fetching transaction data");
189
190        let values: Vec<Option<String>> = conn
191            .mget(&tx_keys)
192            .await
193            .map_err(|e| self.map_redis_error(e, "batch_fetch_transactions"))?;
194
195        let mut transactions = Vec::new();
196        let mut failed_count = 0;
197        let mut failed_ids = Vec::new();
198        for (i, value) in values.into_iter().enumerate() {
199            match value {
200                Some(json) => {
201                    match self.deserialize_entity::<TransactionRepoModel>(
202                        &json,
203                        &valid_ids[i],
204                        "transaction",
205                    ) {
206                        Ok(tx) => transactions.push(tx),
207                        Err(e) => {
208                            failed_count += 1;
209                            error!(tx_id = %valid_ids[i], error = %e, "failed to deserialize transaction");
210                            // Continue processing other transactions
211                        }
212                    }
213                }
214                None => {
215                    warn!(tx_id = %valid_ids[i], "transaction not found in batch fetch");
216                    failed_ids.push(valid_ids[i].clone());
217                }
218            }
219        }
220
221        if failed_count > 0 {
222            warn!(failed_count = %failed_count, total_count = %valid_ids.len(), "failed to deserialize transactions in batch");
223        }
224
225        debug!(count = %transactions.len(), "successfully fetched transactions");
226        Ok(BatchRetrievalResult {
227            results: transactions,
228            failed_ids,
229        })
230    }
231
232    /// Extract nonce from EVM transaction data
233    fn extract_nonce(&self, network_data: &NetworkTransactionData) -> Option<u64> {
234        match network_data.get_evm_transaction_data() {
235            Ok(tx_data) => tx_data.nonce,
236            Err(_) => {
237                debug!("no EVM transaction data available for nonce extraction");
238                None
239            }
240        }
241    }
242
243    /// Ensures the status sorted set exists, migrating from legacy SET if needed.
244    ///
245    /// This handles the transition from unordered SETs to sorted SETs for status indexing.
246    /// If the sorted set is empty but the legacy set has data, it migrates the data
247    /// by looking up each transaction's created_at timestamp to compute the score.
248    ///
249    /// # Concurrency
250    /// This function is safe for concurrent calls. If multiple calls race to migrate
251    /// the same status set:
252    /// - ZADD is idempotent (same member + score = no-op)
253    /// - DEL on non-existent key is safe (returns 0)
254    /// - After first successful migration, subsequent calls hit the fast path (ZCARD > 0)
255    ///
256    /// The only downside of concurrent migrations is wasted work, not data corruption.
257    ///
258    /// Returns the count of items in the sorted set after migration.
259    async fn ensure_status_sorted_set(
260        &self,
261        relayer_id: &str,
262        status: &TransactionStatus,
263    ) -> Result<u64, RepositoryError> {
264        let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
265        let legacy_key = self.relayer_status_key(relayer_id, status);
266
267        // Phase 1: Check if migration is needed
268        let legacy_ids = {
269            let mut conn = self
270                .get_connection(self.connections.primary(), "ensure_status_sorted_set_check")
271                .await?;
272
273            // Always check if legacy set has data that needs migration
274            let legacy_count: u64 = conn
275                .scard(&legacy_key)
276                .await
277                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_scard"))?;
278
279            if legacy_count == 0 {
280                // No legacy data to migrate, return current ZSET count
281                let sorted_count: u64 = conn
282                    .zcard(&sorted_key)
283                    .await
284                    .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_zcard"))?;
285                return Ok(sorted_count);
286            }
287
288            // Migration needed: get all IDs from legacy set
289            debug!(
290                relayer_id = %relayer_id,
291                status = %status,
292                legacy_count = %legacy_count,
293                "migrating status set to sorted set"
294            );
295
296            let ids: Vec<String> = conn
297                .smembers(&legacy_key)
298                .await
299                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_smembers"))?;
300
301            ids
302            // Connection dropped here before nested call to avoid connection doubling
303        };
304
305        if legacy_ids.is_empty() {
306            return Ok(0);
307        }
308
309        // Phase 2: Fetch transactions (uses its own connection internally)
310        let transactions = self.get_transactions_by_ids(&legacy_ids).await?;
311
312        // Phase 3: Perform migration with a new connection
313        let mut conn = self
314            .get_connection(
315                self.connections.primary(),
316                "ensure_status_sorted_set_migrate",
317            )
318            .await?;
319
320        if transactions.results.is_empty() {
321            // All transactions were stale/deleted, clean up legacy set
322            let _: () = conn
323                .del(&legacy_key)
324                .await
325                .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_del_stale"))?;
326            return Ok(0);
327        }
328
329        // Build sorted set entries and migrate atomically
330        // Use status-aware scoring: confirmed_at for Confirmed, created_at for others
331        let mut pipe = redis::pipe();
332        pipe.atomic();
333
334        for tx in &transactions.results {
335            let score = self.status_sorted_score(tx);
336            pipe.zadd(&sorted_key, &tx.id, score);
337        }
338
339        // Delete legacy set after migration
340        pipe.del(&legacy_key);
341
342        pipe.query_async::<()>(&mut conn)
343            .await
344            .map_err(|e| self.map_redis_error(e, "ensure_status_sorted_set_migrate"))?;
345
346        let migrated_count = transactions.results.len() as u64;
347        debug!(
348            relayer_id = %relayer_id,
349            status = %status,
350            migrated_count = %migrated_count,
351            "completed migration of status set to sorted set"
352        );
353
354        Ok(migrated_count)
355    }
356
357    /// Update indexes atomically with comprehensive error handling
358    async fn update_indexes(
359        &self,
360        tx: &TransactionRepoModel,
361        old_tx: Option<&TransactionRepoModel>,
362    ) -> Result<(), RepositoryError> {
363        let mut conn = self
364            .get_connection(self.connections.primary(), "update_indexes")
365            .await?;
366        let mut pipe = redis::pipe();
367        pipe.atomic();
368
369        debug!(tx_id = %tx.id, "updating indexes for transaction");
370
371        // Add relayer to the global relayer list
372        let relayer_list_key = self.relayer_list_key();
373        pipe.sadd(&relayer_list_key, &tx.relayer_id);
374
375        // Compute scores for sorted sets
376        // Status sorted set: uses confirmed_at for Confirmed status, created_at for others
377        let status_score = self.status_sorted_score(tx);
378        // Global tx_by_created_at: always uses created_at for consistent ordering
379        let created_at_score = self.timestamp_to_score(&tx.created_at);
380
381        // Handle status index updates - write to SORTED SET (new format)
382        let new_status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, &tx.status);
383        pipe.zadd(&new_status_sorted_key, &tx.id, status_score);
384        debug!(tx_id = %tx.id, status = %tx.status, score = %status_score, "added transaction to status sorted set");
385
386        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
387            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
388            pipe.set(&nonce_key, &tx.id);
389            debug!(tx_id = %tx.id, nonce = %nonce, "added nonce index for transaction");
390        }
391
392        // Add to per-relayer sorted set by created_at (for efficient sorted pagination)
393        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
394        pipe.zadd(&relayer_sorted_key, &tx.id, created_at_score);
395        debug!(tx_id = %tx.id, score = %created_at_score, "added transaction to sorted set by created_at");
396
397        // Remove old indexes if updating
398        if let Some(old) = old_tx {
399            if old.status != tx.status {
400                // Remove from old status sorted set (new format)
401                let old_status_sorted_key =
402                    self.relayer_status_sorted_key(&old.relayer_id, &old.status);
403                pipe.zrem(&old_status_sorted_key, &tx.id);
404
405                // Also clean up legacy SET if it exists (for migration cleanup)
406                let old_status_legacy_key = self.relayer_status_key(&old.relayer_id, &old.status);
407                pipe.srem(&old_status_legacy_key, &tx.id);
408
409                debug!(tx_id = %tx.id, old_status = %old.status, new_status = %tx.status, "removing old status indexes for transaction");
410            }
411
412            // Handle nonce index cleanup
413            if let Some(old_nonce) = self.extract_nonce(&old.network_data) {
414                let new_nonce = self.extract_nonce(&tx.network_data);
415                if Some(old_nonce) != new_nonce {
416                    let old_nonce_key = self.relayer_nonce_key(&old.relayer_id, old_nonce);
417                    pipe.del(&old_nonce_key);
418                    debug!(tx_id = %tx.id, old_nonce = %old_nonce, new_nonce = ?new_nonce, "removing old nonce index for transaction");
419                }
420            }
421        }
422
423        // Execute all operations in a single pipeline
424        pipe.exec_async(&mut conn).await.map_err(|e| {
425            error!(tx_id = %tx.id, error = %e, "index update pipeline failed for transaction");
426            self.map_redis_error(e, &format!("update_indexes_for_tx_{}", tx.id))
427        })?;
428
429        debug!(tx_id = %tx.id, "successfully updated indexes for transaction");
430        Ok(())
431    }
432
433    /// Remove all indexes with error recovery
434    async fn remove_all_indexes(&self, tx: &TransactionRepoModel) -> Result<(), RepositoryError> {
435        let mut conn = self
436            .get_connection(self.connections.primary(), "remove_all_indexes")
437            .await?;
438        let mut pipe = redis::pipe();
439        pipe.atomic();
440
441        debug!(tx_id = %tx.id, "removing all indexes for transaction");
442
443        // Remove from ALL possible status indexes to ensure complete cleanup
444        // This handles cases where a transaction might be in multiple status sets
445        // due to race conditions, partial failures, or bugs
446        for status in &[
447            TransactionStatus::Canceled,
448            TransactionStatus::Pending,
449            TransactionStatus::Sent,
450            TransactionStatus::Submitted,
451            TransactionStatus::Mined,
452            TransactionStatus::Confirmed,
453            TransactionStatus::Failed,
454            TransactionStatus::Expired,
455        ] {
456            // Remove from sorted status set (new format)
457            let status_sorted_key = self.relayer_status_sorted_key(&tx.relayer_id, status);
458            pipe.zrem(&status_sorted_key, &tx.id);
459
460            // Remove from legacy status set (for migration cleanup)
461            let status_legacy_key = self.relayer_status_key(&tx.relayer_id, status);
462            pipe.srem(&status_legacy_key, &tx.id);
463        }
464
465        // Remove nonce index if exists
466        if let Some(nonce) = self.extract_nonce(&tx.network_data) {
467            let nonce_key = self.relayer_nonce_key(&tx.relayer_id, nonce);
468            pipe.del(&nonce_key);
469            debug!(tx_id = %tx.id, nonce = %nonce, "removing nonce index for transaction");
470        }
471
472        // Remove from per-relayer sorted set by created_at
473        let relayer_sorted_key = self.relayer_tx_by_created_at_key(&tx.relayer_id);
474        pipe.zrem(&relayer_sorted_key, &tx.id);
475        debug!(tx_id = %tx.id, "removing transaction from sorted set by created_at");
476
477        // Remove reverse lookup
478        let reverse_key = self.tx_to_relayer_key(&tx.id);
479        pipe.del(&reverse_key);
480
481        pipe.exec_async(&mut conn).await.map_err(|e| {
482            error!(tx_id = %tx.id, error = %e, "index removal failed for transaction");
483            self.map_redis_error(e, &format!("remove_indexes_for_tx_{}", tx.id))
484        })?;
485
486        debug!(tx_id = %tx.id, "successfully removed all indexes for transaction");
487        Ok(())
488    }
489}
490
491impl fmt::Debug for RedisTransactionRepository {
492    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493        f.debug_struct("RedisTransactionRepository")
494            .field("connections", &"<RedisConnections>")
495            .field("key_prefix", &self.key_prefix)
496            .finish()
497    }
498}
499
500#[async_trait]
501impl Repository<TransactionRepoModel, String> for RedisTransactionRepository {
502    async fn create(
503        &self,
504        entity: TransactionRepoModel,
505    ) -> Result<TransactionRepoModel, RepositoryError> {
506        if entity.id.is_empty() {
507            return Err(RepositoryError::InvalidData(
508                "Transaction ID cannot be empty".to_string(),
509            ));
510        }
511
512        let key = self.tx_key(&entity.relayer_id, &entity.id);
513        let reverse_key = self.tx_to_relayer_key(&entity.id);
514        let mut conn = self
515            .get_connection(self.connections.primary(), "create")
516            .await?;
517
518        debug!(tx_id = %entity.id, "creating transaction");
519
520        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
521
522        // Check if transaction already exists by checking reverse lookup
523        let existing: Option<String> = conn
524            .get(&reverse_key)
525            .await
526            .map_err(|e| self.map_redis_error(e, "create_transaction_check"))?;
527
528        if existing.is_some() {
529            return Err(RepositoryError::ConstraintViolation(format!(
530                "Transaction with ID {} already exists",
531                entity.id
532            )));
533        }
534
535        // Use atomic pipeline for consistency
536        let mut pipe = redis::pipe();
537        pipe.atomic();
538        pipe.set(&key, &value);
539        pipe.set(&reverse_key, &entity.relayer_id);
540
541        pipe.exec_async(&mut conn)
542            .await
543            .map_err(|e| self.map_redis_error(e, "create_transaction"))?;
544
545        // Update indexes separately to handle partial failures gracefully
546        if let Err(e) = self.update_indexes(&entity, None).await {
547            error!(tx_id = %entity.id, error = %e, "failed to update indexes for new transaction");
548            return Err(e);
549        }
550
551        // Track transaction creation metric
552        let network_type = format!("{:?}", entity.network_type).to_lowercase();
553        let relayer_id = entity.relayer_id.as_str();
554        TRANSACTIONS_CREATED
555            .with_label_values(&[relayer_id, &network_type])
556            .inc();
557
558        // Track initial status distribution (Pending)
559        let status = &entity.status;
560        let status_str = format!("{status:?}").to_lowercase();
561        TRANSACTIONS_BY_STATUS
562            .with_label_values(&[relayer_id, &network_type, &status_str])
563            .inc();
564
565        debug!(tx_id = %entity.id, "successfully created transaction");
566        Ok(entity)
567    }
568
569    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
570        if id.is_empty() {
571            return Err(RepositoryError::InvalidData(
572                "Transaction ID cannot be empty".to_string(),
573            ));
574        }
575
576        let mut conn = self
577            .get_connection(self.connections.reader(), "get_by_id")
578            .await?;
579
580        debug!(tx_id = %id, "fetching transaction");
581
582        let reverse_key = self.tx_to_relayer_key(&id);
583        let relayer_id: Option<String> = conn
584            .get(&reverse_key)
585            .await
586            .map_err(|e| self.map_redis_error(e, "get_transaction_reverse_lookup"))?;
587
588        let relayer_id = match relayer_id {
589            Some(relayer_id) => relayer_id,
590            None => {
591                debug!(tx_id = %id, "transaction not found (no reverse lookup)");
592                return Err(RepositoryError::NotFound(format!(
593                    "Transaction with ID {id} not found"
594                )));
595            }
596        };
597
598        let key = self.tx_key(&relayer_id, &id);
599        let value: Option<String> = conn
600            .get(&key)
601            .await
602            .map_err(|e| self.map_redis_error(e, "get_transaction_by_id"))?;
603
604        match value {
605            Some(json) => {
606                let tx =
607                    self.deserialize_entity::<TransactionRepoModel>(&json, &id, "transaction")?;
608                debug!(tx_id = %id, "successfully fetched transaction");
609                Ok(tx)
610            }
611            None => {
612                debug!(tx_id = %id, "transaction not found");
613                Err(RepositoryError::NotFound(format!(
614                    "Transaction with ID {id} not found"
615                )))
616            }
617        }
618    }
619
620    // Unoptimized implementation of list_paginated. Rarely used. find_by_relayer_id is preferred.
621    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
622        let mut conn = self
623            .get_connection(self.connections.reader(), "list_all")
624            .await?;
625
626        debug!("fetching all transactions sorted by created_at (newest first)");
627
628        // Get all relayer IDs
629        let relayer_list_key = self.relayer_list_key();
630        let relayer_ids: Vec<String> = conn
631            .smembers(&relayer_list_key)
632            .await
633            .map_err(|e| self.map_redis_error(e, "list_all_relayer_ids"))?;
634
635        debug!(count = %relayer_ids.len(), "found relayers");
636
637        // Collect all transaction IDs from all relayers using their sorted sets
638        let mut all_tx_ids = Vec::new();
639        for relayer_id in relayer_ids {
640            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
641            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
642                .arg(&relayer_sorted_key)
643                .arg(0)
644                .arg(-1)
645                .arg("REV")
646                .query_async(&mut conn)
647                .await
648                .map_err(|e| self.map_redis_error(e, "list_all_relayer_sorted"))?;
649
650            all_tx_ids.extend(tx_ids);
651        }
652
653        // Release connection before nested call to avoid connection doubling
654        drop(conn);
655
656        // Batch fetch all transactions at once
657        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
658        let mut all_transactions = batch_result.results;
659
660        // Sort all transactions by created_at (newest first)
661        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
662
663        debug!(count = %all_transactions.len(), "found transactions");
664        Ok(all_transactions)
665    }
666
667    // Unoptimized implementation of list_paginated. Rarely used. find_by_relayer_id is preferred.
668    async fn list_paginated(
669        &self,
670        query: PaginationQuery,
671    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
672        if query.per_page == 0 {
673            return Err(RepositoryError::InvalidData(
674                "per_page must be greater than 0".to_string(),
675            ));
676        }
677
678        let mut conn = self
679            .get_connection(self.connections.reader(), "list_paginated")
680            .await?;
681
682        debug!(page = %query.page, per_page = %query.per_page, "fetching paginated transactions sorted by created_at (newest first)");
683
684        // Get all relayer IDs
685        let relayer_list_key = self.relayer_list_key();
686        let relayer_ids: Vec<String> = conn
687            .smembers(&relayer_list_key)
688            .await
689            .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_ids"))?;
690
691        // Collect all transaction IDs from all relayers using their sorted sets
692        let mut all_tx_ids = Vec::new();
693        for relayer_id in relayer_ids {
694            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
695            let tx_ids: Vec<String> = redis::cmd("ZRANGE")
696                .arg(&relayer_sorted_key)
697                .arg(0)
698                .arg(-1)
699                .arg("REV")
700                .query_async(&mut conn)
701                .await
702                .map_err(|e| self.map_redis_error(e, "list_paginated_relayer_sorted"))?;
703
704            all_tx_ids.extend(tx_ids);
705        }
706
707        // Release connection before nested call to avoid connection doubling
708        drop(conn);
709
710        // Batch fetch all transactions at once
711        let batch_result = self.get_transactions_by_ids(&all_tx_ids).await?;
712        let mut all_transactions = batch_result.results;
713
714        // Sort all transactions by created_at (newest first)
715        all_transactions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
716
717        let total = all_transactions.len() as u64;
718        let start = ((query.page - 1) * query.per_page) as usize;
719        let end = (start + query.per_page as usize).min(all_transactions.len());
720
721        if start >= all_transactions.len() {
722            debug!(page = %query.page, total = %total, "page is beyond available data");
723            return Ok(PaginatedResult {
724                items: vec![],
725                total,
726                page: query.page,
727                per_page: query.per_page,
728            });
729        }
730
731        let items = all_transactions[start..end].to_vec();
732
733        debug!(count = %items.len(), page = %query.page, "successfully fetched transactions for page");
734
735        Ok(PaginatedResult {
736            items,
737            total,
738            page: query.page,
739            per_page: query.per_page,
740        })
741    }
742
743    async fn update(
744        &self,
745        id: String,
746        entity: TransactionRepoModel,
747    ) -> Result<TransactionRepoModel, RepositoryError> {
748        if id.is_empty() {
749            return Err(RepositoryError::InvalidData(
750                "Transaction ID cannot be empty".to_string(),
751            ));
752        }
753
754        debug!(tx_id = %id, "updating transaction");
755
756        // Get the old transaction for index cleanup
757        let old_tx = self.get_by_id(id.clone()).await?;
758
759        let key = self.tx_key(&entity.relayer_id, &id);
760        let mut conn = self
761            .get_connection(self.connections.primary(), "update")
762            .await?;
763
764        let value = self.serialize_entity(&entity, |t| &t.id, "transaction")?;
765
766        // Update transaction
767        let _: () = conn
768            .set(&key, value)
769            .await
770            .map_err(|e| self.map_redis_error(e, "update_transaction"))?;
771
772        // Update indexes
773        self.update_indexes(&entity, Some(&old_tx)).await?;
774
775        debug!(tx_id = %id, "successfully updated transaction");
776        Ok(entity)
777    }
778
779    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
780        if id.is_empty() {
781            return Err(RepositoryError::InvalidData(
782                "Transaction ID cannot be empty".to_string(),
783            ));
784        }
785
786        debug!(tx_id = %id, "deleting transaction");
787
788        // Get transaction first for index cleanup
789        let tx = self.get_by_id(id.clone()).await?;
790
791        let key = self.tx_key(&tx.relayer_id, &id);
792        let reverse_key = self.tx_to_relayer_key(&id);
793        let mut conn = self
794            .get_connection(self.connections.primary(), "delete_by_id")
795            .await?;
796
797        let mut pipe = redis::pipe();
798        pipe.atomic();
799        pipe.del(&key);
800        pipe.del(&reverse_key);
801
802        pipe.exec_async(&mut conn)
803            .await
804            .map_err(|e| self.map_redis_error(e, "delete_transaction"))?;
805
806        // Remove indexes (log errors but don't fail the delete)
807        if let Err(e) = self.remove_all_indexes(&tx).await {
808            error!(tx_id = %id, error = %e, "failed to remove indexes for deleted transaction");
809        }
810
811        debug!(tx_id = %id, "successfully deleted transaction");
812        Ok(())
813    }
814
815    // Unoptimized implementation of count. Rarely used. find_by_relayer_id is preferred.
816    async fn count(&self) -> Result<usize, RepositoryError> {
817        let mut conn = self
818            .get_connection(self.connections.reader(), "count")
819            .await?;
820
821        debug!("counting transactions");
822
823        // Get all relayer IDs and sum their sorted set counts
824        let relayer_list_key = self.relayer_list_key();
825        let relayer_ids: Vec<String> = conn
826            .smembers(&relayer_list_key)
827            .await
828            .map_err(|e| self.map_redis_error(e, "count_relayer_ids"))?;
829
830        let mut total_count = 0usize;
831        for relayer_id in relayer_ids {
832            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&relayer_id);
833            let count: usize = conn
834                .zcard(&relayer_sorted_key)
835                .await
836                .map_err(|e| self.map_redis_error(e, "count_relayer_transactions"))?;
837            total_count += count;
838        }
839
840        debug!(count = %total_count, "transaction count");
841        Ok(total_count)
842    }
843
844    async fn has_entries(&self) -> Result<bool, RepositoryError> {
845        let mut conn = self
846            .get_connection(self.connections.reader(), "has_entries")
847            .await?;
848        let relayer_list_key = self.relayer_list_key();
849
850        debug!("checking if transaction entries exist");
851
852        let exists: bool = conn
853            .exists(&relayer_list_key)
854            .await
855            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
856
857        debug!(exists = %exists, "transaction entries exist");
858        Ok(exists)
859    }
860
861    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
862        let mut conn = self
863            .get_connection(self.connections.primary(), "drop_all_entries")
864            .await?;
865        let relayer_list_key = self.relayer_list_key();
866
867        debug!("dropping all transaction entries");
868
869        // Get all relayer IDs first
870        let relayer_ids: Vec<String> = conn
871            .smembers(&relayer_list_key)
872            .await
873            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_relayer_ids"))?;
874
875        if relayer_ids.is_empty() {
876            debug!("no transaction entries to drop");
877            return Ok(());
878        }
879
880        // Use pipeline for atomic operations
881        let mut pipe = redis::pipe();
882        pipe.atomic();
883
884        // Delete all transactions and their indexes for each relayer
885        for relayer_id in &relayer_ids {
886            // Get all transaction IDs for this relayer
887            let pattern = format!(
888                "{}:{}:{}:{}:*",
889                self.key_prefix, RELAYER_PREFIX, relayer_id, TX_PREFIX
890            );
891            let mut cursor = 0;
892            let mut tx_ids = Vec::new();
893
894            loop {
895                let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
896                    .cursor_arg(cursor)
897                    .arg("MATCH")
898                    .arg(&pattern)
899                    .query_async(&mut conn)
900                    .await
901                    .map_err(|e| self.map_redis_error(e, "drop_all_entries_scan"))?;
902
903                // Extract transaction IDs from keys and delete keys
904                for key in keys {
905                    pipe.del(&key);
906                    if let Some(tx_id) = key.split(':').next_back() {
907                        tx_ids.push(tx_id.to_string());
908                    }
909                }
910
911                cursor = next_cursor;
912                if cursor == 0 {
913                    break;
914                }
915            }
916
917            // Delete reverse lookup keys and indexes
918            for tx_id in tx_ids {
919                let reverse_key = self.tx_to_relayer_key(&tx_id);
920                pipe.del(&reverse_key);
921
922                // Delete status indexes (we can't know the specific status, so we'll clean up all possible ones)
923                // This ensures complete cleanup even if there are orphaned entries
924                for status in &[
925                    TransactionStatus::Canceled,
926                    TransactionStatus::Pending,
927                    TransactionStatus::Sent,
928                    TransactionStatus::Submitted,
929                    TransactionStatus::Mined,
930                    TransactionStatus::Confirmed,
931                    TransactionStatus::Failed,
932                    TransactionStatus::Expired,
933                ] {
934                    // Remove from sorted status set (new format)
935                    let status_sorted_key = self.relayer_status_sorted_key(relayer_id, status);
936                    pipe.zrem(&status_sorted_key, &tx_id);
937
938                    // Remove from legacy status set (for migration cleanup)
939                    let status_key = self.relayer_status_key(relayer_id, status);
940                    pipe.srem(&status_key, &tx_id);
941                }
942            }
943
944            // Delete the relayer's sorted set by created_at
945            let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
946            pipe.del(&relayer_sorted_key);
947        }
948
949        // Delete the relayer list key
950        pipe.del(&relayer_list_key);
951
952        pipe.exec_async(&mut conn)
953            .await
954            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
955
956        debug!(count = %relayer_ids.len(), "dropped all transaction entries for relayers");
957        Ok(())
958    }
959}
960
961#[async_trait]
962impl TransactionRepository for RedisTransactionRepository {
963    async fn find_by_relayer_id(
964        &self,
965        relayer_id: &str,
966        query: PaginationQuery,
967    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
968        let mut conn = self
969            .get_connection(self.connections.reader(), "find_by_relayer_id")
970            .await?;
971
972        debug!(relayer_id = %relayer_id, page = %query.page, per_page = %query.per_page, "fetching transactions for relayer sorted by created_at (newest first)");
973
974        let relayer_sorted_key = self.relayer_tx_by_created_at_key(relayer_id);
975
976        // Get total count from relayer's sorted set
977        let sorted_set_count: u64 = conn
978            .zcard(&relayer_sorted_key)
979            .await
980            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_count"))?;
981
982        // If sorted set is empty, return empty result immediately
983        // All new transactions are automatically added to the sorted set
984        if sorted_set_count == 0 {
985            debug!(relayer_id = %relayer_id, "no transactions found for relayer (sorted set is empty)");
986            return Ok(PaginatedResult {
987                items: vec![],
988                total: 0,
989                page: query.page,
990                per_page: query.per_page,
991            });
992        }
993
994        let total = sorted_set_count;
995
996        // Calculate pagination range (0-indexed for Redis ZRANGE with REV)
997        let start = ((query.page - 1) * query.per_page) as isize;
998        let end = start + query.per_page as isize - 1;
999
1000        if start as u64 >= total {
1001            debug!(relayer_id = %relayer_id, page = %query.page, total = %total, "page is beyond available data");
1002            return Ok(PaginatedResult {
1003                items: vec![],
1004                total,
1005                page: query.page,
1006                per_page: query.per_page,
1007            });
1008        }
1009
1010        // Get page of transaction IDs from sorted set (newest first using ZRANGE with REV)
1011        let page_ids: Vec<String> = redis::cmd("ZRANGE")
1012            .arg(&relayer_sorted_key)
1013            .arg(start)
1014            .arg(end)
1015            .arg("REV")
1016            .query_async(&mut conn)
1017            .await
1018            .map_err(|e| self.map_redis_error(e, "find_by_relayer_id_sorted"))?;
1019
1020        // Release connection before nested call to avoid connection doubling
1021        drop(conn);
1022
1023        let items = self.get_transactions_by_ids(&page_ids).await?;
1024
1025        debug!(relayer_id = %relayer_id, count = %items.results.len(), page = %query.page, "successfully fetched transactions for relayer");
1026
1027        Ok(PaginatedResult {
1028            items: items.results,
1029            total,
1030            page: query.page,
1031            per_page: query.per_page,
1032        })
1033    }
1034
1035    // Unoptimized implementation of find_by_status. Rarely used. find_by_status_paginated is preferred.
1036    async fn find_by_status(
1037        &self,
1038        relayer_id: &str,
1039        statuses: &[TransactionStatus],
1040    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
1041        // Ensure all status sorted sets are migrated first (releases connection after each)
1042        for status in statuses {
1043            self.ensure_status_sorted_set(relayer_id, status).await?;
1044        }
1045
1046        // Now get a connection and collect all IDs
1047        let mut conn = self
1048            .get_connection(self.connections.reader(), "find_by_status")
1049            .await?;
1050
1051        let mut all_ids: Vec<String> = Vec::new();
1052        for status in statuses {
1053            // Get IDs from sorted set (already ordered by created_at)
1054            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1055            let ids: Vec<String> = redis::cmd("ZRANGE")
1056                .arg(&sorted_key)
1057                .arg(0)
1058                .arg(-1)
1059                .arg("REV") // Newest first
1060                .query_async(&mut conn)
1061                .await
1062                .map_err(|e| self.map_redis_error(e, "find_by_status"))?;
1063
1064            all_ids.extend(ids);
1065        }
1066
1067        // Release connection before nested call to avoid connection doubling
1068        drop(conn);
1069
1070        if all_ids.is_empty() {
1071            return Ok(vec![]);
1072        }
1073
1074        // Remove duplicates (can happen if a transaction is in multiple status sets due to partial failures)
1075        all_ids.sort();
1076        all_ids.dedup();
1077
1078        // Fetch all transactions and sort by created_at (newest first)
1079        let mut transactions = self.get_transactions_by_ids(&all_ids).await?;
1080
1081        // Sort by created_at descending (newest first)
1082        transactions
1083            .results
1084            .sort_by(|a, b| b.created_at.cmp(&a.created_at));
1085
1086        Ok(transactions.results)
1087    }
1088
1089    async fn find_by_status_paginated(
1090        &self,
1091        relayer_id: &str,
1092        statuses: &[TransactionStatus],
1093        query: PaginationQuery,
1094        oldest_first: bool,
1095    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
1096        // Ensure all status sorted sets are migrated first (releases connection after each)
1097        for status in statuses {
1098            self.ensure_status_sorted_set(relayer_id, status).await?;
1099        }
1100
1101        let mut conn = self
1102            .get_connection(self.connections.reader(), "find_by_status_paginated")
1103            .await?;
1104
1105        // For single status, we can paginate directly from the sorted set
1106        if statuses.len() == 1 {
1107            let sorted_key = self.relayer_status_sorted_key(relayer_id, &statuses[0]);
1108
1109            // Get total count
1110            let total: u64 = conn
1111                .zcard(&sorted_key)
1112                .await
1113                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_count"))?;
1114
1115            if total == 0 {
1116                return Ok(PaginatedResult {
1117                    items: vec![],
1118                    total: 0,
1119                    page: query.page,
1120                    per_page: query.per_page,
1121                });
1122            }
1123
1124            // Calculate pagination bounds
1125            let start = ((query.page.saturating_sub(1)) * query.per_page) as isize;
1126            let end = start + query.per_page as isize - 1;
1127
1128            // Get page of IDs directly from sorted set
1129            // REV = newest first (descending), no REV = oldest first (ascending)
1130            let mut cmd = redis::cmd("ZRANGE");
1131            cmd.arg(&sorted_key).arg(start).arg(end);
1132            if !oldest_first {
1133                cmd.arg("REV");
1134            }
1135            let page_ids: Vec<String> = cmd
1136                .query_async(&mut conn)
1137                .await
1138                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated"))?;
1139
1140            // Release connection before nested call to avoid connection doubling
1141            drop(conn);
1142
1143            let transactions = self.get_transactions_by_ids(&page_ids).await?;
1144
1145            debug!(
1146                relayer_id = %relayer_id,
1147                status = %statuses[0],
1148                total = %total,
1149                page = %query.page,
1150                page_size = %transactions.results.len(),
1151                "fetched paginated transactions by single status"
1152            );
1153
1154            return Ok(PaginatedResult {
1155                items: transactions.results,
1156                total,
1157                page: query.page,
1158                per_page: query.per_page,
1159            });
1160        }
1161
1162        // For multiple statuses, collect all IDs and merge
1163        let mut all_ids: Vec<(String, f64)> = Vec::new();
1164        for status in statuses {
1165            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1166
1167            // Get IDs with scores for proper sorting
1168            let ids_with_scores: Vec<(String, f64)> = redis::cmd("ZRANGE")
1169                .arg(&sorted_key)
1170                .arg(0)
1171                .arg(-1)
1172                .arg("WITHSCORES")
1173                .query_async(&mut conn)
1174                .await
1175                .map_err(|e| self.map_redis_error(e, "find_by_status_paginated_multi"))?;
1176
1177            all_ids.extend(ids_with_scores);
1178        }
1179
1180        // Release connection before nested call to avoid connection doubling
1181        drop(conn);
1182
1183        // Remove duplicates (keep highest/lowest score based on sort order)
1184        let mut id_map: std::collections::HashMap<String, f64> = std::collections::HashMap::new();
1185        for (id, score) in all_ids {
1186            id_map
1187                .entry(id)
1188                .and_modify(|s| {
1189                    // For oldest_first, keep the lowest score; otherwise keep highest
1190                    if oldest_first {
1191                        if score < *s {
1192                            *s = score
1193                        }
1194                    } else if score > *s {
1195                        *s = score
1196                    }
1197                })
1198                .or_insert(score);
1199        }
1200
1201        // Sort by score: descending for newest first, ascending for oldest first
1202        let mut sorted_ids: Vec<(String, f64)> = id_map.into_iter().collect();
1203        if oldest_first {
1204            sorted_ids.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
1205        } else {
1206            sorted_ids.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
1207        }
1208
1209        let total = sorted_ids.len() as u64;
1210
1211        if total == 0 {
1212            return Ok(PaginatedResult {
1213                items: vec![],
1214                total: 0,
1215                page: query.page,
1216                per_page: query.per_page,
1217            });
1218        }
1219
1220        // Apply pagination
1221        let start = ((query.page.saturating_sub(1)) * query.per_page) as usize;
1222        let page_ids: Vec<String> = sorted_ids
1223            .into_iter()
1224            .skip(start)
1225            .take(query.per_page as usize)
1226            .map(|(id, _)| id)
1227            .collect();
1228
1229        // Fetch only the transactions for this page
1230        let transactions = self.get_transactions_by_ids(&page_ids).await?;
1231
1232        debug!(
1233            relayer_id = %relayer_id,
1234            total = %total,
1235            page = %query.page,
1236            page_size = %transactions.results.len(),
1237            "fetched paginated transactions by status"
1238        );
1239
1240        Ok(PaginatedResult {
1241            items: transactions.results,
1242            total,
1243            page: query.page,
1244            per_page: query.per_page,
1245        })
1246    }
1247
1248    async fn find_by_nonce(
1249        &self,
1250        relayer_id: &str,
1251        nonce: u64,
1252    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
1253        let mut conn = self
1254            .get_connection(self.connections.reader(), "find_by_nonce")
1255            .await?;
1256        let nonce_key = self.relayer_nonce_key(relayer_id, nonce);
1257
1258        // Get transaction ID with this nonce for this relayer (should be single value)
1259        let tx_id: Option<String> = conn
1260            .get(nonce_key)
1261            .await
1262            .map_err(|e| self.map_redis_error(e, "find_by_nonce"))?;
1263
1264        match tx_id {
1265            Some(tx_id) => {
1266                match self.get_by_id(tx_id.clone()).await {
1267                    Ok(tx) => Ok(Some(tx)),
1268                    Err(RepositoryError::NotFound(_)) => {
1269                        // Transaction was deleted but index wasn't cleaned up
1270                        warn!(relayer_id = %relayer_id, nonce = %nonce, "stale nonce index found for relayer");
1271                        Ok(None)
1272                    }
1273                    Err(e) => Err(e),
1274                }
1275            }
1276            None => Ok(None),
1277        }
1278    }
1279
1280    async fn update_status(
1281        &self,
1282        tx_id: String,
1283        status: TransactionStatus,
1284    ) -> Result<TransactionRepoModel, RepositoryError> {
1285        let update = TransactionUpdateRequest {
1286            status: Some(status),
1287            ..Default::default()
1288        };
1289        self.partial_update(tx_id, update).await
1290    }
1291
1292    async fn partial_update(
1293        &self,
1294        tx_id: String,
1295        update: TransactionUpdateRequest,
1296    ) -> Result<TransactionRepoModel, RepositoryError> {
1297        const MAX_RETRIES: u32 = 3;
1298        const BACKOFF_MS: u64 = 100;
1299
1300        // Optimistic CAS: only apply update if the current stored value still matches the
1301        // expected pre-update value. This avoids duplicate status metric updates on races.
1302        let mut original_tx = self.get_by_id(tx_id.clone()).await?;
1303        let mut updated_tx = original_tx.clone();
1304        updated_tx.apply_partial_update(update.clone());
1305
1306        let key = self.tx_key(&updated_tx.relayer_id, &tx_id);
1307        let mut original_value = self.serialize_entity(&original_tx, |t| &t.id, "transaction")?;
1308        let mut updated_value = self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?;
1309        let mut data_updated = false;
1310
1311        let mut last_error = None;
1312
1313        for attempt in 0..MAX_RETRIES {
1314            let mut conn = match self
1315                .get_connection(self.connections.primary(), "partial_update")
1316                .await
1317            {
1318                Ok(conn) => conn,
1319                Err(e) => {
1320                    last_error = Some(e);
1321                    if attempt < MAX_RETRIES - 1 {
1322                        tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
1323                        continue;
1324                    }
1325                    return Err(last_error.unwrap());
1326                }
1327            };
1328
1329            if !data_updated {
1330                let cas_script = Script::new(
1331                    r#"
1332                    local current = redis.call('GET', KEYS[1])
1333                    if not current then
1334                        return -1
1335                    end
1336                    if current == ARGV[1] then
1337                        redis.call('SET', KEYS[1], ARGV[2])
1338                        return 1
1339                    end
1340                    return 0
1341                    "#,
1342                );
1343
1344                let cas_result: i32 = match cas_script
1345                    .key(&key)
1346                    .arg(&original_value)
1347                    .arg(&updated_value)
1348                    .invoke_async(&mut conn)
1349                    .await
1350                {
1351                    Ok(result) => result,
1352                    Err(e) => {
1353                        if attempt < MAX_RETRIES - 1 {
1354                            warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed CAS transaction update, retrying");
1355                            last_error = Some(self.map_redis_error(e, "partial_update_cas"));
1356                            tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS))
1357                                .await;
1358                            continue;
1359                        }
1360                        return Err(self.map_redis_error(e, "partial_update_cas"));
1361                    }
1362                };
1363
1364                if cas_result == -1 {
1365                    return Err(RepositoryError::NotFound(format!(
1366                        "Transaction with ID {tx_id} not found"
1367                    )));
1368                }
1369
1370                if cas_result == 0 {
1371                    if attempt < MAX_RETRIES - 1 {
1372                        warn!(tx_id = %tx_id, attempt = %attempt, "concurrent transaction update detected, rebasing retry");
1373                        original_tx = self.get_by_id(tx_id.clone()).await?;
1374                        updated_tx = original_tx.clone();
1375                        updated_tx.apply_partial_update(update.clone());
1376                        original_value =
1377                            self.serialize_entity(&original_tx, |t| &t.id, "transaction")?;
1378                        updated_value =
1379                            self.serialize_entity(&updated_tx, |t| &t.id, "transaction")?;
1380                        tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
1381                        continue;
1382                    }
1383                    return Err(RepositoryError::TransactionFailure(format!(
1384                        "Concurrent update conflict for transaction {tx_id}"
1385                    )));
1386                }
1387
1388                data_updated = true;
1389            }
1390
1391            // Try to update indexes with the original pre-update state
1392            // This ensures stale indexes are removed even on retry attempts
1393            match self.update_indexes(&updated_tx, Some(&original_tx)).await {
1394                Ok(_) => {
1395                    debug!(tx_id = %tx_id, attempt = %attempt, "successfully updated transaction");
1396
1397                    // Track metrics for transaction state changes
1398                    if let Some(new_status) = &update.status {
1399                        let network_type = format!("{:?}", updated_tx.network_type).to_lowercase();
1400                        let relayer_id = updated_tx.relayer_id.as_str();
1401
1402                        // Track submission (when status changes to Submitted)
1403                        if original_tx.status != TransactionStatus::Submitted
1404                            && *new_status == TransactionStatus::Submitted
1405                        {
1406                            TRANSACTIONS_SUBMITTED
1407                                .with_label_values(&[relayer_id, &network_type])
1408                                .inc();
1409
1410                            // Track processing time: creation to submission
1411                            if let Ok(created_time) =
1412                                chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
1413                            {
1414                                let processing_seconds =
1415                                    (Utc::now() - created_time.with_timezone(&Utc)).num_seconds()
1416                                        as f64;
1417                                TRANSACTION_PROCESSING_TIME
1418                                    .with_label_values(&[
1419                                        relayer_id,
1420                                        &network_type,
1421                                        "creation_to_submission",
1422                                    ])
1423                                    .observe(processing_seconds);
1424                            }
1425                        }
1426
1427                        // Track status distribution (update gauge when status changes)
1428                        if original_tx.status != *new_status {
1429                            // Decrement old status and clamp to zero to avoid negative gauges.
1430                            let old_status = &original_tx.status;
1431                            let old_status_str = format!("{old_status:?}").to_lowercase();
1432                            let old_status_gauge = TRANSACTIONS_BY_STATUS.with_label_values(&[
1433                                relayer_id,
1434                                &network_type,
1435                                &old_status_str,
1436                            ]);
1437                            let clamped_value = (old_status_gauge.get() - 1.0).max(0.0);
1438                            old_status_gauge.set(clamped_value);
1439
1440                            // Increment new status
1441                            let new_status_str = format!("{new_status:?}").to_lowercase();
1442                            TRANSACTIONS_BY_STATUS
1443                                .with_label_values(&[relayer_id, &network_type, &new_status_str])
1444                                .inc();
1445                        }
1446
1447                        // Track metrics for final transaction states
1448                        // Only track when status changes from non-final to final state
1449                        let was_final = is_final_state(&original_tx.status);
1450                        let is_final = is_final_state(new_status);
1451
1452                        if !was_final && is_final {
1453                            match new_status {
1454                                TransactionStatus::Confirmed => {
1455                                    TRANSACTIONS_SUCCESS
1456                                        .with_label_values(&[relayer_id, &network_type])
1457                                        .inc();
1458
1459                                    // Track processing time: submission to confirmation
1460                                    if let (Some(sent_at_str), Some(confirmed_at_str)) =
1461                                        (&updated_tx.sent_at, &updated_tx.confirmed_at)
1462                                    {
1463                                        if let (Ok(sent_time), Ok(confirmed_time)) = (
1464                                            chrono::DateTime::parse_from_rfc3339(sent_at_str),
1465                                            chrono::DateTime::parse_from_rfc3339(confirmed_at_str),
1466                                        ) {
1467                                            let processing_seconds = (confirmed_time
1468                                                .with_timezone(&Utc)
1469                                                - sent_time.with_timezone(&Utc))
1470                                            .num_seconds()
1471                                                as f64;
1472                                            TRANSACTION_PROCESSING_TIME
1473                                                .with_label_values(&[
1474                                                    relayer_id,
1475                                                    &network_type,
1476                                                    "submission_to_confirmation",
1477                                                ])
1478                                                .observe(processing_seconds);
1479                                        }
1480                                    }
1481
1482                                    // Track processing time: creation to confirmation
1483                                    if let Ok(created_time) =
1484                                        chrono::DateTime::parse_from_rfc3339(&updated_tx.created_at)
1485                                    {
1486                                        if let Some(confirmed_at_str) = &updated_tx.confirmed_at {
1487                                            if let Ok(confirmed_time) =
1488                                                chrono::DateTime::parse_from_rfc3339(
1489                                                    confirmed_at_str,
1490                                                )
1491                                            {
1492                                                let processing_seconds = (confirmed_time
1493                                                    .with_timezone(&Utc)
1494                                                    - created_time.with_timezone(&Utc))
1495                                                .num_seconds()
1496                                                    as f64;
1497                                                TRANSACTION_PROCESSING_TIME
1498                                                    .with_label_values(&[
1499                                                        relayer_id,
1500                                                        &network_type,
1501                                                        "creation_to_confirmation",
1502                                                    ])
1503                                                    .observe(processing_seconds);
1504                                            }
1505                                        }
1506                                    }
1507                                }
1508                                TransactionStatus::Failed => {
1509                                    // Parse status_reason to determine failure type
1510                                    let failure_reason = updated_tx
1511                                        .status_reason
1512                                        .as_deref()
1513                                        .map(|reason| {
1514                                            if reason.starts_with("Submission failed:") {
1515                                                "submission_failed"
1516                                            } else if reason.starts_with("Preparation failed:") {
1517                                                "preparation_failed"
1518                                            } else {
1519                                                "failed"
1520                                            }
1521                                        })
1522                                        .unwrap_or("failed");
1523                                    TRANSACTIONS_FAILED
1524                                        .with_label_values(&[
1525                                            relayer_id,
1526                                            &network_type,
1527                                            failure_reason,
1528                                        ])
1529                                        .inc();
1530                                }
1531                                TransactionStatus::Expired => {
1532                                    TRANSACTIONS_FAILED
1533                                        .with_label_values(&[relayer_id, &network_type, "expired"])
1534                                        .inc();
1535                                }
1536                                TransactionStatus::Canceled => {
1537                                    TRANSACTIONS_FAILED
1538                                        .with_label_values(&[relayer_id, &network_type, "canceled"])
1539                                        .inc();
1540                                }
1541                                _ => {
1542                                    // Other final states (shouldn't happen, but handle gracefully)
1543                                }
1544                            }
1545                        }
1546                    }
1547                    return Ok(updated_tx);
1548                }
1549                Err(e) if attempt < MAX_RETRIES - 1 => {
1550                    warn!(tx_id = %tx_id, attempt = %attempt, error = %e, "failed to update indexes, retrying");
1551                    last_error = Some(e);
1552                    tokio::time::sleep(tokio::time::Duration::from_millis(BACKOFF_MS)).await;
1553                    continue;
1554                }
1555                Err(e) => return Err(e),
1556            }
1557        }
1558
1559        Err(last_error.unwrap_or_else(|| {
1560            RepositoryError::UnexpectedError("partial_update exhausted retries".to_string())
1561        }))
1562    }
1563
1564    async fn update_network_data(
1565        &self,
1566        tx_id: String,
1567        network_data: NetworkTransactionData,
1568    ) -> Result<TransactionRepoModel, RepositoryError> {
1569        let update = TransactionUpdateRequest {
1570            network_data: Some(network_data),
1571            ..Default::default()
1572        };
1573        self.partial_update(tx_id, update).await
1574    }
1575
1576    async fn set_sent_at(
1577        &self,
1578        tx_id: String,
1579        sent_at: String,
1580    ) -> Result<TransactionRepoModel, RepositoryError> {
1581        let update = TransactionUpdateRequest {
1582            sent_at: Some(sent_at),
1583            ..Default::default()
1584        };
1585        self.partial_update(tx_id, update).await
1586    }
1587
1588    async fn set_confirmed_at(
1589        &self,
1590        tx_id: String,
1591        confirmed_at: String,
1592    ) -> Result<TransactionRepoModel, RepositoryError> {
1593        let update = TransactionUpdateRequest {
1594            confirmed_at: Some(confirmed_at),
1595            ..Default::default()
1596        };
1597        self.partial_update(tx_id, update).await
1598    }
1599
1600    /// Count transactions by status using Redis ZCARD (O(1) per sorted set).
1601    /// Much more efficient than find_by_status when you only need the count.
1602    /// Triggers migration from legacy SETs if needed.
1603    async fn count_by_status(
1604        &self,
1605        relayer_id: &str,
1606        statuses: &[TransactionStatus],
1607    ) -> Result<u64, RepositoryError> {
1608        let mut conn = self
1609            .get_connection(self.connections.reader(), "count_by_status")
1610            .await?;
1611        let mut total_count: u64 = 0;
1612
1613        for status in statuses {
1614            // Ensure sorted set is migrated
1615            self.ensure_status_sorted_set(relayer_id, status).await?;
1616
1617            let sorted_key = self.relayer_status_sorted_key(relayer_id, status);
1618            let count: u64 = conn
1619                .zcard(&sorted_key)
1620                .await
1621                .map_err(|e| self.map_redis_error(e, "count_by_status"))?;
1622            total_count += count;
1623        }
1624
1625        debug!(relayer_id = %relayer_id, count = %total_count, "counted transactions by status");
1626        Ok(total_count)
1627    }
1628
1629    async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
1630        if ids.is_empty() {
1631            debug!("no transaction IDs provided for batch delete");
1632            return Ok(BatchDeleteResult::default());
1633        }
1634
1635        debug!(count = %ids.len(), "batch deleting transactions by IDs (with fetch)");
1636
1637        // Fetch transactions to get their data for index cleanup
1638        let batch_result = self.get_transactions_by_ids(&ids).await?;
1639
1640        // Convert to delete requests
1641        let requests: Vec<TransactionDeleteRequest> = batch_result
1642            .results
1643            .iter()
1644            .map(|tx| TransactionDeleteRequest {
1645                id: tx.id.clone(),
1646                relayer_id: tx.relayer_id.clone(),
1647                nonce: self.extract_nonce(&tx.network_data),
1648            })
1649            .collect();
1650
1651        // Track IDs that weren't found
1652        let mut result = self.delete_by_requests(requests).await?;
1653
1654        // Add the IDs that weren't found during fetch
1655        for id in batch_result.failed_ids {
1656            result
1657                .failed
1658                .push((id.clone(), format!("Transaction with ID {id} not found")));
1659        }
1660
1661        Ok(result)
1662    }
1663
1664    async fn delete_by_requests(
1665        &self,
1666        requests: Vec<TransactionDeleteRequest>,
1667    ) -> Result<BatchDeleteResult, RepositoryError> {
1668        if requests.is_empty() {
1669            debug!("no delete requests provided for batch delete");
1670            return Ok(BatchDeleteResult::default());
1671        }
1672
1673        debug!(count = %requests.len(), "batch deleting transactions by requests (no fetch)");
1674        let mut conn = self
1675            .get_connection(self.connections.primary(), "batch_delete_no_fetch")
1676            .await?;
1677        let mut pipe = redis::pipe();
1678        pipe.atomic();
1679
1680        // All possible statuses for index cleanup
1681        let all_statuses = [
1682            TransactionStatus::Canceled,
1683            TransactionStatus::Pending,
1684            TransactionStatus::Sent,
1685            TransactionStatus::Submitted,
1686            TransactionStatus::Mined,
1687            TransactionStatus::Confirmed,
1688            TransactionStatus::Failed,
1689            TransactionStatus::Expired,
1690        ];
1691
1692        // Build pipeline for all deletions and index removals
1693        for req in &requests {
1694            // Delete transaction data
1695            let tx_key = self.tx_key(&req.relayer_id, &req.id);
1696            pipe.del(&tx_key);
1697
1698            // Delete reverse lookup
1699            let reverse_key = self.tx_to_relayer_key(&req.id);
1700            pipe.del(&reverse_key);
1701
1702            // Remove from all possible status indexes
1703            for status in &all_statuses {
1704                let status_sorted_key = self.relayer_status_sorted_key(&req.relayer_id, status);
1705                pipe.zrem(&status_sorted_key, &req.id);
1706
1707                let status_legacy_key = self.relayer_status_key(&req.relayer_id, status);
1708                pipe.srem(&status_legacy_key, &req.id);
1709            }
1710
1711            // Remove nonce index if exists
1712            if let Some(nonce) = req.nonce {
1713                let nonce_key = self.relayer_nonce_key(&req.relayer_id, nonce);
1714                pipe.del(&nonce_key);
1715            }
1716
1717            // Remove from per-relayer sorted set by created_at
1718            let relayer_sorted_key = self.relayer_tx_by_created_at_key(&req.relayer_id);
1719            pipe.zrem(&relayer_sorted_key, &req.id);
1720        }
1721
1722        // Execute the entire pipeline in one round-trip
1723        match pipe.exec_async(&mut conn).await {
1724            Ok(_) => {
1725                let deleted_count = requests.len();
1726                debug!(
1727                    deleted_count = %deleted_count,
1728                    "batch delete completed"
1729                );
1730                Ok(BatchDeleteResult {
1731                    deleted_count,
1732                    failed: vec![],
1733                })
1734            }
1735            Err(e) => {
1736                error!(error = %e, "batch delete pipeline failed");
1737                // Mark all requests as failed
1738                let failed: Vec<(String, String)> = requests
1739                    .iter()
1740                    .map(|req| (req.id.clone(), format!("Redis pipeline error: {e}")))
1741                    .collect();
1742                Ok(BatchDeleteResult {
1743                    deleted_count: 0,
1744                    failed,
1745                })
1746            }
1747        }
1748    }
1749}
1750
1751#[cfg(test)]
1752mod tests {
1753    use super::*;
1754    use crate::models::{evm::Speed, EvmTransactionData, NetworkType};
1755    use alloy::primitives::U256;
1756    use deadpool_redis::{Config, Runtime};
1757    use lazy_static::lazy_static;
1758    use std::str::FromStr;
1759    use tokio;
1760    use uuid::Uuid;
1761
1762    use tokio::sync::Mutex;
1763
1764    // Use a mutex to ensure tests don't run in parallel when modifying env vars
1765    lazy_static! {
1766        static ref ENV_MUTEX: Mutex<()> = Mutex::new(());
1767    }
1768
1769    // Helper function to create test transactions
1770    fn create_test_transaction(id: &str) -> TransactionRepoModel {
1771        TransactionRepoModel {
1772            id: id.to_string(),
1773            relayer_id: "relayer-1".to_string(),
1774            status: TransactionStatus::Pending,
1775            status_reason: None,
1776            created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
1777            sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
1778            confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
1779            valid_until: None,
1780            delete_at: None,
1781            network_type: NetworkType::Evm,
1782            priced_at: None,
1783            hashes: vec![],
1784            network_data: NetworkTransactionData::Evm(EvmTransactionData {
1785                gas_price: Some(1000000000),
1786                gas_limit: Some(21000),
1787                nonce: Some(1),
1788                value: U256::from_str("1000000000000000000").unwrap(),
1789                data: Some("0x".to_string()),
1790                from: "0xSender".to_string(),
1791                to: Some("0xRecipient".to_string()),
1792                chain_id: 1,
1793                signature: None,
1794                hash: Some(format!("0x{id}")),
1795                speed: Some(Speed::Fast),
1796                max_fee_per_gas: None,
1797                max_priority_fee_per_gas: None,
1798                raw: None,
1799            }),
1800            noop_count: None,
1801            is_canceled: Some(false),
1802            metadata: None,
1803        }
1804    }
1805
1806    fn create_test_transaction_with_relayer(id: &str, relayer_id: &str) -> TransactionRepoModel {
1807        let mut tx = create_test_transaction(id);
1808        tx.relayer_id = relayer_id.to_string();
1809        tx
1810    }
1811
1812    fn create_test_transaction_with_status(
1813        id: &str,
1814        relayer_id: &str,
1815        status: TransactionStatus,
1816    ) -> TransactionRepoModel {
1817        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1818        tx.status = status;
1819        tx
1820    }
1821
1822    fn create_test_transaction_with_nonce(
1823        id: &str,
1824        nonce: u64,
1825        relayer_id: &str,
1826    ) -> TransactionRepoModel {
1827        let mut tx = create_test_transaction_with_relayer(id, relayer_id);
1828        if let NetworkTransactionData::Evm(ref mut evm_data) = tx.network_data {
1829            evm_data.nonce = Some(nonce);
1830        }
1831        tx
1832    }
1833
1834    async fn setup_test_repo() -> RedisTransactionRepository {
1835        // Use a mock Redis URL - in real integration tests, this would connect to a test Redis instance
1836        let redis_url = std::env::var("REDIS_TEST_URL")
1837            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1838
1839        let cfg = Config::from_url(&redis_url);
1840        let pool = Arc::new(
1841            cfg.builder()
1842                .expect("Failed to create pool builder")
1843                .max_size(16)
1844                .runtime(Runtime::Tokio1)
1845                .build()
1846                .expect("Failed to build Redis pool"),
1847        );
1848
1849        // Create RedisConnections with same pool for both primary and reader (for testing)
1850        let connections = Arc::new(RedisConnections::new_single_pool(pool));
1851
1852        let random_id = Uuid::new_v4().to_string();
1853        let key_prefix = format!("test_prefix:{random_id}");
1854
1855        RedisTransactionRepository::new(connections, key_prefix)
1856            .expect("Failed to create RedisTransactionRepository")
1857    }
1858
1859    #[tokio::test]
1860    #[ignore = "Requires active Redis instance"]
1861    async fn test_new_repository_creation() {
1862        let repo = setup_test_repo().await;
1863        assert!(repo.key_prefix.contains("test_prefix"));
1864    }
1865
1866    #[tokio::test]
1867    #[ignore = "Requires active Redis instance"]
1868    async fn test_new_repository_empty_prefix_fails() {
1869        let redis_url = std::env::var("REDIS_TEST_URL")
1870            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
1871        let cfg = Config::from_url(&redis_url);
1872        let pool = Arc::new(
1873            cfg.builder()
1874                .expect("Failed to create pool builder")
1875                .max_size(16)
1876                .runtime(Runtime::Tokio1)
1877                .build()
1878                .expect("Failed to build Redis pool"),
1879        );
1880        let connections = Arc::new(RedisConnections::new_single_pool(pool));
1881
1882        let result = RedisTransactionRepository::new(connections, "".to_string());
1883        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1884    }
1885
1886    #[tokio::test]
1887    #[ignore = "Requires active Redis instance"]
1888    async fn test_key_generation() {
1889        let repo = setup_test_repo().await;
1890
1891        assert!(repo
1892            .tx_key("relayer-1", "test-id")
1893            .contains(":relayer:relayer-1:tx:test-id"));
1894        assert!(repo
1895            .tx_to_relayer_key("test-id")
1896            .contains(":relayer:tx_to_relayer:test-id"));
1897        assert!(repo.relayer_list_key().contains(":relayer_list"));
1898        assert!(repo
1899            .relayer_status_key("relayer-1", &TransactionStatus::Pending)
1900            .contains(":relayer:relayer-1:status:Pending"));
1901        assert!(repo
1902            .relayer_nonce_key("relayer-1", 42)
1903            .contains(":relayer:relayer-1:nonce:42"));
1904    }
1905
1906    #[tokio::test]
1907    #[ignore = "Requires active Redis instance"]
1908    async fn test_serialize_deserialize_transaction() {
1909        let repo = setup_test_repo().await;
1910        let tx = create_test_transaction("test-1");
1911
1912        let serialized = repo
1913            .serialize_entity(&tx, |t| &t.id, "transaction")
1914            .expect("Serialization should succeed");
1915        let deserialized: TransactionRepoModel = repo
1916            .deserialize_entity(&serialized, "test-1", "transaction")
1917            .expect("Deserialization should succeed");
1918
1919        assert_eq!(tx.id, deserialized.id);
1920        assert_eq!(tx.relayer_id, deserialized.relayer_id);
1921        assert_eq!(tx.status, deserialized.status);
1922    }
1923
1924    #[tokio::test]
1925    #[ignore = "Requires active Redis instance"]
1926    async fn test_extract_nonce() {
1927        let repo = setup_test_repo().await;
1928        let random_id = Uuid::new_v4().to_string();
1929        let relayer_id = Uuid::new_v4().to_string();
1930        let tx_with_nonce = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
1931
1932        let nonce = repo.extract_nonce(&tx_with_nonce.network_data);
1933        assert_eq!(nonce, Some(42));
1934    }
1935
1936    #[tokio::test]
1937    #[ignore = "Requires active Redis instance"]
1938    async fn test_create_transaction() {
1939        let repo = setup_test_repo().await;
1940        let random_id = Uuid::new_v4().to_string();
1941        let tx = create_test_transaction(&random_id);
1942
1943        let result = repo.create(tx.clone()).await.unwrap();
1944        assert_eq!(result.id, tx.id);
1945    }
1946
1947    #[tokio::test]
1948    #[ignore = "Requires active Redis instance"]
1949    async fn test_get_transaction() {
1950        let repo = setup_test_repo().await;
1951        let random_id = Uuid::new_v4().to_string();
1952        let tx = create_test_transaction(&random_id);
1953
1954        repo.create(tx.clone()).await.unwrap();
1955        let stored = repo.get_by_id(random_id.to_string()).await.unwrap();
1956        assert_eq!(stored.id, tx.id);
1957        assert_eq!(stored.relayer_id, tx.relayer_id);
1958    }
1959
1960    #[tokio::test]
1961    #[ignore = "Requires active Redis instance"]
1962    async fn test_update_transaction() {
1963        let repo = setup_test_repo().await;
1964        let random_id = Uuid::new_v4().to_string();
1965        let mut tx = create_test_transaction(&random_id);
1966
1967        repo.create(tx.clone()).await.unwrap();
1968        tx.status = TransactionStatus::Confirmed;
1969
1970        let updated = repo.update(random_id.to_string(), tx).await.unwrap();
1971        assert!(matches!(updated.status, TransactionStatus::Confirmed));
1972    }
1973
1974    #[tokio::test]
1975    #[ignore = "Requires active Redis instance"]
1976    async fn test_delete_transaction() {
1977        let repo = setup_test_repo().await;
1978        let random_id = Uuid::new_v4().to_string();
1979        let tx = create_test_transaction(&random_id);
1980
1981        repo.create(tx).await.unwrap();
1982        repo.delete_by_id(random_id.to_string()).await.unwrap();
1983
1984        let result = repo.get_by_id(random_id.to_string()).await;
1985        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1986    }
1987
1988    #[tokio::test]
1989    #[ignore = "Requires active Redis instance"]
1990    async fn test_list_all_transactions() {
1991        let repo = setup_test_repo().await;
1992        let random_id = Uuid::new_v4().to_string();
1993        let random_id2 = Uuid::new_v4().to_string();
1994
1995        let tx1 = create_test_transaction(&random_id);
1996        let tx2 = create_test_transaction(&random_id2);
1997
1998        repo.create(tx1).await.unwrap();
1999        repo.create(tx2).await.unwrap();
2000
2001        let transactions = repo.list_all().await.unwrap();
2002        assert!(transactions.len() >= 2);
2003    }
2004
2005    #[tokio::test]
2006    #[ignore = "Requires active Redis instance"]
2007    async fn test_count_transactions() {
2008        let repo = setup_test_repo().await;
2009        let random_id = Uuid::new_v4().to_string();
2010        let tx = create_test_transaction(&random_id);
2011
2012        let count = repo.count().await.unwrap();
2013        repo.create(tx).await.unwrap();
2014        assert!(repo.count().await.unwrap() > count);
2015    }
2016
2017    #[tokio::test]
2018    #[ignore = "Requires active Redis instance"]
2019    async fn test_get_nonexistent_transaction() {
2020        let repo = setup_test_repo().await;
2021        let result = repo.get_by_id("nonexistent".to_string()).await;
2022        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2023    }
2024
2025    #[tokio::test]
2026    #[ignore = "Requires active Redis instance"]
2027    async fn test_duplicate_transaction_creation() {
2028        let repo = setup_test_repo().await;
2029        let random_id = Uuid::new_v4().to_string();
2030
2031        let tx = create_test_transaction(&random_id);
2032
2033        repo.create(tx.clone()).await.unwrap();
2034        let result = repo.create(tx).await;
2035
2036        assert!(matches!(
2037            result,
2038            Err(RepositoryError::ConstraintViolation(_))
2039        ));
2040    }
2041
2042    #[tokio::test]
2043    #[ignore = "Requires active Redis instance"]
2044    async fn test_update_nonexistent_transaction() {
2045        let repo = setup_test_repo().await;
2046        let tx = create_test_transaction("test-1");
2047
2048        let result = repo.update("nonexistent".to_string(), tx).await;
2049        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
2050    }
2051
2052    #[tokio::test]
2053    #[ignore = "Requires active Redis instance"]
2054    async fn test_list_paginated() {
2055        let repo = setup_test_repo().await;
2056
2057        // Create multiple transactions
2058        for _ in 1..=10 {
2059            let random_id = Uuid::new_v4().to_string();
2060            let tx = create_test_transaction(&random_id);
2061            repo.create(tx).await.unwrap();
2062        }
2063
2064        // Test first page with 3 items per page
2065        let query = PaginationQuery {
2066            page: 1,
2067            per_page: 3,
2068        };
2069        let result = repo.list_paginated(query).await.unwrap();
2070        assert_eq!(result.items.len(), 3);
2071        assert!(result.total >= 10);
2072        assert_eq!(result.page, 1);
2073        assert_eq!(result.per_page, 3);
2074
2075        // Test empty page (beyond total items)
2076        let query = PaginationQuery {
2077            page: 1000,
2078            per_page: 3,
2079        };
2080        let result = repo.list_paginated(query).await.unwrap();
2081        assert_eq!(result.items.len(), 0);
2082    }
2083
2084    #[tokio::test]
2085    #[ignore = "Requires active Redis instance"]
2086    async fn test_find_by_relayer_id() {
2087        let repo = setup_test_repo().await;
2088        let random_id = Uuid::new_v4().to_string();
2089        let random_id2 = Uuid::new_v4().to_string();
2090        let random_id3 = Uuid::new_v4().to_string();
2091
2092        let tx1 = create_test_transaction_with_relayer(&random_id, "relayer-1");
2093        let tx2 = create_test_transaction_with_relayer(&random_id2, "relayer-1");
2094        let tx3 = create_test_transaction_with_relayer(&random_id3, "relayer-2");
2095
2096        repo.create(tx1).await.unwrap();
2097        repo.create(tx2).await.unwrap();
2098        repo.create(tx3).await.unwrap();
2099
2100        // Test finding transactions for relayer-1
2101        let query = PaginationQuery {
2102            page: 1,
2103            per_page: 10,
2104        };
2105        let result = repo
2106            .find_by_relayer_id("relayer-1", query.clone())
2107            .await
2108            .unwrap();
2109        assert!(result.total >= 2);
2110        assert!(result.items.len() >= 2);
2111        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-1"));
2112
2113        // Test finding transactions for relayer-2
2114        let result = repo
2115            .find_by_relayer_id("relayer-2", query.clone())
2116            .await
2117            .unwrap();
2118        assert!(result.total >= 1);
2119        assert!(!result.items.is_empty());
2120        assert!(result.items.iter().all(|tx| tx.relayer_id == "relayer-2"));
2121
2122        // Test finding transactions for non-existent relayer
2123        let result = repo
2124            .find_by_relayer_id("non-existent", query.clone())
2125            .await
2126            .unwrap();
2127        assert_eq!(result.total, 0);
2128        assert_eq!(result.items.len(), 0);
2129    }
2130
2131    #[tokio::test]
2132    #[ignore = "Requires active Redis instance"]
2133    async fn test_find_by_relayer_id_sorted_by_created_at_newest_first() {
2134        let repo = setup_test_repo().await;
2135        let relayer_id = Uuid::new_v4().to_string();
2136
2137        // Create transactions with different created_at timestamps
2138        let mut tx1 = create_test_transaction_with_relayer("test-1", &relayer_id);
2139        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); // Oldest
2140
2141        let mut tx2 = create_test_transaction_with_relayer("test-2", &relayer_id);
2142        tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); // Middle
2143
2144        let mut tx3 = create_test_transaction_with_relayer("test-3", &relayer_id);
2145        tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); // Newest
2146
2147        // Create transactions in non-chronological order to ensure sorting works
2148        repo.create(tx2.clone()).await.unwrap(); // Middle first
2149        repo.create(tx1.clone()).await.unwrap(); // Oldest second
2150        repo.create(tx3.clone()).await.unwrap(); // Newest last
2151
2152        let query = PaginationQuery {
2153            page: 1,
2154            per_page: 10,
2155        };
2156        let result = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
2157
2158        assert_eq!(result.total, 3);
2159        assert_eq!(result.items.len(), 3);
2160
2161        // Verify transactions are sorted by created_at descending (newest first)
2162        assert_eq!(
2163            result.items[0].id, "test-3",
2164            "First item should be newest (test-3)"
2165        );
2166        assert_eq!(
2167            result.items[0].created_at,
2168            "2025-01-27T14:00:00.000000+00:00"
2169        );
2170
2171        assert_eq!(
2172            result.items[1].id, "test-2",
2173            "Second item should be middle (test-2)"
2174        );
2175        assert_eq!(
2176            result.items[1].created_at,
2177            "2025-01-27T12:00:00.000000+00:00"
2178        );
2179
2180        assert_eq!(
2181            result.items[2].id, "test-1",
2182            "Third item should be oldest (test-1)"
2183        );
2184        assert_eq!(
2185            result.items[2].created_at,
2186            "2025-01-27T10:00:00.000000+00:00"
2187        );
2188    }
2189
2190    #[tokio::test]
2191    #[ignore = "Requires active Redis instance"]
2192    async fn test_find_by_relayer_id_migration_from_old_index() {
2193        let repo = setup_test_repo().await;
2194        let relayer_id = Uuid::new_v4().to_string();
2195
2196        // Create transactions with different created_at timestamps
2197        let mut tx1 = create_test_transaction_with_relayer("migrate-test-1", &relayer_id);
2198        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string(); // Oldest
2199
2200        let mut tx2 = create_test_transaction_with_relayer("migrate-test-2", &relayer_id);
2201        tx2.created_at = "2025-01-27T12:00:00.000000+00:00".to_string(); // Middle
2202
2203        let mut tx3 = create_test_transaction_with_relayer("migrate-test-3", &relayer_id);
2204        tx3.created_at = "2025-01-27T14:00:00.000000+00:00".to_string(); // Newest
2205
2206        // Create transactions directly in Redis WITHOUT adding to sorted set
2207        // This simulates old transactions created before the sorted set index existed
2208        let mut conn = repo.connections.primary().get().await.unwrap();
2209        let relayer_list_key = repo.relayer_list_key();
2210        let _: () = conn.sadd(&relayer_list_key, &relayer_id).await.unwrap();
2211
2212        for tx in &[&tx1, &tx2, &tx3] {
2213            let key = repo.tx_key(&tx.relayer_id, &tx.id);
2214            let reverse_key = repo.tx_to_relayer_key(&tx.id);
2215            let value = repo.serialize_entity(tx, |t| &t.id, "transaction").unwrap();
2216
2217            let mut pipe = redis::pipe();
2218            pipe.atomic();
2219            pipe.set(&key, &value);
2220            pipe.set(&reverse_key, &tx.relayer_id);
2221
2222            // Add to status index (but NOT to sorted set)
2223            let status_key = repo.relayer_status_key(&tx.relayer_id, &tx.status);
2224            pipe.sadd(&status_key, &tx.id);
2225
2226            pipe.exec_async(&mut conn).await.unwrap();
2227        }
2228
2229        // Verify sorted set is empty (transactions were created without sorted set index)
2230        let relayer_sorted_key = repo.relayer_tx_by_created_at_key(&relayer_id);
2231        let count: u64 = conn.zcard(&relayer_sorted_key).await.unwrap();
2232        assert_eq!(count, 0, "Sorted set should be empty for old transactions");
2233
2234        // Call find_by_relayer_id - this should trigger migration
2235        let query = PaginationQuery {
2236            page: 1,
2237            per_page: 10,
2238        };
2239        let result = repo
2240            .find_by_relayer_id(&relayer_id, query.clone())
2241            .await
2242            .unwrap();
2243
2244        // Verify migration happened - sorted set should now have entries
2245        let count_after: u64 = conn.zcard(&relayer_sorted_key).await.unwrap();
2246        assert_eq!(
2247            count_after, 3,
2248            "Sorted set should be populated after migration"
2249        );
2250
2251        // Verify results are correct and sorted (newest first)
2252        assert_eq!(result.total, 3);
2253        assert_eq!(result.items.len(), 3);
2254
2255        assert_eq!(
2256            result.items[0].id, "migrate-test-3",
2257            "First item should be newest after migration"
2258        );
2259        assert_eq!(
2260            result.items[0].created_at,
2261            "2025-01-27T14:00:00.000000+00:00"
2262        );
2263
2264        assert_eq!(
2265            result.items[1].id, "migrate-test-2",
2266            "Second item should be middle after migration"
2267        );
2268        assert_eq!(
2269            result.items[1].created_at,
2270            "2025-01-27T12:00:00.000000+00:00"
2271        );
2272
2273        assert_eq!(
2274            result.items[2].id, "migrate-test-1",
2275            "Third item should be oldest after migration"
2276        );
2277        assert_eq!(
2278            result.items[2].created_at,
2279            "2025-01-27T10:00:00.000000+00:00"
2280        );
2281
2282        // Verify second call uses sorted set (no migration needed)
2283        let result2 = repo.find_by_relayer_id(&relayer_id, query).await.unwrap();
2284        assert_eq!(result2.total, 3);
2285        assert_eq!(result2.items.len(), 3);
2286        // Results should be identical since sorted set is now populated
2287        assert_eq!(result.items[0].id, result2.items[0].id);
2288    }
2289
2290    #[tokio::test]
2291    #[ignore = "Requires active Redis instance"]
2292    async fn test_find_by_status() {
2293        let repo = setup_test_repo().await;
2294        let random_id = Uuid::new_v4().to_string();
2295        let random_id2 = Uuid::new_v4().to_string();
2296        let random_id3 = Uuid::new_v4().to_string();
2297        let relayer_id = Uuid::new_v4().to_string();
2298        let tx1 = create_test_transaction_with_status(
2299            &random_id,
2300            &relayer_id,
2301            TransactionStatus::Pending,
2302        );
2303        let tx2 =
2304            create_test_transaction_with_status(&random_id2, &relayer_id, TransactionStatus::Sent);
2305        let tx3 = create_test_transaction_with_status(
2306            &random_id3,
2307            &relayer_id,
2308            TransactionStatus::Confirmed,
2309        );
2310
2311        repo.create(tx1).await.unwrap();
2312        repo.create(tx2).await.unwrap();
2313        repo.create(tx3).await.unwrap();
2314
2315        // Test finding pending transactions
2316        let result = repo
2317            .find_by_status(&relayer_id, &[TransactionStatus::Pending])
2318            .await
2319            .unwrap();
2320        assert_eq!(result.len(), 1);
2321        assert_eq!(result[0].status, TransactionStatus::Pending);
2322
2323        // Test finding multiple statuses
2324        let result = repo
2325            .find_by_status(
2326                &relayer_id,
2327                &[TransactionStatus::Pending, TransactionStatus::Sent],
2328            )
2329            .await
2330            .unwrap();
2331        assert_eq!(result.len(), 2);
2332
2333        // Test finding non-existent status
2334        let result = repo
2335            .find_by_status(&relayer_id, &[TransactionStatus::Failed])
2336            .await
2337            .unwrap();
2338        assert_eq!(result.len(), 0);
2339    }
2340
2341    #[tokio::test]
2342    #[ignore = "Requires active Redis instance"]
2343    async fn test_find_by_status_paginated() {
2344        let repo = setup_test_repo().await;
2345        let relayer_id = Uuid::new_v4().to_string();
2346
2347        // Create 5 pending transactions with different timestamps
2348        for i in 1..=5 {
2349            let tx_id = Uuid::new_v4().to_string();
2350            let mut tx = create_test_transaction_with_status(
2351                &tx_id,
2352                &relayer_id,
2353                TransactionStatus::Pending,
2354            );
2355            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2356            repo.create(tx).await.unwrap();
2357        }
2358
2359        // Create 2 confirmed transactions
2360        for i in 6..=7 {
2361            let tx_id = Uuid::new_v4().to_string();
2362            let mut tx = create_test_transaction_with_status(
2363                &tx_id,
2364                &relayer_id,
2365                TransactionStatus::Confirmed,
2366            );
2367            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2368            repo.create(tx).await.unwrap();
2369        }
2370
2371        // Test first page (2 items per page)
2372        let query = PaginationQuery {
2373            page: 1,
2374            per_page: 2,
2375        };
2376        let result = repo
2377            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2378            .await
2379            .unwrap();
2380
2381        assert_eq!(result.total, 5);
2382        assert_eq!(result.items.len(), 2);
2383        assert_eq!(result.page, 1);
2384        assert_eq!(result.per_page, 2);
2385
2386        // Test second page
2387        let query = PaginationQuery {
2388            page: 2,
2389            per_page: 2,
2390        };
2391        let result = repo
2392            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2393            .await
2394            .unwrap();
2395
2396        assert_eq!(result.total, 5);
2397        assert_eq!(result.items.len(), 2);
2398        assert_eq!(result.page, 2);
2399
2400        // Test last page (partial)
2401        let query = PaginationQuery {
2402            page: 3,
2403            per_page: 2,
2404        };
2405        let result = repo
2406            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2407            .await
2408            .unwrap();
2409
2410        assert_eq!(result.total, 5);
2411        assert_eq!(result.items.len(), 1);
2412
2413        // Test multiple statuses
2414        let query = PaginationQuery {
2415            page: 1,
2416            per_page: 10,
2417        };
2418        let result = repo
2419            .find_by_status_paginated(
2420                &relayer_id,
2421                &[TransactionStatus::Pending, TransactionStatus::Confirmed],
2422                query,
2423                false,
2424            )
2425            .await
2426            .unwrap();
2427
2428        assert_eq!(result.total, 7);
2429        assert_eq!(result.items.len(), 7);
2430
2431        // Test empty result
2432        let query = PaginationQuery {
2433            page: 1,
2434            per_page: 10,
2435        };
2436        let result = repo
2437            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Failed], query, false)
2438            .await
2439            .unwrap();
2440
2441        assert_eq!(result.total, 0);
2442        assert_eq!(result.items.len(), 0);
2443    }
2444
2445    #[tokio::test]
2446    #[ignore = "Requires active Redis instance"]
2447    async fn test_find_by_status_paginated_oldest_first() {
2448        let repo = setup_test_repo().await;
2449        let relayer_id = Uuid::new_v4().to_string();
2450
2451        // Create 5 pending transactions with ascending timestamps
2452        for i in 1..=5 {
2453            let tx_id = format!("tx{}-{}", i, Uuid::new_v4());
2454            let mut tx = create_test_transaction(&tx_id);
2455            tx.relayer_id = relayer_id.clone();
2456            tx.status = TransactionStatus::Pending;
2457            tx.created_at = format!("2025-01-27T{:02}:00:00.000000+00:00", 10 + i);
2458            repo.create(tx).await.unwrap();
2459        }
2460
2461        // Test oldest_first: true - should return oldest transactions first
2462        let query = PaginationQuery {
2463            page: 1,
2464            per_page: 3,
2465        };
2466        let result = repo
2467            .find_by_status_paginated(
2468                &relayer_id,
2469                &[TransactionStatus::Pending],
2470                query.clone(),
2471                true,
2472            )
2473            .await
2474            .unwrap();
2475
2476        assert_eq!(result.total, 5);
2477        assert_eq!(result.items.len(), 3);
2478        // Verify ordering: oldest first (11:00, 12:00, 13:00)
2479        assert!(
2480            result.items[0].created_at < result.items[1].created_at,
2481            "First item should be older than second"
2482        );
2483        assert!(
2484            result.items[1].created_at < result.items[2].created_at,
2485            "Second item should be older than third"
2486        );
2487
2488        // Contrast with oldest_first: false - should return newest first
2489        let result_newest = repo
2490            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2491            .await
2492            .unwrap();
2493
2494        assert_eq!(result_newest.items.len(), 3);
2495        // Verify ordering: newest first (15:00, 14:00, 13:00)
2496        assert!(
2497            result_newest.items[0].created_at > result_newest.items[1].created_at,
2498            "First item should be newer than second"
2499        );
2500        assert!(
2501            result_newest.items[1].created_at > result_newest.items[2].created_at,
2502            "Second item should be newer than third"
2503        );
2504    }
2505
2506    #[tokio::test]
2507    #[ignore = "Requires active Redis instance"]
2508    async fn test_find_by_status_paginated_oldest_first_single_item() {
2509        let repo = setup_test_repo().await;
2510        let relayer_id = Uuid::new_v4().to_string();
2511
2512        // Create transactions with specific timestamps
2513        let timestamps = [
2514            "2025-01-27T08:00:00.000000+00:00", // oldest
2515            "2025-01-27T10:00:00.000000+00:00", // middle
2516            "2025-01-27T12:00:00.000000+00:00", // newest
2517        ];
2518
2519        let mut oldest_id = String::new();
2520        let mut newest_id = String::new();
2521
2522        for (i, timestamp) in timestamps.iter().enumerate() {
2523            let tx_id = format!("tx-{}-{}", i, Uuid::new_v4());
2524            if i == 0 {
2525                oldest_id = tx_id.clone();
2526            }
2527            if i == 2 {
2528                newest_id = tx_id.clone();
2529            }
2530            let mut tx = create_test_transaction(&tx_id);
2531            tx.relayer_id = relayer_id.clone();
2532            tx.status = TransactionStatus::Pending;
2533            tx.created_at = timestamp.to_string();
2534            repo.create(tx).await.unwrap();
2535        }
2536
2537        // Request just 1 item with oldest_first: true
2538        let query = PaginationQuery {
2539            page: 1,
2540            per_page: 1,
2541        };
2542        let result = repo
2543            .find_by_status_paginated(
2544                &relayer_id,
2545                &[TransactionStatus::Pending],
2546                query.clone(),
2547                true,
2548            )
2549            .await
2550            .unwrap();
2551
2552        assert_eq!(result.total, 3);
2553        assert_eq!(result.items.len(), 1);
2554        assert_eq!(
2555            result.items[0].id, oldest_id,
2556            "With oldest_first=true and per_page=1, should return the oldest transaction"
2557        );
2558
2559        // Contrast with oldest_first: false
2560        let result = repo
2561            .find_by_status_paginated(&relayer_id, &[TransactionStatus::Pending], query, false)
2562            .await
2563            .unwrap();
2564
2565        assert_eq!(result.items.len(), 1);
2566        assert_eq!(
2567            result.items[0].id, newest_id,
2568            "With oldest_first=false and per_page=1, should return the newest transaction"
2569        );
2570    }
2571
2572    #[tokio::test]
2573    #[ignore = "Requires active Redis instance"]
2574    async fn test_find_by_nonce() {
2575        let repo = setup_test_repo().await;
2576        let random_id = Uuid::new_v4().to_string();
2577        let random_id2 = Uuid::new_v4().to_string();
2578        let relayer_id = Uuid::new_v4().to_string();
2579
2580        let tx1 = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2581        let tx2 = create_test_transaction_with_nonce(&random_id2, 43, &relayer_id);
2582
2583        repo.create(tx1.clone()).await.unwrap();
2584        repo.create(tx2).await.unwrap();
2585
2586        // Test finding existing nonce
2587        let result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2588        assert!(result.is_some());
2589        assert_eq!(result.unwrap().id, random_id);
2590
2591        // Test finding non-existent nonce
2592        let result = repo.find_by_nonce(&relayer_id, 99).await.unwrap();
2593        assert!(result.is_none());
2594
2595        // Test finding nonce for non-existent relayer
2596        let result = repo.find_by_nonce("non-existent", 42).await.unwrap();
2597        assert!(result.is_none());
2598    }
2599
2600    #[tokio::test]
2601    #[ignore = "Requires active Redis instance"]
2602    async fn test_update_status() {
2603        let repo = setup_test_repo().await;
2604        let random_id = Uuid::new_v4().to_string();
2605        let tx = create_test_transaction(&random_id);
2606
2607        repo.create(tx).await.unwrap();
2608        let updated = repo
2609            .update_status(random_id.to_string(), TransactionStatus::Confirmed)
2610            .await
2611            .unwrap();
2612        assert_eq!(updated.status, TransactionStatus::Confirmed);
2613    }
2614
2615    #[tokio::test]
2616    #[ignore = "Requires active Redis instance"]
2617    async fn test_partial_update() {
2618        let repo = setup_test_repo().await;
2619        let random_id = Uuid::new_v4().to_string();
2620        let tx = create_test_transaction(&random_id);
2621
2622        repo.create(tx).await.unwrap();
2623
2624        let update = TransactionUpdateRequest {
2625            status: Some(TransactionStatus::Sent),
2626            status_reason: Some("Transaction sent".to_string()),
2627            sent_at: Some("2025-01-27T16:00:00.000000+00:00".to_string()),
2628            confirmed_at: None,
2629            network_data: None,
2630            hashes: None,
2631            is_canceled: None,
2632            priced_at: None,
2633            noop_count: None,
2634            delete_at: None,
2635            metadata: None,
2636        };
2637
2638        let updated = repo
2639            .partial_update(random_id.to_string(), update)
2640            .await
2641            .unwrap();
2642        assert_eq!(updated.status, TransactionStatus::Sent);
2643        assert_eq!(updated.status_reason, Some("Transaction sent".to_string()));
2644        assert_eq!(
2645            updated.sent_at,
2646            Some("2025-01-27T16:00:00.000000+00:00".to_string())
2647        );
2648    }
2649
2650    #[tokio::test]
2651    #[ignore = "Requires active Redis instance"]
2652    async fn test_set_sent_at() {
2653        let repo = setup_test_repo().await;
2654        let random_id = Uuid::new_v4().to_string();
2655        let tx = create_test_transaction(&random_id);
2656
2657        repo.create(tx).await.unwrap();
2658        let updated = repo
2659            .set_sent_at(
2660                random_id.to_string(),
2661                "2025-01-27T16:00:00.000000+00:00".to_string(),
2662            )
2663            .await
2664            .unwrap();
2665        assert_eq!(
2666            updated.sent_at,
2667            Some("2025-01-27T16:00:00.000000+00:00".to_string())
2668        );
2669    }
2670
2671    #[tokio::test]
2672    #[ignore = "Requires active Redis instance"]
2673    async fn test_set_confirmed_at() {
2674        let repo = setup_test_repo().await;
2675        let random_id = Uuid::new_v4().to_string();
2676        let tx = create_test_transaction(&random_id);
2677
2678        repo.create(tx).await.unwrap();
2679        let updated = repo
2680            .set_confirmed_at(
2681                random_id.to_string(),
2682                "2025-01-27T16:00:00.000000+00:00".to_string(),
2683            )
2684            .await
2685            .unwrap();
2686        assert_eq!(
2687            updated.confirmed_at,
2688            Some("2025-01-27T16:00:00.000000+00:00".to_string())
2689        );
2690    }
2691
2692    #[tokio::test]
2693    #[ignore = "Requires active Redis instance"]
2694    async fn test_update_network_data() {
2695        let repo = setup_test_repo().await;
2696        let random_id = Uuid::new_v4().to_string();
2697        let tx = create_test_transaction(&random_id);
2698
2699        repo.create(tx).await.unwrap();
2700
2701        let new_network_data = NetworkTransactionData::Evm(EvmTransactionData {
2702            gas_price: Some(2000000000),
2703            gas_limit: Some(42000),
2704            nonce: Some(2),
2705            value: U256::from_str("2000000000000000000").unwrap(),
2706            data: Some("0x1234".to_string()),
2707            from: "0xNewSender".to_string(),
2708            to: Some("0xNewRecipient".to_string()),
2709            chain_id: 1,
2710            signature: None,
2711            hash: Some("0xnewhash".to_string()),
2712            speed: Some(Speed::SafeLow),
2713            max_fee_per_gas: None,
2714            max_priority_fee_per_gas: None,
2715            raw: None,
2716        });
2717
2718        let updated = repo
2719            .update_network_data(random_id.to_string(), new_network_data.clone())
2720            .await
2721            .unwrap();
2722        assert_eq!(
2723            updated
2724                .network_data
2725                .get_evm_transaction_data()
2726                .unwrap()
2727                .hash,
2728            new_network_data.get_evm_transaction_data().unwrap().hash
2729        );
2730    }
2731
2732    #[tokio::test]
2733    #[ignore = "Requires active Redis instance"]
2734    async fn test_debug_implementation() {
2735        let repo = setup_test_repo().await;
2736        let debug_str = format!("{repo:?}");
2737        assert!(debug_str.contains("RedisTransactionRepository"));
2738        assert!(debug_str.contains("test_prefix"));
2739    }
2740
2741    #[tokio::test]
2742    #[ignore = "Requires active Redis instance"]
2743    async fn test_error_handling_empty_id() {
2744        let repo = setup_test_repo().await;
2745
2746        let result = repo.get_by_id("".to_string()).await;
2747        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2748
2749        let result = repo
2750            .update("".to_string(), create_test_transaction("test"))
2751            .await;
2752        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2753
2754        let result = repo.delete_by_id("".to_string()).await;
2755        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2756    }
2757
2758    #[tokio::test]
2759    #[ignore = "Requires active Redis instance"]
2760    async fn test_pagination_validation() {
2761        let repo = setup_test_repo().await;
2762
2763        let query = PaginationQuery {
2764            page: 1,
2765            per_page: 0,
2766        };
2767        let result = repo.list_paginated(query).await;
2768        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
2769    }
2770
2771    #[tokio::test]
2772    #[ignore = "Requires active Redis instance"]
2773    async fn test_index_consistency() {
2774        let repo = setup_test_repo().await;
2775        let random_id = Uuid::new_v4().to_string();
2776        let relayer_id = Uuid::new_v4().to_string();
2777        let tx = create_test_transaction_with_nonce(&random_id, 42, &relayer_id);
2778
2779        // Create transaction
2780        repo.create(tx.clone()).await.unwrap();
2781
2782        // Verify it can be found by nonce
2783        let found = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2784        assert!(found.is_some());
2785
2786        // Update the transaction with a new nonce
2787        let mut updated_tx = tx.clone();
2788        if let NetworkTransactionData::Evm(ref mut evm_data) = updated_tx.network_data {
2789            evm_data.nonce = Some(43);
2790        }
2791
2792        repo.update(random_id.to_string(), updated_tx)
2793            .await
2794            .unwrap();
2795
2796        // Verify old nonce index is cleaned up
2797        let old_nonce_result = repo.find_by_nonce(&relayer_id, 42).await.unwrap();
2798        assert!(old_nonce_result.is_none());
2799
2800        // Verify new nonce index works
2801        let new_nonce_result = repo.find_by_nonce(&relayer_id, 43).await.unwrap();
2802        assert!(new_nonce_result.is_some());
2803    }
2804
2805    #[tokio::test]
2806    #[ignore = "Requires active Redis instance"]
2807    async fn test_has_entries() {
2808        let repo = setup_test_repo().await;
2809        assert!(!repo.has_entries().await.unwrap());
2810
2811        let tx_id = uuid::Uuid::new_v4().to_string();
2812        let tx = create_test_transaction(&tx_id);
2813        repo.create(tx.clone()).await.unwrap();
2814
2815        assert!(repo.has_entries().await.unwrap());
2816    }
2817
2818    #[tokio::test]
2819    #[ignore = "Requires active Redis instance"]
2820    async fn test_drop_all_entries() {
2821        let repo = setup_test_repo().await;
2822        let tx_id = uuid::Uuid::new_v4().to_string();
2823        let tx = create_test_transaction(&tx_id);
2824        repo.create(tx.clone()).await.unwrap();
2825        assert!(repo.has_entries().await.unwrap());
2826
2827        repo.drop_all_entries().await.unwrap();
2828        assert!(!repo.has_entries().await.unwrap());
2829    }
2830
2831    // Tests for delete_at field setting on final status updates
2832    #[tokio::test]
2833    #[ignore = "Requires active Redis instance"]
2834    async fn test_update_status_sets_delete_at_for_final_statuses() {
2835        let _lock = ENV_MUTEX.lock().await;
2836
2837        use chrono::{DateTime, Duration, Utc};
2838        use std::env;
2839
2840        // Use a unique test environment variable to avoid conflicts
2841        env::set_var("TRANSACTION_EXPIRATION_HOURS", "6");
2842
2843        let repo = setup_test_repo().await;
2844
2845        let final_statuses = [
2846            TransactionStatus::Canceled,
2847            TransactionStatus::Confirmed,
2848            TransactionStatus::Failed,
2849            TransactionStatus::Expired,
2850        ];
2851
2852        for (i, status) in final_statuses.iter().enumerate() {
2853            let tx_id = format!("test-final-{}-{}", i, Uuid::new_v4());
2854            let mut tx = create_test_transaction(&tx_id);
2855
2856            // Ensure transaction has no delete_at initially and is in pending state
2857            tx.delete_at = None;
2858            tx.status = TransactionStatus::Pending;
2859
2860            repo.create(tx).await.unwrap();
2861
2862            let before_update = Utc::now();
2863
2864            // Update to final status
2865            let updated = repo
2866                .update_status(tx_id.clone(), status.clone())
2867                .await
2868                .unwrap();
2869
2870            // Should have delete_at set
2871            assert!(
2872                updated.delete_at.is_some(),
2873                "delete_at should be set for status: {status:?}"
2874            );
2875
2876            // Verify the timestamp is reasonable (approximately 6 hours from now)
2877            let delete_at_str = updated.delete_at.unwrap();
2878            let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
2879                .expect("delete_at should be valid RFC3339")
2880                .with_timezone(&Utc);
2881
2882            let duration_from_before = delete_at.signed_duration_since(before_update);
2883            let expected_duration = Duration::hours(6);
2884            let tolerance = Duration::minutes(5);
2885
2886            assert!(
2887                duration_from_before >= expected_duration - tolerance
2888                    && duration_from_before <= expected_duration + tolerance,
2889                "delete_at should be approximately 6 hours from now for status: {status:?}. Duration: {duration_from_before:?}"
2890            );
2891        }
2892
2893        // Cleanup
2894        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2895    }
2896
2897    #[tokio::test]
2898    #[ignore = "Requires active Redis instance"]
2899    async fn test_update_status_does_not_set_delete_at_for_non_final_statuses() {
2900        let _lock = ENV_MUTEX.lock().await;
2901
2902        use std::env;
2903
2904        env::set_var("TRANSACTION_EXPIRATION_HOURS", "4");
2905
2906        let repo = setup_test_repo().await;
2907
2908        let non_final_statuses = [
2909            TransactionStatus::Pending,
2910            TransactionStatus::Sent,
2911            TransactionStatus::Submitted,
2912            TransactionStatus::Mined,
2913        ];
2914
2915        for (i, status) in non_final_statuses.iter().enumerate() {
2916            let tx_id = format!("test-non-final-{}-{}", i, Uuid::new_v4());
2917            let mut tx = create_test_transaction(&tx_id);
2918            tx.delete_at = None;
2919            tx.status = TransactionStatus::Pending;
2920
2921            repo.create(tx).await.unwrap();
2922
2923            // Update to non-final status
2924            let updated = repo
2925                .update_status(tx_id.clone(), status.clone())
2926                .await
2927                .unwrap();
2928
2929            // Should NOT have delete_at set
2930            assert!(
2931                updated.delete_at.is_none(),
2932                "delete_at should NOT be set for status: {status:?}"
2933            );
2934        }
2935
2936        // Cleanup
2937        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
2938    }
2939
2940    #[tokio::test]
2941    #[ignore = "Requires active Redis instance"]
2942    async fn test_partial_update_sets_delete_at_for_final_statuses() {
2943        let _lock = ENV_MUTEX.lock().await;
2944
2945        use chrono::{DateTime, Duration, Utc};
2946        use std::env;
2947
2948        env::set_var("TRANSACTION_EXPIRATION_HOURS", "8");
2949
2950        let repo = setup_test_repo().await;
2951        let tx_id = format!("test-partial-final-{}", Uuid::new_v4());
2952        let mut tx = create_test_transaction(&tx_id);
2953        tx.delete_at = None;
2954        tx.status = TransactionStatus::Pending;
2955
2956        repo.create(tx).await.unwrap();
2957
2958        let before_update = Utc::now();
2959
2960        // Use partial_update to set status to Confirmed (final status)
2961        let update = TransactionUpdateRequest {
2962            status: Some(TransactionStatus::Confirmed),
2963            status_reason: Some("Transaction completed".to_string()),
2964            confirmed_at: Some("2023-01-01T12:05:00Z".to_string()),
2965            ..Default::default()
2966        };
2967
2968        let updated = repo.partial_update(tx_id.clone(), update).await.unwrap();
2969
2970        // Should have delete_at set
2971        assert!(
2972            updated.delete_at.is_some(),
2973            "delete_at should be set when updating to Confirmed status"
2974        );
2975
2976        // Verify the timestamp is reasonable (approximately 8 hours from now)
2977        let delete_at_str = updated.delete_at.unwrap();
2978        let delete_at = DateTime::parse_from_rfc3339(&delete_at_str)
2979            .expect("delete_at should be valid RFC3339")
2980            .with_timezone(&Utc);
2981
2982        let duration_from_before = delete_at.signed_duration_since(before_update);
2983        let expected_duration = Duration::hours(8);
2984        let tolerance = Duration::minutes(5);
2985
2986        assert!(
2987            duration_from_before >= expected_duration - tolerance
2988                && duration_from_before <= expected_duration + tolerance,
2989            "delete_at should be approximately 8 hours from now. Duration: {duration_from_before:?}"
2990        );
2991
2992        // Also verify other fields were updated
2993        assert_eq!(updated.status, TransactionStatus::Confirmed);
2994        assert_eq!(
2995            updated.status_reason,
2996            Some("Transaction completed".to_string())
2997        );
2998        assert_eq!(
2999            updated.confirmed_at,
3000            Some("2023-01-01T12:05:00Z".to_string())
3001        );
3002
3003        // Cleanup
3004        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3005    }
3006
3007    #[tokio::test]
3008    #[ignore = "Requires active Redis instance"]
3009    async fn test_update_status_preserves_existing_delete_at() {
3010        let _lock = ENV_MUTEX.lock().await;
3011
3012        use std::env;
3013
3014        env::set_var("TRANSACTION_EXPIRATION_HOURS", "2");
3015
3016        let repo = setup_test_repo().await;
3017        let tx_id = format!("test-preserve-delete-at-{}", Uuid::new_v4());
3018        let mut tx = create_test_transaction(&tx_id);
3019
3020        // Set an existing delete_at value
3021        let existing_delete_at = "2025-01-01T12:00:00Z".to_string();
3022        tx.delete_at = Some(existing_delete_at.clone());
3023        tx.status = TransactionStatus::Pending;
3024
3025        repo.create(tx).await.unwrap();
3026
3027        // Update to final status
3028        let updated = repo
3029            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3030            .await
3031            .unwrap();
3032
3033        // Should preserve the existing delete_at value
3034        assert_eq!(
3035            updated.delete_at,
3036            Some(existing_delete_at),
3037            "Existing delete_at should be preserved when updating to final status"
3038        );
3039
3040        // Cleanup
3041        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3042    }
3043    #[tokio::test]
3044    #[ignore = "Requires active Redis instance"]
3045    async fn test_partial_update_without_status_change_preserves_delete_at() {
3046        let _lock = ENV_MUTEX.lock().await;
3047
3048        use std::env;
3049
3050        env::set_var("TRANSACTION_EXPIRATION_HOURS", "3");
3051
3052        let repo = setup_test_repo().await;
3053        let tx_id = format!("test-preserve-no-status-{}", Uuid::new_v4());
3054        let mut tx = create_test_transaction(&tx_id);
3055        tx.delete_at = None;
3056        tx.status = TransactionStatus::Pending;
3057
3058        repo.create(tx).await.unwrap();
3059
3060        // First, update to final status to set delete_at
3061        let updated1 = repo
3062            .update_status(tx_id.clone(), TransactionStatus::Confirmed)
3063            .await
3064            .unwrap();
3065
3066        assert!(updated1.delete_at.is_some());
3067        let original_delete_at = updated1.delete_at.clone();
3068
3069        // Now update other fields without changing status
3070        let update = TransactionUpdateRequest {
3071            status: None, // No status change
3072            status_reason: Some("Updated reason".to_string()),
3073            confirmed_at: Some("2023-01-01T12:10:00Z".to_string()),
3074            ..Default::default()
3075        };
3076
3077        let updated2 = repo.partial_update(tx_id.clone(), update).await.unwrap();
3078
3079        // delete_at should be preserved
3080        assert_eq!(
3081            updated2.delete_at, original_delete_at,
3082            "delete_at should be preserved when status is not updated"
3083        );
3084
3085        // Other fields should be updated
3086        assert_eq!(updated2.status, TransactionStatus::Confirmed); // Unchanged
3087        assert_eq!(updated2.status_reason, Some("Updated reason".to_string()));
3088        assert_eq!(
3089            updated2.confirmed_at,
3090            Some("2023-01-01T12:10:00Z".to_string())
3091        );
3092
3093        // Cleanup
3094        env::remove_var("TRANSACTION_EXPIRATION_HOURS");
3095    }
3096
3097    // Tests for delete_by_ids batch delete functionality
3098
3099    #[tokio::test]
3100    #[ignore = "Requires active Redis instance"]
3101    async fn test_delete_by_ids_empty_list() {
3102        let repo = setup_test_repo().await;
3103        let tx_id = format!("test-empty-{}", Uuid::new_v4());
3104
3105        // Create a transaction to ensure repo is not empty
3106        let tx = create_test_transaction(&tx_id);
3107        repo.create(tx).await.unwrap();
3108
3109        // Delete with empty list should succeed and not affect existing data
3110        let result = repo.delete_by_ids(vec![]).await.unwrap();
3111
3112        assert_eq!(result.deleted_count, 0);
3113        assert!(result.failed.is_empty());
3114
3115        // Original transaction should still exist
3116        assert!(repo.get_by_id(tx_id).await.is_ok());
3117    }
3118
3119    #[tokio::test]
3120    #[ignore = "Requires active Redis instance"]
3121    async fn test_delete_by_ids_single_transaction() {
3122        let repo = setup_test_repo().await;
3123        let tx_id = format!("test-single-{}", Uuid::new_v4());
3124
3125        let tx = create_test_transaction(&tx_id);
3126        repo.create(tx).await.unwrap();
3127
3128        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3129
3130        assert_eq!(result.deleted_count, 1);
3131        assert!(result.failed.is_empty());
3132
3133        // Verify transaction was deleted
3134        assert!(repo.get_by_id(tx_id).await.is_err());
3135    }
3136
3137    #[tokio::test]
3138    #[ignore = "Requires active Redis instance"]
3139    async fn test_delete_by_ids_multiple_transactions() {
3140        let repo = setup_test_repo().await;
3141        let base_id = Uuid::new_v4();
3142
3143        // Create multiple transactions
3144        let mut created_ids = Vec::new();
3145        for i in 1..=5 {
3146            let tx_id = format!("test-multi-{base_id}-{i}");
3147            let tx = create_test_transaction(&tx_id);
3148            repo.create(tx).await.unwrap();
3149            created_ids.push(tx_id);
3150        }
3151
3152        // Delete 3 of them
3153        let ids_to_delete = vec![
3154            created_ids[0].clone(),
3155            created_ids[2].clone(),
3156            created_ids[4].clone(),
3157        ];
3158        let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3159
3160        assert_eq!(result.deleted_count, 3);
3161        assert!(result.failed.is_empty());
3162
3163        // Verify correct transactions were deleted
3164        assert!(repo.get_by_id(created_ids[0].clone()).await.is_err());
3165        assert!(repo.get_by_id(created_ids[1].clone()).await.is_ok()); // Not deleted
3166        assert!(repo.get_by_id(created_ids[2].clone()).await.is_err());
3167        assert!(repo.get_by_id(created_ids[3].clone()).await.is_ok()); // Not deleted
3168        assert!(repo.get_by_id(created_ids[4].clone()).await.is_err());
3169    }
3170
3171    #[tokio::test]
3172    #[ignore = "Requires active Redis instance"]
3173    async fn test_delete_by_ids_nonexistent_transactions() {
3174        let repo = setup_test_repo().await;
3175        let base_id = Uuid::new_v4();
3176
3177        // Try to delete transactions that don't exist
3178        let ids_to_delete = vec![
3179            format!("nonexistent-{}-1", base_id),
3180            format!("nonexistent-{}-2", base_id),
3181        ];
3182        let result = repo.delete_by_ids(ids_to_delete.clone()).await.unwrap();
3183
3184        assert_eq!(result.deleted_count, 0);
3185        assert_eq!(result.failed.len(), 2);
3186
3187        // Verify error messages contain the IDs
3188        let failed_ids: Vec<&String> = result.failed.iter().map(|(id, _)| id).collect();
3189        assert!(failed_ids.contains(&&ids_to_delete[0]));
3190        assert!(failed_ids.contains(&&ids_to_delete[1]));
3191    }
3192
3193    #[tokio::test]
3194    #[ignore = "Requires active Redis instance"]
3195    async fn test_delete_by_ids_mixed_existing_and_nonexistent() {
3196        let repo = setup_test_repo().await;
3197        let base_id = Uuid::new_v4();
3198
3199        // Create some transactions
3200        let existing_ids: Vec<String> = (1..=3)
3201            .map(|i| format!("test-mixed-existing-{base_id}-{i}"))
3202            .collect();
3203
3204        for id in &existing_ids {
3205            let tx = create_test_transaction(id);
3206            repo.create(tx).await.unwrap();
3207        }
3208
3209        let nonexistent_ids: Vec<String> = (1..=2)
3210            .map(|i| format!("test-mixed-nonexistent-{base_id}-{i}"))
3211            .collect();
3212
3213        // Try to delete mix of existing and non-existing
3214        let ids_to_delete = vec![
3215            existing_ids[0].clone(),
3216            nonexistent_ids[0].clone(),
3217            existing_ids[1].clone(),
3218            nonexistent_ids[1].clone(),
3219        ];
3220        let result = repo.delete_by_ids(ids_to_delete).await.unwrap();
3221
3222        assert_eq!(result.deleted_count, 2);
3223        assert_eq!(result.failed.len(), 2);
3224
3225        // Verify existing transactions were deleted
3226        assert!(repo.get_by_id(existing_ids[0].clone()).await.is_err());
3227        assert!(repo.get_by_id(existing_ids[1].clone()).await.is_err());
3228
3229        // Verify remaining transaction still exists
3230        assert!(repo.get_by_id(existing_ids[2].clone()).await.is_ok());
3231    }
3232
3233    #[tokio::test]
3234    #[ignore = "Requires active Redis instance"]
3235    async fn test_delete_by_ids_removes_all_indexes() {
3236        let repo = setup_test_repo().await;
3237        let relayer_id = format!("relayer-{}", Uuid::new_v4());
3238        let tx_id = format!("test-indexes-{}", Uuid::new_v4());
3239
3240        // Create a transaction with specific status
3241        let mut tx = create_test_transaction(&tx_id);
3242        tx.relayer_id = relayer_id.clone();
3243        tx.status = TransactionStatus::Confirmed;
3244        repo.create(tx).await.unwrap();
3245
3246        // Verify transaction exists and is indexed
3247        let found = repo
3248            .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3249            .await
3250            .unwrap();
3251        assert!(found.iter().any(|t| t.id == tx_id));
3252
3253        // Delete the transaction
3254        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3255        assert_eq!(result.deleted_count, 1);
3256
3257        // Verify transaction is no longer in status index
3258        let found_after = repo
3259            .find_by_status(&relayer_id, &[TransactionStatus::Confirmed])
3260            .await
3261            .unwrap();
3262        assert!(!found_after.iter().any(|t| t.id == tx_id));
3263
3264        // Verify transaction cannot be found
3265        assert!(repo.get_by_id(tx_id).await.is_err());
3266    }
3267
3268    #[tokio::test]
3269    #[ignore = "Requires active Redis instance"]
3270    async fn test_delete_by_ids_removes_nonce_index() {
3271        let repo = setup_test_repo().await;
3272        let relayer_id = format!("relayer-{}", Uuid::new_v4());
3273        let tx_id = format!("test-nonce-{}", Uuid::new_v4());
3274        let nonce = 12345u64;
3275
3276        // Create a transaction with a specific nonce
3277        let tx = create_test_transaction_with_nonce(&tx_id, nonce, &relayer_id);
3278        repo.create(tx).await.unwrap();
3279
3280        // Verify nonce index works
3281        let found = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3282        assert!(found.is_some());
3283        assert_eq!(found.unwrap().id, tx_id);
3284
3285        // Delete the transaction
3286        let result = repo.delete_by_ids(vec![tx_id.clone()]).await.unwrap();
3287        assert_eq!(result.deleted_count, 1);
3288
3289        // Verify nonce index was cleaned up
3290        let found_after = repo.find_by_nonce(&relayer_id, nonce).await.unwrap();
3291        assert!(found_after.is_none());
3292    }
3293
3294    #[tokio::test]
3295    #[ignore = "Requires active Redis instance"]
3296    async fn test_delete_by_ids_large_batch() {
3297        let repo = setup_test_repo().await;
3298        let base_id = Uuid::new_v4();
3299
3300        // Create many transactions to test batch performance
3301        let count = 50;
3302        let mut created_ids = Vec::new();
3303
3304        for i in 0..count {
3305            let tx_id = format!("test-large-{base_id}-{i}");
3306            let tx = create_test_transaction(&tx_id);
3307            repo.create(tx).await.unwrap();
3308            created_ids.push(tx_id);
3309        }
3310
3311        // Delete all of them in one batch
3312        let result = repo.delete_by_ids(created_ids.clone()).await.unwrap();
3313
3314        assert_eq!(result.deleted_count, count);
3315        assert!(result.failed.is_empty());
3316
3317        // Verify all were deleted
3318        for id in created_ids {
3319            assert!(repo.get_by_id(id).await.is_err());
3320        }
3321    }
3322
3323    #[tokio::test]
3324    #[ignore = "Requires active Redis instance"]
3325    async fn test_delete_by_ids_preserves_other_relayer_transactions() {
3326        let repo = setup_test_repo().await;
3327        let relayer_1 = format!("relayer-1-{}", Uuid::new_v4());
3328        let relayer_2 = format!("relayer-2-{}", Uuid::new_v4());
3329        let tx_id_1 = format!("tx-relayer-1-{}", Uuid::new_v4());
3330        let tx_id_2 = format!("tx-relayer-2-{}", Uuid::new_v4());
3331
3332        // Create transactions for different relayers
3333        let tx1 = create_test_transaction_with_relayer(&tx_id_1, &relayer_1);
3334        let tx2 = create_test_transaction_with_relayer(&tx_id_2, &relayer_2);
3335
3336        repo.create(tx1).await.unwrap();
3337        repo.create(tx2).await.unwrap();
3338
3339        // Delete only relayer-1's transaction
3340        let result = repo.delete_by_ids(vec![tx_id_1.clone()]).await.unwrap();
3341
3342        assert_eq!(result.deleted_count, 1);
3343
3344        // relayer-1's transaction should be deleted
3345        assert!(repo.get_by_id(tx_id_1).await.is_err());
3346
3347        // relayer-2's transaction should still exist
3348        let remaining = repo.get_by_id(tx_id_2).await.unwrap();
3349        assert_eq!(remaining.relayer_id, relayer_2);
3350    }
3351}