openzeppelin_relayer/repositories/relayer/
mod.rs

1//! Relayer Repository Module
2//!
3//! This module provides the relayer repository layer for the OpenZeppelin Relayer service.
4//! It implements the Repository pattern to abstract relayer data persistence operations,
5//! supporting both in-memory and Redis-backed storage implementations.
6//!
7//! ## Features
8//!
9//! - **CRUD Operations**: Create, read, update, and delete relayer configurations
10//! - **Status Management**: Enable/disable relayers and track their state
11//! - **Policy Management**: Update relayer network policies
12//! - **Partial Updates**: Support for partial relayer configuration updates
13//! - **Active Filtering**: Query for active (non-paused) relayers
14//! - **Pagination Support**: Efficient paginated listing of relayers
15//!
16//! ## Repository Implementations
17//!
18//! - [`InMemoryRelayerRepository`]: Fast in-memory storage for testing/development
19//! - [`RedisRelayerRepository`]: Redis-backed storage for production environments
20//!
21
22mod relayer_in_memory;
23mod relayer_redis;
24
25pub use relayer_in_memory::*;
26pub use relayer_redis::*;
27
28use crate::{
29    models::UpdateRelayerRequest,
30    models::{
31        DisabledReason, PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, RepositoryError,
32    },
33    repositories::{PaginatedResult, Repository},
34    utils::RedisConnections,
35};
36use async_trait::async_trait;
37use std::sync::Arc;
38
39#[async_trait]
40pub trait RelayerRepository: Repository<RelayerRepoModel, String> + Send + Sync {
41    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
42    async fn list_by_signer_id(
43        &self,
44        signer_id: &str,
45    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
46    async fn list_by_notification_id(
47        &self,
48        notification_id: &str,
49    ) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
50    async fn partial_update(
51        &self,
52        id: String,
53        update: UpdateRelayerRequest,
54    ) -> Result<RelayerRepoModel, RepositoryError>;
55    async fn enable_relayer(&self, relayer_id: String)
56        -> Result<RelayerRepoModel, RepositoryError>;
57    async fn disable_relayer(
58        &self,
59        relayer_id: String,
60        reason: DisabledReason,
61    ) -> Result<RelayerRepoModel, RepositoryError>;
62    async fn update_policy(
63        &self,
64        id: String,
65        policy: RelayerNetworkPolicy,
66    ) -> Result<RelayerRepoModel, RepositoryError>;
67    /// Returns true if this repository uses persistent storage (e.g., Redis).
68    /// Returns false for in-memory storage.
69    fn is_persistent_storage(&self) -> bool;
70}
71
72#[cfg(test)]
73mockall::mock! {
74    pub RelayerRepository {}
75
76    #[async_trait]
77    impl Repository<RelayerRepoModel, String> for RelayerRepository {
78        async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError>;
79        async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError>;
80        async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
81        async fn list_paginated(&self, query: PaginationQuery) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError>;
82        async fn update(&self, id: String, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError>;
83        async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError>;
84        async fn count(&self) -> Result<usize, RepositoryError>;
85        async fn has_entries(&self) -> Result<bool, RepositoryError>;
86        async fn drop_all_entries(&self) -> Result<(), RepositoryError>;
87    }
88
89    #[async_trait]
90    impl RelayerRepository for RelayerRepository {
91        async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
92        async fn list_by_signer_id(&self, signer_id: &str) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
93        async fn list_by_notification_id(&self, notification_id: &str) -> Result<Vec<RelayerRepoModel>, RepositoryError>;
94        async fn partial_update(&self, id: String, update: UpdateRelayerRequest) -> Result<RelayerRepoModel, RepositoryError>;
95        async fn enable_relayer(&self, relayer_id: String) -> Result<RelayerRepoModel, RepositoryError>;
96        async fn disable_relayer(&self, relayer_id: String, reason: DisabledReason) -> Result<RelayerRepoModel, RepositoryError>;
97        async fn update_policy(&self, id: String, policy: RelayerNetworkPolicy) -> Result<RelayerRepoModel, RepositoryError>;
98        fn is_persistent_storage(&self) -> bool;
99    }
100}
101
102/// Enum wrapper for different relayer repository implementations
103#[derive(Debug, Clone)]
104pub enum RelayerRepositoryStorage {
105    InMemory(InMemoryRelayerRepository),
106    Redis(RedisRelayerRepository),
107}
108
109impl RelayerRepositoryStorage {
110    pub fn new_in_memory() -> Self {
111        Self::InMemory(InMemoryRelayerRepository::new())
112    }
113
114    pub fn new_redis(
115        connections: Arc<RedisConnections>,
116        key_prefix: String,
117    ) -> Result<Self, RepositoryError> {
118        Ok(Self::Redis(RedisRelayerRepository::new(
119            connections,
120            key_prefix,
121        )?))
122    }
123}
124
125impl Default for RelayerRepositoryStorage {
126    fn default() -> Self {
127        Self::new_in_memory()
128    }
129}
130
131#[async_trait]
132impl Repository<RelayerRepoModel, String> for RelayerRepositoryStorage {
133    async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError> {
134        match self {
135            RelayerRepositoryStorage::InMemory(repo) => repo.create(entity).await,
136            RelayerRepositoryStorage::Redis(repo) => repo.create(entity).await,
137        }
138    }
139
140    async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError> {
141        match self {
142            RelayerRepositoryStorage::InMemory(repo) => repo.get_by_id(id).await,
143            RelayerRepositoryStorage::Redis(repo) => repo.get_by_id(id).await,
144        }
145    }
146
147    async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
148        match self {
149            RelayerRepositoryStorage::InMemory(repo) => repo.list_all().await,
150            RelayerRepositoryStorage::Redis(repo) => repo.list_all().await,
151        }
152    }
153
154    async fn list_paginated(
155        &self,
156        query: PaginationQuery,
157    ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError> {
158        match self {
159            RelayerRepositoryStorage::InMemory(repo) => repo.list_paginated(query).await,
160            RelayerRepositoryStorage::Redis(repo) => repo.list_paginated(query).await,
161        }
162    }
163
164    async fn update(
165        &self,
166        id: String,
167        entity: RelayerRepoModel,
168    ) -> Result<RelayerRepoModel, RepositoryError> {
169        match self {
170            RelayerRepositoryStorage::InMemory(repo) => repo.update(id, entity).await,
171            RelayerRepositoryStorage::Redis(repo) => repo.update(id, entity).await,
172        }
173    }
174
175    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
176        match self {
177            RelayerRepositoryStorage::InMemory(repo) => repo.delete_by_id(id).await,
178            RelayerRepositoryStorage::Redis(repo) => repo.delete_by_id(id).await,
179        }
180    }
181
182    async fn count(&self) -> Result<usize, RepositoryError> {
183        match self {
184            RelayerRepositoryStorage::InMemory(repo) => repo.count().await,
185            RelayerRepositoryStorage::Redis(repo) => repo.count().await,
186        }
187    }
188
189    async fn has_entries(&self) -> Result<bool, RepositoryError> {
190        match self {
191            RelayerRepositoryStorage::InMemory(repo) => repo.has_entries().await,
192            RelayerRepositoryStorage::Redis(repo) => repo.has_entries().await,
193        }
194    }
195
196    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
197        match self {
198            RelayerRepositoryStorage::InMemory(repo) => repo.drop_all_entries().await,
199            RelayerRepositoryStorage::Redis(repo) => repo.drop_all_entries().await,
200        }
201    }
202}
203
204#[async_trait]
205impl RelayerRepository for RelayerRepositoryStorage {
206    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
207        match self {
208            RelayerRepositoryStorage::InMemory(repo) => repo.list_active().await,
209            RelayerRepositoryStorage::Redis(repo) => repo.list_active().await,
210        }
211    }
212
213    async fn list_by_signer_id(
214        &self,
215        signer_id: &str,
216    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
217        match self {
218            RelayerRepositoryStorage::InMemory(repo) => repo.list_by_signer_id(signer_id).await,
219            RelayerRepositoryStorage::Redis(repo) => repo.list_by_signer_id(signer_id).await,
220        }
221    }
222
223    async fn list_by_notification_id(
224        &self,
225        notification_id: &str,
226    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
227        match self {
228            RelayerRepositoryStorage::InMemory(repo) => {
229                repo.list_by_notification_id(notification_id).await
230            }
231            RelayerRepositoryStorage::Redis(repo) => {
232                repo.list_by_notification_id(notification_id).await
233            }
234        }
235    }
236
237    async fn partial_update(
238        &self,
239        id: String,
240        update: UpdateRelayerRequest,
241    ) -> Result<RelayerRepoModel, RepositoryError> {
242        match self {
243            RelayerRepositoryStorage::InMemory(repo) => repo.partial_update(id, update).await,
244            RelayerRepositoryStorage::Redis(repo) => repo.partial_update(id, update).await,
245        }
246    }
247
248    async fn enable_relayer(
249        &self,
250        relayer_id: String,
251    ) -> Result<RelayerRepoModel, RepositoryError> {
252        match self {
253            RelayerRepositoryStorage::InMemory(repo) => repo.enable_relayer(relayer_id).await,
254            RelayerRepositoryStorage::Redis(repo) => repo.enable_relayer(relayer_id).await,
255        }
256    }
257
258    async fn disable_relayer(
259        &self,
260        relayer_id: String,
261        reason: DisabledReason,
262    ) -> Result<RelayerRepoModel, RepositoryError> {
263        match self {
264            RelayerRepositoryStorage::InMemory(repo) => {
265                repo.disable_relayer(relayer_id, reason).await
266            }
267            RelayerRepositoryStorage::Redis(repo) => repo.disable_relayer(relayer_id, reason).await,
268        }
269    }
270
271    async fn update_policy(
272        &self,
273        id: String,
274        policy: RelayerNetworkPolicy,
275    ) -> Result<RelayerRepoModel, RepositoryError> {
276        match self {
277            RelayerRepositoryStorage::InMemory(repo) => repo.update_policy(id, policy).await,
278            RelayerRepositoryStorage::Redis(repo) => repo.update_policy(id, policy).await,
279        }
280    }
281
282    fn is_persistent_storage(&self) -> bool {
283        match self {
284            RelayerRepositoryStorage::InMemory(_) => false,
285            RelayerRepositoryStorage::Redis(_) => true,
286        }
287    }
288}
289
290#[cfg(test)]
291mod tests {
292    use super::*;
293    use crate::models::{NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy};
294
295    fn create_test_relayer(id: String) -> RelayerRepoModel {
296        RelayerRepoModel {
297            id: id.clone(),
298            name: format!("Relayer {}", id.clone()),
299            network: "TestNet".to_string(),
300            paused: false,
301            network_type: NetworkType::Evm,
302            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
303                min_balance: Some(0),
304                gas_limit_estimation: Some(true),
305                gas_price_cap: None,
306                whitelist_receivers: None,
307                eip1559_pricing: Some(false),
308                private_transactions: Some(false),
309            }),
310            signer_id: "test".to_string(),
311            address: "0x".to_string(),
312            notification_id: None,
313            system_disabled: false,
314            custom_rpc_urls: None,
315            ..Default::default()
316        }
317    }
318
319    #[actix_web::test]
320    async fn test_in_memory_repository_impl() {
321        let impl_repo = RelayerRepositoryStorage::new_in_memory();
322        let relayer = create_test_relayer("test-relayer".to_string());
323
324        // Test create
325        let created = impl_repo.create(relayer.clone()).await.unwrap();
326        assert_eq!(created.id, relayer.id);
327
328        // Test get
329        let retrieved = impl_repo
330            .get_by_id("test-relayer".to_string())
331            .await
332            .unwrap();
333        assert_eq!(retrieved.id, relayer.id);
334
335        // Test list all
336        let all_relayers = impl_repo.list_all().await.unwrap();
337        assert!(!all_relayers.is_empty());
338
339        // Test count
340        let count = impl_repo.count().await.unwrap();
341        assert!(count >= 1);
342
343        // Test update
344        let mut updated_relayer = relayer.clone();
345        updated_relayer.name = "Updated Name".to_string();
346        let updated = impl_repo
347            .update(relayer.id.clone(), updated_relayer)
348            .await
349            .unwrap();
350        assert_eq!(updated.name, "Updated Name");
351
352        // Test delete
353        impl_repo.delete_by_id(relayer.id.clone()).await.unwrap();
354        let get_result = impl_repo.get_by_id("test-relayer".to_string()).await;
355        assert!(get_result.is_err());
356    }
357
358    #[actix_web::test]
359    async fn test_relayer_repository_trait_methods() {
360        let impl_repo = RelayerRepositoryStorage::new_in_memory();
361        let relayer = create_test_relayer("test-relayer".to_string());
362
363        // Create the relayer first
364        impl_repo.create(relayer.clone()).await.unwrap();
365
366        // Test list_active
367        let active_relayers = impl_repo.list_active().await.unwrap();
368        assert!(!active_relayers.is_empty());
369
370        // Test partial_update
371        let update = UpdateRelayerRequest {
372            paused: Some(true),
373            ..Default::default()
374        };
375        let updated = impl_repo
376            .partial_update(relayer.id.clone(), update)
377            .await
378            .unwrap();
379        assert!(updated.paused);
380
381        // Test enable/disable
382        let disabled = impl_repo
383            .disable_relayer(
384                relayer.id.clone(),
385                DisabledReason::BalanceCheckFailed("Test disable reason".to_string()),
386            )
387            .await
388            .unwrap();
389        assert!(disabled.system_disabled);
390        assert_eq!(
391            disabled.disabled_reason,
392            Some(DisabledReason::BalanceCheckFailed(
393                "Test disable reason".to_string()
394            ))
395        );
396
397        let enabled = impl_repo.enable_relayer(relayer.id.clone()).await.unwrap();
398        assert!(!enabled.system_disabled);
399        assert_eq!(enabled.disabled_reason, None);
400
401        // Test update_policy
402        let new_policy = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
403            min_balance: Some(1000000000000000000),
404            gas_limit_estimation: Some(true),
405            gas_price_cap: Some(50_000_000_000),
406            whitelist_receivers: None,
407            eip1559_pricing: Some(true),
408            private_transactions: Some(false),
409        });
410        let policy_updated = impl_repo
411            .update_policy(relayer.id.clone(), new_policy)
412            .await
413            .unwrap();
414
415        if let RelayerNetworkPolicy::Evm(evm_policy) = policy_updated.policies {
416            assert_eq!(evm_policy.gas_price_cap, Some(50_000_000_000));
417            assert_eq!(evm_policy.eip1559_pricing, Some(true));
418        } else {
419            panic!("Expected EVM policy");
420        }
421    }
422
423    #[actix_web::test]
424    async fn test_create_repository_in_memory() {
425        let result = RelayerRepositoryStorage::new_in_memory();
426
427        assert!(matches!(result, RelayerRepositoryStorage::InMemory(_)));
428    }
429
430    #[actix_web::test]
431    async fn test_pagination() {
432        let impl_repo = RelayerRepositoryStorage::new_in_memory();
433        let relayer1 = create_test_relayer("test-relayer-1".to_string());
434        let relayer2 = create_test_relayer("test-relayer-2".to_string());
435
436        impl_repo.create(relayer1).await.unwrap();
437        impl_repo.create(relayer2).await.unwrap();
438
439        let query = PaginationQuery {
440            page: 1,
441            per_page: 10,
442        };
443
444        let result = impl_repo.list_paginated(query).await.unwrap();
445        assert!(result.total >= 2);
446        assert_eq!(result.page, 1);
447        assert_eq!(result.per_page, 10);
448    }
449
450    #[actix_web::test]
451    async fn test_delete_relayer() {
452        let impl_repo = RelayerRepositoryStorage::new_in_memory();
453        let relayer = create_test_relayer("delete-test".to_string());
454
455        // Create relayer
456        impl_repo.create(relayer.clone()).await.unwrap();
457
458        // Delete relayer
459        impl_repo
460            .delete_by_id("delete-test".to_string())
461            .await
462            .unwrap();
463
464        // Verify deletion
465        let get_result = impl_repo.get_by_id("delete-test".to_string()).await;
466        assert!(get_result.is_err());
467        assert!(matches!(
468            get_result.unwrap_err(),
469            RepositoryError::NotFound(_)
470        ));
471
472        // Test deleting non-existent relayer
473        let delete_result = impl_repo.delete_by_id("nonexistent".to_string()).await;
474        assert!(delete_result.is_err());
475    }
476
477    #[actix_web::test]
478    async fn test_has_entries() {
479        let repo = InMemoryRelayerRepository::new();
480        assert!(!repo.has_entries().await.unwrap());
481
482        let relayer = create_test_relayer("test".to_string());
483
484        repo.create(relayer.clone()).await.unwrap();
485        assert!(repo.has_entries().await.unwrap());
486
487        repo.delete_by_id(relayer.id.clone()).await.unwrap();
488        assert!(!repo.has_entries().await.unwrap());
489    }
490
491    #[actix_web::test]
492    async fn test_drop_all_entries() {
493        let repo = InMemoryRelayerRepository::new();
494        let relayer = create_test_relayer("test".to_string());
495
496        repo.create(relayer.clone()).await.unwrap();
497        assert!(repo.has_entries().await.unwrap());
498
499        repo.drop_all_entries().await.unwrap();
500        assert!(!repo.has_entries().await.unwrap());
501    }
502}