openzeppelin_relayer/repositories/transaction/
mod.rs

1//! Transaction Repository Module
2//!
3//! This module provides the transaction repository layer for the OpenZeppelin Relayer service.
4//! It implements the Repository pattern to abstract transaction data persistence operations,
5//! supporting both in-memory and Redis-backed storage implementations.
6//!
7//! ## Features
8//!
9//! - **CRUD Operations**: Create, read, update, and delete transactions
10//! - **Specialized Queries**: Find transactions by relayer ID, status, and nonce
11//! - **Pagination Support**: Efficient paginated listing of transactions
12//! - **Status Management**: Update transaction status and timestamps
13//! - **Partial Updates**: Support for partial transaction updates
14//! - **Network Data**: Manage transaction network-specific data
15//!
16//! ## Repository Implementations
17//!
18//! - [`InMemoryTransactionRepository`]: Fast in-memory storage for testing/development
19//! - [`RedisTransactionRepository`]: Redis-backed storage for production environments
20//!
21mod transaction_in_memory;
22mod transaction_redis;
23
24pub use transaction_in_memory::*;
25pub use transaction_redis::*;
26
27use crate::{
28    models::{
29        NetworkTransactionData, TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
30    },
31    repositories::{BatchDeleteResult, TransactionDeleteRequest, *},
32    utils::RedisConnections,
33};
34use async_trait::async_trait;
35use eyre::Result;
36use std::sync::Arc;
37
38/// A trait defining transaction repository operations
39#[async_trait]
40pub trait TransactionRepository: Repository<TransactionRepoModel, String> {
41    /// Returns underlying storage Redis connections when available.
42    ///
43    /// In-memory implementations return `None`.
44    fn connection_info(&self) -> Option<(Arc<RedisConnections>, String)> {
45        None
46    }
47
48    /// Find transactions by relayer ID with pagination
49    async fn find_by_relayer_id(
50        &self,
51        relayer_id: &str,
52        query: PaginationQuery,
53    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
54
55    /// Find transactions by relayer ID and status(es).
56    ///
57    /// Results are sorted by created_at descending (newest first).
58    async fn find_by_status(
59        &self,
60        relayer_id: &str,
61        statuses: &[TransactionStatus],
62    ) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
63
64    /// Find transactions by relayer ID and status(es) with pagination.
65    ///
66    /// Results are sorted by timestamp:
67    /// - For Confirmed transactions: sorted by confirmed_at (on-chain confirmation order)
68    /// - For all other statuses: sorted by created_at (queue/processing order)
69    ///
70    /// The `oldest_first` parameter controls sort direction:
71    /// - `false` (default): newest first (descending) - for displaying recent transactions
72    /// - `true`: oldest first (ascending) - for FIFO queue processing
73    ///
74    /// For multi-status queries, transactions are merged and sorted using the same rules,
75    /// ensuring consistent ordering across different statuses.
76    async fn find_by_status_paginated(
77        &self,
78        relayer_id: &str,
79        statuses: &[TransactionStatus],
80        query: PaginationQuery,
81        oldest_first: bool,
82    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
83
84    /// Find a transaction by relayer ID and nonce
85    async fn find_by_nonce(
86        &self,
87        relayer_id: &str,
88        nonce: u64,
89    ) -> Result<Option<TransactionRepoModel>, RepositoryError>;
90
91    /// Update the status of a transaction
92    async fn update_status(
93        &self,
94        tx_id: String,
95        status: TransactionStatus,
96    ) -> Result<TransactionRepoModel, RepositoryError>;
97
98    /// Partially update a transaction
99    async fn partial_update(
100        &self,
101        tx_id: String,
102        update: TransactionUpdateRequest,
103    ) -> Result<TransactionRepoModel, RepositoryError>;
104
105    /// Update the network data of a transaction
106    async fn update_network_data(
107        &self,
108        tx_id: String,
109        network_data: NetworkTransactionData,
110    ) -> Result<TransactionRepoModel, RepositoryError>;
111
112    /// Set the sent_at timestamp of a transaction
113    async fn set_sent_at(
114        &self,
115        tx_id: String,
116        sent_at: String,
117    ) -> Result<TransactionRepoModel, RepositoryError>;
118
119    /// Set the confirmed_at timestamp of a transaction
120    async fn set_confirmed_at(
121        &self,
122        tx_id: String,
123        confirmed_at: String,
124    ) -> Result<TransactionRepoModel, RepositoryError>;
125
126    /// Count transactions by status(es) without fetching full transaction data.
127    /// This is an optimized O(1) operation in Redis using ZCARD.
128    async fn count_by_status(
129        &self,
130        relayer_id: &str,
131        statuses: &[TransactionStatus],
132    ) -> Result<u64, RepositoryError>;
133
134    /// Delete multiple transactions by their IDs in a single batch operation.
135    ///
136    /// This is more efficient than calling `delete_by_id` multiple times as it
137    /// reduces the number of round-trips to the storage backend.
138    ///
139    /// Note: This method requires fetching transaction data first to clean up indexes.
140    /// If you already have transaction data, use `delete_by_requests` instead for
141    /// better performance.
142    ///
143    /// # Arguments
144    /// * `ids` - List of transaction IDs to delete
145    ///
146    /// # Returns
147    /// * `BatchDeleteResult` containing the count of successful deletions and any failures
148    async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError>;
149
150    /// Delete multiple transactions using pre-extracted data.
151    ///
152    /// This is the most efficient batch delete method as it doesn't require
153    /// re-fetching transaction data. Use this when you already have the transaction
154    /// data (e.g., from a previous query).
155    ///
156    /// # Arguments
157    /// * `requests` - List of delete requests containing transaction data needed for cleanup
158    ///
159    /// # Returns
160    /// * `BatchDeleteResult` containing the count of successful deletions and any failures
161    async fn delete_by_requests(
162        &self,
163        requests: Vec<TransactionDeleteRequest>,
164    ) -> Result<BatchDeleteResult, RepositoryError>;
165}
166
167#[cfg(test)]
168mockall::mock! {
169  pub TransactionRepository {}
170
171  #[async_trait]
172  impl Repository<TransactionRepoModel, String> for TransactionRepository {
173      async fn create(&self, entity: TransactionRepoModel) -> Result<TransactionRepoModel, RepositoryError>;
174      async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError>;
175      async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
176      async fn list_paginated(&self, query: PaginationQuery) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
177      async fn update(&self, id: String, entity: TransactionRepoModel) -> Result<TransactionRepoModel, RepositoryError>;
178      async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;
179      async fn count(&self) -> Result<usize, RepositoryError>;
180      async fn has_entries(&self) -> Result<bool, RepositoryError>;
181      async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
182  }
183
184  #[async_trait]
185  impl TransactionRepository for TransactionRepository {
186      fn connection_info(&self) -> Option<(Arc<RedisConnections>, String)>;
187      async fn find_by_relayer_id(&self, relayer_id: &str, query: PaginationQuery) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
188      async fn find_by_status(&self, relayer_id: &str, statuses: &[TransactionStatus]) -> Result<Vec<TransactionRepoModel>, RepositoryError>;
189      async fn find_by_status_paginated(&self, relayer_id: &str, statuses: &[TransactionStatus], query: PaginationQuery, oldest_first: bool) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError>;
190      async fn find_by_nonce(&self, relayer_id: &str, nonce: u64) -> Result<Option<TransactionRepoModel>, RepositoryError>;
191      async fn update_status(&self, tx_id: String, status: TransactionStatus) -> Result<TransactionRepoModel, RepositoryError>;
192      async fn partial_update(&self, tx_id: String, update: TransactionUpdateRequest) -> Result<TransactionRepoModel, RepositoryError>;
193      async fn update_network_data(&self, tx_id: String, network_data: NetworkTransactionData) -> Result<TransactionRepoModel, RepositoryError>;
194      async fn set_sent_at(&self, tx_id: String, sent_at: String) -> Result<TransactionRepoModel, RepositoryError>;
195      async fn set_confirmed_at(&self, tx_id: String, confirmed_at: String) -> Result<TransactionRepoModel, RepositoryError>;
196      async fn count_by_status(&self, relayer_id: &str, statuses: &[TransactionStatus]) -> Result<u64, RepositoryError>;
197      async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError>;
198      async fn delete_by_requests(&self, requests: Vec<TransactionDeleteRequest>) -> Result<BatchDeleteResult, RepositoryError>;
199  }
200}
201
202/// Enum wrapper for different transaction repository implementations
203#[derive(Debug, Clone)]
204pub enum TransactionRepositoryStorage {
205    InMemory(InMemoryTransactionRepository),
206    Redis(RedisTransactionRepository),
207}
208
209impl TransactionRepositoryStorage {
210    pub fn new_in_memory() -> Self {
211        Self::InMemory(InMemoryTransactionRepository::new())
212    }
213    pub fn new_redis(
214        connections: Arc<RedisConnections>,
215        key_prefix: String,
216    ) -> Result<Self, RepositoryError> {
217        Ok(Self::Redis(RedisTransactionRepository::new(
218            connections,
219            key_prefix,
220        )?))
221    }
222
223    /// Returns underlying Redis connections if this is a persistent storage backend.
224    ///
225    /// This is useful for operations that need direct storage access, such as
226    /// distributed locking and health checks.
227    ///
228    /// # Returns
229    /// * `Some((connections, key_prefix))` - If using persistent Redis storage
230    /// * `None` - If using in-memory storage
231    pub fn connection_info(&self) -> Option<(Arc<RedisConnections>, &str)> {
232        match self {
233            TransactionRepositoryStorage::InMemory(_) => None,
234            TransactionRepositoryStorage::Redis(repo) => {
235                Some((repo.connections.clone(), &repo.key_prefix))
236            }
237        }
238    }
239
240    /// Returns key prefix used by persistent storage backends.
241    pub fn key_prefix(&self) -> Option<&str> {
242        match self {
243            TransactionRepositoryStorage::InMemory(_) => None,
244            TransactionRepositoryStorage::Redis(repo) => Some(&repo.key_prefix),
245        }
246    }
247}
248
249#[async_trait]
250impl TransactionRepository for TransactionRepositoryStorage {
251    fn connection_info(&self) -> Option<(Arc<RedisConnections>, String)> {
252        TransactionRepositoryStorage::connection_info(self)
253            .map(|(connections, key_prefix)| (connections, key_prefix.to_string()))
254    }
255
256    async fn find_by_relayer_id(
257        &self,
258        relayer_id: &str,
259        query: PaginationQuery,
260    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
261        match self {
262            TransactionRepositoryStorage::InMemory(repo) => {
263                repo.find_by_relayer_id(relayer_id, query).await
264            }
265            TransactionRepositoryStorage::Redis(repo) => {
266                repo.find_by_relayer_id(relayer_id, query).await
267            }
268        }
269    }
270
271    async fn find_by_status(
272        &self,
273        relayer_id: &str,
274        statuses: &[TransactionStatus],
275    ) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
276        match self {
277            TransactionRepositoryStorage::InMemory(repo) => {
278                repo.find_by_status(relayer_id, statuses).await
279            }
280            TransactionRepositoryStorage::Redis(repo) => {
281                repo.find_by_status(relayer_id, statuses).await
282            }
283        }
284    }
285
286    async fn find_by_status_paginated(
287        &self,
288        relayer_id: &str,
289        statuses: &[TransactionStatus],
290        query: PaginationQuery,
291        oldest_first: bool,
292    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
293        match self {
294            TransactionRepositoryStorage::InMemory(repo) => {
295                repo.find_by_status_paginated(relayer_id, statuses, query, oldest_first)
296                    .await
297            }
298            TransactionRepositoryStorage::Redis(repo) => {
299                repo.find_by_status_paginated(relayer_id, statuses, query, oldest_first)
300                    .await
301            }
302        }
303    }
304
305    async fn find_by_nonce(
306        &self,
307        relayer_id: &str,
308        nonce: u64,
309    ) -> Result<Option<TransactionRepoModel>, RepositoryError> {
310        match self {
311            TransactionRepositoryStorage::InMemory(repo) => {
312                repo.find_by_nonce(relayer_id, nonce).await
313            }
314            TransactionRepositoryStorage::Redis(repo) => {
315                repo.find_by_nonce(relayer_id, nonce).await
316            }
317        }
318    }
319
320    async fn update_status(
321        &self,
322        tx_id: String,
323        status: TransactionStatus,
324    ) -> Result<TransactionRepoModel, RepositoryError> {
325        match self {
326            TransactionRepositoryStorage::InMemory(repo) => repo.update_status(tx_id, status).await,
327            TransactionRepositoryStorage::Redis(repo) => repo.update_status(tx_id, status).await,
328        }
329    }
330
331    async fn partial_update(
332        &self,
333        tx_id: String,
334        update: TransactionUpdateRequest,
335    ) -> Result<TransactionRepoModel, RepositoryError> {
336        match self {
337            TransactionRepositoryStorage::InMemory(repo) => {
338                repo.partial_update(tx_id, update).await
339            }
340            TransactionRepositoryStorage::Redis(repo) => repo.partial_update(tx_id, update).await,
341        }
342    }
343
344    async fn update_network_data(
345        &self,
346        tx_id: String,
347        network_data: NetworkTransactionData,
348    ) -> Result<TransactionRepoModel, RepositoryError> {
349        match self {
350            TransactionRepositoryStorage::InMemory(repo) => {
351                repo.update_network_data(tx_id, network_data).await
352            }
353            TransactionRepositoryStorage::Redis(repo) => {
354                repo.update_network_data(tx_id, network_data).await
355            }
356        }
357    }
358
359    async fn set_sent_at(
360        &self,
361        tx_id: String,
362        sent_at: String,
363    ) -> Result<TransactionRepoModel, RepositoryError> {
364        match self {
365            TransactionRepositoryStorage::InMemory(repo) => repo.set_sent_at(tx_id, sent_at).await,
366            TransactionRepositoryStorage::Redis(repo) => repo.set_sent_at(tx_id, sent_at).await,
367        }
368    }
369
370    async fn set_confirmed_at(
371        &self,
372        tx_id: String,
373        confirmed_at: String,
374    ) -> Result<TransactionRepoModel, RepositoryError> {
375        match self {
376            TransactionRepositoryStorage::InMemory(repo) => {
377                repo.set_confirmed_at(tx_id, confirmed_at).await
378            }
379            TransactionRepositoryStorage::Redis(repo) => {
380                repo.set_confirmed_at(tx_id, confirmed_at).await
381            }
382        }
383    }
384
385    async fn count_by_status(
386        &self,
387        relayer_id: &str,
388        statuses: &[TransactionStatus],
389    ) -> Result<u64, RepositoryError> {
390        match self {
391            TransactionRepositoryStorage::InMemory(repo) => {
392                repo.count_by_status(relayer_id, statuses).await
393            }
394            TransactionRepositoryStorage::Redis(repo) => {
395                repo.count_by_status(relayer_id, statuses).await
396            }
397        }
398    }
399
400    async fn delete_by_ids(&self, ids: Vec<String>) -> Result<BatchDeleteResult, RepositoryError> {
401        match self {
402            TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_ids(ids).await,
403            TransactionRepositoryStorage::Redis(repo) => repo.delete_by_ids(ids).await,
404        }
405    }
406
407    async fn delete_by_requests(
408        &self,
409        requests: Vec<TransactionDeleteRequest>,
410    ) -> Result<BatchDeleteResult, RepositoryError> {
411        match self {
412            TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_requests(requests).await,
413            TransactionRepositoryStorage::Redis(repo) => repo.delete_by_requests(requests).await,
414        }
415    }
416}
417
418#[async_trait]
419impl Repository<TransactionRepoModel, String> for TransactionRepositoryStorage {
420    async fn create(
421        &self,
422        entity: TransactionRepoModel,
423    ) -> Result<TransactionRepoModel, RepositoryError> {
424        match self {
425            TransactionRepositoryStorage::InMemory(repo) => repo.create(entity).await,
426            TransactionRepositoryStorage::Redis(repo) => repo.create(entity).await,
427        }
428    }
429
430    async fn get_by_id(&self, id: String) -> Result<TransactionRepoModel, RepositoryError> {
431        match self {
432            TransactionRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
433            TransactionRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
434        }
435    }
436
437    async fn list_all(&self) -> Result<Vec<TransactionRepoModel>, RepositoryError> {
438        match self {
439            TransactionRepositoryStorage::InMemory(repo) => repo.list_all().await,
440            TransactionRepositoryStorage::Redis(repo) => repo.list_all().await,
441        }
442    }
443
444    async fn list_paginated(
445        &self,
446        query: PaginationQuery,
447    ) -> Result<PaginatedResult<TransactionRepoModel>, RepositoryError> {
448        match self {
449            TransactionRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
450            TransactionRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
451        }
452    }
453
454    async fn update(
455        &self,
456        id: String,
457        entity: TransactionRepoModel,
458    ) -> Result<TransactionRepoModel, RepositoryError> {
459        match self {
460            TransactionRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
461            TransactionRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
462        }
463    }
464
465    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
466        match self {
467            TransactionRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
468            TransactionRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
469        }
470    }
471
472    async fn count(&self) -> Result<usize, RepositoryError> {
473        match self {
474            TransactionRepositoryStorage::InMemory(repo) => repo.count().await,
475            TransactionRepositoryStorage::Redis(repo) => repo.count().await,
476        }
477    }
478
479    async fn has_entries(&self) -> Result<bool, RepositoryError> {
480        match self {
481            TransactionRepositoryStorage::InMemory(repo) => repo.has_entries().await,
482            TransactionRepositoryStorage::Redis(repo) => repo.has_entries().await,
483        }
484    }
485
486    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
487        match self {
488            TransactionRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
489            TransactionRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
490        }
491    }
492}
493
494#[cfg(test)]
495mod tests {
496    use chrono::Utc;
497    use color_eyre::Result;
498    use deadpool_redis::{Config, Runtime};
499
500    use super::*;
501    use crate::models::{
502        EvmTransactionData, NetworkTransactionData, TransactionStatus, TransactionUpdateRequest,
503    };
504    use crate::repositories::PaginationQuery;
505    use crate::utils::mocks::mockutils::create_mock_transaction;
506
507    fn create_test_transaction(id: &str, relayer_id: &str) -> TransactionRepoModel {
508        let mut transaction = create_mock_transaction();
509        transaction.id = id.to_string();
510        transaction.relayer_id = relayer_id.to_string();
511        transaction
512    }
513
514    fn create_test_transaction_with_status(
515        id: &str,
516        relayer_id: &str,
517        status: TransactionStatus,
518    ) -> TransactionRepoModel {
519        let mut transaction = create_test_transaction(id, relayer_id);
520        transaction.status = status;
521        transaction
522    }
523
524    fn create_test_transaction_with_nonce(
525        id: &str,
526        relayer_id: &str,
527        nonce: u64,
528    ) -> TransactionRepoModel {
529        let mut transaction = create_test_transaction(id, relayer_id);
530        if let NetworkTransactionData::Evm(ref mut evm_data) = transaction.network_data {
531            evm_data.nonce = Some(nonce);
532        }
533        transaction
534    }
535
536    fn create_test_update_request() -> TransactionUpdateRequest {
537        TransactionUpdateRequest {
538            status: Some(TransactionStatus::Sent),
539            status_reason: Some("Test reason".to_string()),
540            sent_at: Some(Utc::now().to_string()),
541            confirmed_at: None,
542            network_data: None,
543            priced_at: None,
544            hashes: Some(vec!["test_hash".to_string()]),
545            noop_count: None,
546            is_canceled: None,
547            delete_at: None,
548            metadata: None,
549        }
550    }
551
552    #[tokio::test]
553    async fn test_new_in_memory() {
554        let storage = TransactionRepositoryStorage::new_in_memory();
555
556        match storage {
557            TransactionRepositoryStorage::InMemory(_) => {
558                // Success - verify it's the InMemory variant
559            }
560            TransactionRepositoryStorage::Redis(_) => {
561                panic!("Expected InMemory variant, got Redis");
562            }
563        }
564    }
565
566    #[tokio::test]
567    async fn test_connection_info_returns_none_for_in_memory() {
568        let storage = TransactionRepositoryStorage::new_in_memory();
569
570        // In-memory storage should return None for connection_info
571        assert!(storage.connection_info().is_none());
572    }
573
574    #[tokio::test]
575    #[ignore = "Requires active Redis instance"]
576    async fn test_connection_info_returns_some_for_redis() -> Result<()> {
577        let redis_url = std::env::var("REDIS_TEST_URL")
578            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
579        let cfg = Config::from_url(&redis_url);
580        let pool = Arc::new(
581            cfg.builder()
582                .map_err(|e| eyre::eyre!("Failed to create Redis pool builder: {}", e))?
583                .max_size(16)
584                .runtime(Runtime::Tokio1)
585                .build()
586                .map_err(|e| eyre::eyre!("Failed to build Redis pool: {}", e))?,
587        );
588        let connections = Arc::new(RedisConnections::new_single_pool(pool.clone()));
589        let key_prefix = "test_prefix".to_string();
590
591        let storage = TransactionRepositoryStorage::new_redis(connections, key_prefix.clone())?;
592
593        let (returned_connection, returned_prefix) = storage
594            .connection_info()
595            .expect("Expected Redis connection info");
596
597        assert!(Arc::ptr_eq(&pool, returned_connection.primary()));
598        assert_eq!(returned_prefix, key_prefix);
599
600        Ok(())
601    }
602
603    #[tokio::test]
604    async fn test_create_in_memory() -> Result<()> {
605        let storage = TransactionRepositoryStorage::new_in_memory();
606        let transaction = create_test_transaction("test-tx", "test-relayer");
607
608        let created = storage.create(transaction.clone()).await?;
609        assert_eq!(created.id, transaction.id);
610        assert_eq!(created.relayer_id, transaction.relayer_id);
611        assert_eq!(created.status, transaction.status);
612
613        Ok(())
614    }
615
616    #[tokio::test]
617    async fn test_get_by_id_in_memory() -> Result<()> {
618        let storage = TransactionRepositoryStorage::new_in_memory();
619        let transaction = create_test_transaction("test-tx", "test-relayer");
620
621        // Create transaction first
622        storage.create(transaction.clone()).await?;
623
624        // Get by ID
625        let retrieved = storage.get_by_id("test-tx".to_string()).await?;
626        assert_eq!(retrieved.id, transaction.id);
627        assert_eq!(retrieved.relayer_id, transaction.relayer_id);
628        assert_eq!(retrieved.status, transaction.status);
629
630        Ok(())
631    }
632
633    #[tokio::test]
634    async fn test_get_by_id_not_found_in_memory() -> Result<()> {
635        let storage = TransactionRepositoryStorage::new_in_memory();
636
637        let result = storage.get_by_id("non-existent".to_string()).await;
638        assert!(result.is_err());
639
640        Ok(())
641    }
642
643    #[tokio::test]
644    async fn test_list_all_in_memory() -> Result<()> {
645        let storage = TransactionRepositoryStorage::new_in_memory();
646
647        // Initially empty
648        let transactions = storage.list_all().await?;
649        assert!(transactions.is_empty());
650
651        // Add transactions
652        let tx1 = create_test_transaction("tx-1", "relayer-1");
653        let tx2 = create_test_transaction("tx-2", "relayer-2");
654
655        storage.create(tx1.clone()).await?;
656        storage.create(tx2.clone()).await?;
657
658        let all_transactions = storage.list_all().await?;
659        assert_eq!(all_transactions.len(), 2);
660
661        let ids: Vec<&str> = all_transactions.iter().map(|t| t.id.as_str()).collect();
662        assert!(ids.contains(&"tx-1"));
663        assert!(ids.contains(&"tx-2"));
664
665        Ok(())
666    }
667
668    #[tokio::test]
669    async fn test_list_paginated_in_memory() -> Result<()> {
670        let storage = TransactionRepositoryStorage::new_in_memory();
671
672        // Add test transactions
673        for i in 1..=5 {
674            let tx = create_test_transaction(&format!("tx-{i}"), "test-relayer");
675            storage.create(tx).await?;
676        }
677
678        // Test pagination
679        let query = PaginationQuery {
680            page: 1,
681            per_page: 2,
682        };
683        let page = storage.list_paginated(query).await?;
684
685        assert_eq!(page.items.len(), 2);
686        assert_eq!(page.total, 5);
687        assert_eq!(page.page, 1);
688        assert_eq!(page.per_page, 2);
689
690        // Test second page
691        let query2 = PaginationQuery {
692            page: 2,
693            per_page: 2,
694        };
695        let page2 = storage.list_paginated(query2).await?;
696
697        assert_eq!(page2.items.len(), 2);
698        assert_eq!(page2.total, 5);
699        assert_eq!(page2.page, 2);
700        assert_eq!(page2.per_page, 2);
701
702        Ok(())
703    }
704
705    #[tokio::test]
706    async fn test_update_in_memory() -> Result<()> {
707        let storage = TransactionRepositoryStorage::new_in_memory();
708        let transaction = create_test_transaction("test-tx", "test-relayer");
709
710        // Create transaction first
711        storage.create(transaction.clone()).await?;
712
713        // Update it
714        let mut updated_transaction = transaction.clone();
715        updated_transaction.status = TransactionStatus::Sent;
716        updated_transaction.status_reason = Some("Updated reason".to_string());
717
718        let result = storage
719            .update("test-tx".to_string(), updated_transaction.clone())
720            .await?;
721        assert_eq!(result.id, "test-tx");
722        assert_eq!(result.status, TransactionStatus::Sent);
723        assert_eq!(result.status_reason, Some("Updated reason".to_string()));
724
725        // Verify the update persisted
726        let retrieved = storage.get_by_id("test-tx".to_string()).await?;
727        assert_eq!(retrieved.status, TransactionStatus::Sent);
728        assert_eq!(retrieved.status_reason, Some("Updated reason".to_string()));
729
730        Ok(())
731    }
732
733    #[tokio::test]
734    async fn test_update_not_found_in_memory() -> Result<()> {
735        let storage = TransactionRepositoryStorage::new_in_memory();
736        let transaction = create_test_transaction("non-existent", "test-relayer");
737
738        let result = storage
739            .update("non-existent".to_string(), transaction)
740            .await;
741        assert!(result.is_err());
742
743        Ok(())
744    }
745
746    #[tokio::test]
747    async fn test_delete_by_id_in_memory() -> Result<()> {
748        let storage = TransactionRepositoryStorage::new_in_memory();
749        let transaction = create_test_transaction("test-tx", "test-relayer");
750
751        // Create transaction first
752        storage.create(transaction.clone()).await?;
753
754        // Verify it exists
755        let retrieved = storage.get_by_id("test-tx".to_string()).await?;
756        assert_eq!(retrieved.id, "test-tx");
757
758        // Delete it
759        storage.delete_by_id("test-tx".to_string()).await?;
760
761        // Verify it's gone
762        let result = storage.get_by_id("test-tx".to_string()).await;
763        assert!(result.is_err());
764
765        Ok(())
766    }
767
768    #[tokio::test]
769    async fn test_delete_by_id_not_found_in_memory() -> Result<()> {
770        let storage = TransactionRepositoryStorage::new_in_memory();
771
772        let result = storage.delete_by_id("non-existent".to_string()).await;
773        assert!(result.is_err());
774
775        Ok(())
776    }
777
778    #[tokio::test]
779    async fn test_count_in_memory() -> Result<()> {
780        let storage = TransactionRepositoryStorage::new_in_memory();
781
782        // Initially empty
783        let count = storage.count().await?;
784        assert_eq!(count, 0);
785
786        // Add transactions
787        let tx1 = create_test_transaction("tx-1", "relayer-1");
788        let tx2 = create_test_transaction("tx-2", "relayer-2");
789
790        storage.create(tx1).await?;
791        let count_after_one = storage.count().await?;
792        assert_eq!(count_after_one, 1);
793
794        storage.create(tx2).await?;
795        let count_after_two = storage.count().await?;
796        assert_eq!(count_after_two, 2);
797
798        // Delete one
799        storage.delete_by_id("tx-1".to_string()).await?;
800        let count_after_delete = storage.count().await?;
801        assert_eq!(count_after_delete, 1);
802
803        Ok(())
804    }
805
806    #[tokio::test]
807    async fn test_has_entries_in_memory() -> Result<()> {
808        let storage = TransactionRepositoryStorage::new_in_memory();
809
810        // Initially empty
811        let has_entries = storage.has_entries().await?;
812        assert!(!has_entries);
813
814        // Add transaction
815        let transaction = create_test_transaction("test-tx", "test-relayer");
816        storage.create(transaction).await?;
817
818        let has_entries_after_create = storage.has_entries().await?;
819        assert!(has_entries_after_create);
820
821        // Delete transaction
822        storage.delete_by_id("test-tx".to_string()).await?;
823
824        let has_entries_after_delete = storage.has_entries().await?;
825        assert!(!has_entries_after_delete);
826
827        Ok(())
828    }
829
830    #[tokio::test]
831    async fn test_drop_all_entries_in_memory() -> Result<()> {
832        let storage = TransactionRepositoryStorage::new_in_memory();
833
834        // Add multiple transactions
835        for i in 1..=5 {
836            let tx = create_test_transaction(&format!("tx-{i}"), "test-relayer");
837            storage.create(tx).await?;
838        }
839
840        // Verify they exist
841        let count_before = storage.count().await?;
842        assert_eq!(count_before, 5);
843
844        let has_entries_before = storage.has_entries().await?;
845        assert!(has_entries_before);
846
847        // Drop all entries
848        storage.drop_all_entries().await?;
849
850        // Verify they're gone
851        let count_after = storage.count().await?;
852        assert_eq!(count_after, 0);
853
854        let has_entries_after = storage.has_entries().await?;
855        assert!(!has_entries_after);
856
857        let all_transactions = storage.list_all().await?;
858        assert!(all_transactions.is_empty());
859
860        Ok(())
861    }
862
863    #[tokio::test]
864    async fn test_find_by_relayer_id_in_memory() -> Result<()> {
865        let storage = TransactionRepositoryStorage::new_in_memory();
866
867        // Add transactions for different relayers
868        let tx1 = create_test_transaction("tx-1", "relayer-1");
869        let tx2 = create_test_transaction("tx-2", "relayer-1");
870        let tx3 = create_test_transaction("tx-3", "relayer-2");
871
872        storage.create(tx1).await?;
873        storage.create(tx2).await?;
874        storage.create(tx3).await?;
875
876        // Find by relayer ID
877        let query = PaginationQuery {
878            page: 1,
879            per_page: 10,
880        };
881        let result = storage.find_by_relayer_id("relayer-1", query).await?;
882
883        assert_eq!(result.items.len(), 2);
884        assert_eq!(result.total, 2);
885
886        // Verify all transactions belong to relayer-1
887        for tx in result.items {
888            assert_eq!(tx.relayer_id, "relayer-1");
889        }
890
891        Ok(())
892    }
893
894    #[tokio::test]
895    async fn test_find_by_status_in_memory() -> Result<()> {
896        let storage = TransactionRepositoryStorage::new_in_memory();
897
898        // Add transactions with different statuses
899        let tx1 =
900            create_test_transaction_with_status("tx-1", "relayer-1", TransactionStatus::Pending);
901        let tx2 = create_test_transaction_with_status("tx-2", "relayer-1", TransactionStatus::Sent);
902        let tx3 =
903            create_test_transaction_with_status("tx-3", "relayer-1", TransactionStatus::Pending);
904        let tx4 =
905            create_test_transaction_with_status("tx-4", "relayer-2", TransactionStatus::Pending);
906
907        storage.create(tx1).await?;
908        storage.create(tx2).await?;
909        storage.create(tx3).await?;
910        storage.create(tx4).await?;
911
912        // Find by status
913        let statuses = vec![TransactionStatus::Pending];
914        let result = storage.find_by_status("relayer-1", &statuses).await?;
915
916        assert_eq!(result.len(), 2);
917
918        // Verify all transactions have Pending status and belong to relayer-1
919        for tx in result {
920            assert_eq!(tx.status, TransactionStatus::Pending);
921            assert_eq!(tx.relayer_id, "relayer-1");
922        }
923
924        Ok(())
925    }
926
927    #[tokio::test]
928    async fn test_find_by_nonce_in_memory() -> Result<()> {
929        let storage = TransactionRepositoryStorage::new_in_memory();
930
931        // Add transactions with different nonces
932        let tx1 = create_test_transaction_with_nonce("tx-1", "relayer-1", 10);
933        let tx2 = create_test_transaction_with_nonce("tx-2", "relayer-1", 20);
934        let tx3 = create_test_transaction_with_nonce("tx-3", "relayer-2", 10);
935
936        storage.create(tx1).await?;
937        storage.create(tx2).await?;
938        storage.create(tx3).await?;
939
940        // Find by nonce
941        let result = storage.find_by_nonce("relayer-1", 10).await?;
942
943        assert!(result.is_some());
944        let found_tx = result.unwrap();
945        assert_eq!(found_tx.id, "tx-1");
946        assert_eq!(found_tx.relayer_id, "relayer-1");
947
948        // Check EVM nonce
949        if let NetworkTransactionData::Evm(evm_data) = found_tx.network_data {
950            assert_eq!(evm_data.nonce, Some(10));
951        }
952
953        // Test not found
954        let not_found = storage.find_by_nonce("relayer-1", 99).await?;
955        assert!(not_found.is_none());
956
957        Ok(())
958    }
959
960    #[tokio::test]
961    async fn test_update_status_in_memory() -> Result<()> {
962        let storage = TransactionRepositoryStorage::new_in_memory();
963        let transaction = create_test_transaction("test-tx", "test-relayer");
964
965        // Create transaction first
966        storage.create(transaction).await?;
967
968        // Update status
969        let updated = storage
970            .update_status("test-tx".to_string(), TransactionStatus::Sent)
971            .await?;
972
973        assert_eq!(updated.id, "test-tx");
974        assert_eq!(updated.status, TransactionStatus::Sent);
975
976        // Verify the update persisted
977        let retrieved = storage.get_by_id("test-tx".to_string()).await?;
978        assert_eq!(retrieved.status, TransactionStatus::Sent);
979
980        Ok(())
981    }
982
983    #[tokio::test]
984    async fn test_partial_update_in_memory() -> Result<()> {
985        let storage = TransactionRepositoryStorage::new_in_memory();
986        let transaction = create_test_transaction("test-tx", "test-relayer");
987
988        // Create transaction first
989        storage.create(transaction).await?;
990
991        // Partial update
992        let update_request = create_test_update_request();
993        let updated = storage
994            .partial_update("test-tx".to_string(), update_request)
995            .await?;
996
997        assert_eq!(updated.id, "test-tx");
998        assert_eq!(updated.status, TransactionStatus::Sent);
999        assert_eq!(updated.status_reason, Some("Test reason".to_string()));
1000        assert!(updated.sent_at.is_some());
1001        assert_eq!(updated.hashes, vec!["test_hash".to_string()]);
1002
1003        Ok(())
1004    }
1005
1006    #[tokio::test]
1007    async fn test_update_network_data_in_memory() -> Result<()> {
1008        let storage = TransactionRepositoryStorage::new_in_memory();
1009        let transaction = create_test_transaction("test-tx", "test-relayer");
1010
1011        // Create transaction first
1012        storage.create(transaction).await?;
1013
1014        // Update network data
1015        let new_evm_data = EvmTransactionData {
1016            nonce: Some(42),
1017            gas_limit: Some(21000),
1018            ..Default::default()
1019        };
1020        let new_network_data = NetworkTransactionData::Evm(new_evm_data);
1021
1022        let updated = storage
1023            .update_network_data("test-tx".to_string(), new_network_data)
1024            .await?;
1025
1026        assert_eq!(updated.id, "test-tx");
1027        if let NetworkTransactionData::Evm(evm_data) = updated.network_data {
1028            assert_eq!(evm_data.nonce, Some(42));
1029            assert_eq!(evm_data.gas_limit, Some(21000));
1030        } else {
1031            panic!("Expected EVM network data");
1032        }
1033
1034        Ok(())
1035    }
1036
1037    #[tokio::test]
1038    async fn test_set_sent_at_in_memory() -> Result<()> {
1039        let storage = TransactionRepositoryStorage::new_in_memory();
1040        let transaction = create_test_transaction("test-tx", "test-relayer");
1041
1042        // Create transaction first
1043        storage.create(transaction).await?;
1044
1045        // Set sent_at
1046        let sent_at = Utc::now().to_string();
1047        let updated = storage
1048            .set_sent_at("test-tx".to_string(), sent_at.clone())
1049            .await?;
1050
1051        assert_eq!(updated.id, "test-tx");
1052        assert_eq!(updated.sent_at, Some(sent_at));
1053
1054        Ok(())
1055    }
1056
1057    #[tokio::test]
1058    async fn test_set_confirmed_at_in_memory() -> Result<()> {
1059        let storage = TransactionRepositoryStorage::new_in_memory();
1060        let transaction = create_test_transaction("test-tx", "test-relayer");
1061
1062        // Create transaction first
1063        storage.create(transaction).await?;
1064
1065        // Set confirmed_at
1066        let confirmed_at = Utc::now().to_string();
1067        let updated = storage
1068            .set_confirmed_at("test-tx".to_string(), confirmed_at.clone())
1069            .await?;
1070
1071        assert_eq!(updated.id, "test-tx");
1072        assert_eq!(updated.confirmed_at, Some(confirmed_at));
1073
1074        Ok(())
1075    }
1076
1077    #[tokio::test]
1078    async fn test_create_duplicate_id_in_memory() -> Result<()> {
1079        let storage = TransactionRepositoryStorage::new_in_memory();
1080        let transaction = create_test_transaction("duplicate-id", "test-relayer");
1081
1082        // Create first transaction
1083        storage.create(transaction.clone()).await?;
1084
1085        // Try to create another with same ID - should fail
1086        let result = storage.create(transaction.clone()).await;
1087        assert!(result.is_err());
1088
1089        Ok(())
1090    }
1091
1092    #[tokio::test]
1093    async fn test_workflow_in_memory() -> Result<()> {
1094        let storage = TransactionRepositoryStorage::new_in_memory();
1095
1096        // 1. Start with empty storage
1097        assert!(!storage.has_entries().await?);
1098        assert_eq!(storage.count().await?, 0);
1099
1100        // 2. Create transaction
1101        let transaction = create_test_transaction("workflow-test", "test-relayer");
1102        let created = storage.create(transaction.clone()).await?;
1103        assert_eq!(created.id, "workflow-test");
1104
1105        // 3. Verify it exists
1106        assert!(storage.has_entries().await?);
1107        assert_eq!(storage.count().await?, 1);
1108
1109        // 4. Retrieve it
1110        let retrieved = storage.get_by_id("workflow-test".to_string()).await?;
1111        assert_eq!(retrieved.id, "workflow-test");
1112
1113        // 5. Update status
1114        let updated = storage
1115            .update_status("workflow-test".to_string(), TransactionStatus::Sent)
1116            .await?;
1117        assert_eq!(updated.status, TransactionStatus::Sent);
1118
1119        // 6. Verify update
1120        let retrieved_updated = storage.get_by_id("workflow-test".to_string()).await?;
1121        assert_eq!(retrieved_updated.status, TransactionStatus::Sent);
1122
1123        // 7. Delete it
1124        storage.delete_by_id("workflow-test".to_string()).await?;
1125
1126        // 8. Verify it's gone
1127        assert!(!storage.has_entries().await?);
1128        assert_eq!(storage.count().await?, 0);
1129
1130        let result = storage.get_by_id("workflow-test".to_string()).await;
1131        assert!(result.is_err());
1132
1133        Ok(())
1134    }
1135
1136    #[tokio::test]
1137    async fn test_multiple_relayers_workflow() -> Result<()> {
1138        let storage = TransactionRepositoryStorage::new_in_memory();
1139
1140        // Add transactions for multiple relayers
1141        let tx1 =
1142            create_test_transaction_with_status("tx-1", "relayer-1", TransactionStatus::Pending);
1143        let tx2 = create_test_transaction_with_status("tx-2", "relayer-1", TransactionStatus::Sent);
1144        let tx3 =
1145            create_test_transaction_with_status("tx-3", "relayer-2", TransactionStatus::Pending);
1146
1147        storage.create(tx1).await?;
1148        storage.create(tx2).await?;
1149        storage.create(tx3).await?;
1150
1151        // Test find_by_relayer_id
1152        let query = PaginationQuery {
1153            page: 1,
1154            per_page: 10,
1155        };
1156        let relayer1_txs = storage.find_by_relayer_id("relayer-1", query).await?;
1157        assert_eq!(relayer1_txs.items.len(), 2);
1158
1159        // Test find_by_status
1160        let pending_txs = storage
1161            .find_by_status("relayer-1", &[TransactionStatus::Pending])
1162            .await?;
1163        assert_eq!(pending_txs.len(), 1);
1164        assert_eq!(pending_txs[0].id, "tx-1");
1165
1166        // Test count remains accurate
1167        assert_eq!(storage.count().await?, 3);
1168
1169        Ok(())
1170    }
1171
1172    #[tokio::test]
1173    async fn test_pagination_edge_cases_in_memory() -> Result<()> {
1174        let storage = TransactionRepositoryStorage::new_in_memory();
1175
1176        // Test pagination with empty storage
1177        let query = PaginationQuery {
1178            page: 1,
1179            per_page: 10,
1180        };
1181        let page = storage.list_paginated(query).await?;
1182        assert_eq!(page.items.len(), 0);
1183        assert_eq!(page.total, 0);
1184        assert_eq!(page.page, 1);
1185        assert_eq!(page.per_page, 10);
1186
1187        // Add one transaction
1188        let transaction = create_test_transaction("single-tx", "test-relayer");
1189        storage.create(transaction).await?;
1190
1191        // Test pagination with single item
1192        let query = PaginationQuery {
1193            page: 1,
1194            per_page: 10,
1195        };
1196        let page = storage.list_paginated(query).await?;
1197        assert_eq!(page.items.len(), 1);
1198        assert_eq!(page.total, 1);
1199        assert_eq!(page.page, 1);
1200        assert_eq!(page.per_page, 10);
1201
1202        // Test pagination with page beyond total
1203        let query = PaginationQuery {
1204            page: 3,
1205            per_page: 10,
1206        };
1207        let page = storage.list_paginated(query).await?;
1208        assert_eq!(page.items.len(), 0);
1209        assert_eq!(page.total, 1);
1210        assert_eq!(page.page, 3);
1211        assert_eq!(page.per_page, 10);
1212
1213        Ok(())
1214    }
1215
1216    #[tokio::test]
1217    async fn test_find_by_relayer_id_pagination() -> Result<()> {
1218        let storage = TransactionRepositoryStorage::new_in_memory();
1219
1220        // Add many transactions for one relayer
1221        for i in 1..=10 {
1222            let tx = create_test_transaction(&format!("tx-{i}"), "test-relayer");
1223            storage.create(tx).await?;
1224        }
1225
1226        // Test first page
1227        let query = PaginationQuery {
1228            page: 1,
1229            per_page: 3,
1230        };
1231        let page1 = storage.find_by_relayer_id("test-relayer", query).await?;
1232        assert_eq!(page1.items.len(), 3);
1233        assert_eq!(page1.total, 10);
1234        assert_eq!(page1.page, 1);
1235        assert_eq!(page1.per_page, 3);
1236
1237        // Test second page
1238        let query = PaginationQuery {
1239            page: 2,
1240            per_page: 3,
1241        };
1242        let page2 = storage.find_by_relayer_id("test-relayer", query).await?;
1243        assert_eq!(page2.items.len(), 3);
1244        assert_eq!(page2.total, 10);
1245        assert_eq!(page2.page, 2);
1246        assert_eq!(page2.per_page, 3);
1247
1248        Ok(())
1249    }
1250
1251    #[tokio::test]
1252    async fn test_find_by_multiple_statuses() -> Result<()> {
1253        let storage = TransactionRepositoryStorage::new_in_memory();
1254
1255        // Add transactions with different statuses
1256        let tx1 =
1257            create_test_transaction_with_status("tx-1", "test-relayer", TransactionStatus::Pending);
1258        let tx2 =
1259            create_test_transaction_with_status("tx-2", "test-relayer", TransactionStatus::Sent);
1260        let tx3 = create_test_transaction_with_status(
1261            "tx-3",
1262            "test-relayer",
1263            TransactionStatus::Confirmed,
1264        );
1265        let tx4 =
1266            create_test_transaction_with_status("tx-4", "test-relayer", TransactionStatus::Failed);
1267
1268        storage.create(tx1).await?;
1269        storage.create(tx2).await?;
1270        storage.create(tx3).await?;
1271        storage.create(tx4).await?;
1272
1273        // Find by multiple statuses
1274        let statuses = vec![TransactionStatus::Pending, TransactionStatus::Sent];
1275        let result = storage.find_by_status("test-relayer", &statuses).await?;
1276
1277        assert_eq!(result.len(), 2);
1278
1279        // Verify all transactions have the correct statuses
1280        let found_statuses: Vec<TransactionStatus> =
1281            result.iter().map(|tx| tx.status.clone()).collect();
1282        assert!(found_statuses.contains(&TransactionStatus::Pending));
1283        assert!(found_statuses.contains(&TransactionStatus::Sent));
1284
1285        Ok(())
1286    }
1287}