openzeppelin_relayer/repositories/relayer/
relayer_redis.rs

1//! Redis-backed implementation of the RelayerRepository.
2
3use crate::models::UpdateRelayerRequest;
4use crate::models::{
5    DisabledReason, PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, RepositoryError,
6};
7use crate::repositories::redis_base::RedisRepository;
8use crate::repositories::{BatchRetrievalResult, PaginatedResult, RelayerRepository, Repository};
9use crate::utils::RedisConnections;
10use async_trait::async_trait;
11use redis::AsyncCommands;
12use std::fmt;
13use std::sync::Arc;
14use tracing::{debug, error, warn};
15
16const RELAYER_PREFIX: &str = "relayer";
17const RELAYER_LIST_KEY: &str = "relayer_list";
18
19#[derive(Clone)]
20pub struct RedisRelayerRepository {
21    pub connections: Arc<RedisConnections>,
22    pub key_prefix: String,
23}
24
25impl RedisRepository for RedisRelayerRepository {}
26
27impl RedisRelayerRepository {
28    pub fn new(
29        connections: Arc<RedisConnections>,
30        key_prefix: String,
31    ) -> Result<Self, RepositoryError> {
32        if key_prefix.is_empty() {
33            return Err(RepositoryError::InvalidData(
34                "Redis key prefix cannot be empty".to_string(),
35            ));
36        }
37
38        Ok(Self {
39            connections,
40            key_prefix,
41        })
42    }
43
44    /// Generate key for relayer data: relayer:{relayer_id}
45    fn relayer_key(&self, relayer_id: &str) -> String {
46        format!("{}:{}:{}", self.key_prefix, RELAYER_PREFIX, relayer_id)
47    }
48
49    /// Generate key for relayer list: relayer_list (set of all relayer IDs)
50    fn relayer_list_key(&self) -> String {
51        format!("{}:{}", self.key_prefix, RELAYER_LIST_KEY)
52    }
53
54    /// Batch fetch relayers by IDs
55    async fn get_relayers_by_ids(
56        &self,
57        ids: &[String],
58    ) -> Result<BatchRetrievalResult<RelayerRepoModel>, RepositoryError> {
59        if ids.is_empty() {
60            debug!("no relayer IDs provided for batch fetch");
61            return Ok(BatchRetrievalResult {
62                results: vec![],
63                failed_ids: vec![],
64            });
65        }
66
67        let mut conn = self
68            .get_connection(self.connections.reader(), "batch_fetch_relayers")
69            .await?;
70        let keys: Vec<String> = ids.iter().map(|id| self.relayer_key(id)).collect();
71
72        debug!(count = %keys.len(), "batch fetching relayer data");
73
74        let values: Vec<Option<String>> = conn
75            .mget(&keys)
76            .await
77            .map_err(|e| self.map_redis_error(e, "batch_fetch_relayers"))?;
78
79        let mut relayers = Vec::new();
80        let mut failed_count = 0;
81        let mut failed_ids = Vec::new();
82        for (i, value) in values.into_iter().enumerate() {
83            match value {
84                Some(json) => {
85                    match self.deserialize_entity(&json, &ids[i], "relayer") {
86                        Ok(relayer) => relayers.push(relayer),
87                        Err(e) => {
88                            failed_count += 1;
89                            error!(relayer_id = %ids[i], error = %e, "failed to deserialize relayer");
90                            failed_ids.push(ids[i].clone());
91                            // Continue processing other relayers
92                        }
93                    }
94                }
95                None => {
96                    warn!(relayer_id = %ids[i], "relayer not found in batch fetch");
97                }
98            }
99        }
100
101        if failed_count > 0 {
102            warn!(failed_count = %failed_count, total_count = %ids.len(), "failed to deserialize relayers in batch");
103        }
104
105        debug!(count = %relayers.len(), "successfully fetched relayers");
106        Ok(BatchRetrievalResult {
107            results: relayers,
108            failed_ids,
109        })
110    }
111}
112
113impl fmt::Debug for RedisRelayerRepository {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        f.debug_struct("RedisRelayerRepository")
116            .field("connections", &"<RedisConnections>")
117            .field("key_prefix", &self.key_prefix)
118            .finish()
119    }
120}
121
122#[async_trait]
123impl Repository<RelayerRepoModel, String> for RedisRelayerRepository {
124    async fn create(&self, entity: RelayerRepoModel) -> Result<RelayerRepoModel, RepositoryError> {
125        if entity.id.is_empty() {
126            return Err(RepositoryError::InvalidData(
127                "Relayer ID cannot be empty".to_string(),
128            ));
129        }
130
131        if entity.name.is_empty() {
132            return Err(RepositoryError::InvalidData(
133                "Relayer name cannot be empty".to_string(),
134            ));
135        }
136
137        let mut conn = self
138            .get_connection(self.connections.primary(), "create")
139            .await?;
140        let relayer_key = self.relayer_key(&entity.id);
141
142        // Check if relayer already exists
143        let exists: bool = conn
144            .exists(&relayer_key)
145            .await
146            .map_err(|e| self.map_redis_error(e, "create_relayer_exists_check"))?;
147
148        if exists {
149            return Err(RepositoryError::ConstraintViolation(format!(
150                "Relayer with ID {} already exists",
151                entity.id
152            )));
153        }
154
155        let serialized = self.serialize_entity(&entity, |r| &r.id, "relayer")?;
156
157        // Use pipeline for atomic operations
158        let mut pipe = redis::pipe();
159        pipe.atomic();
160        pipe.set(&relayer_key, &serialized);
161        pipe.sadd(self.relayer_list_key(), &entity.id);
162
163        pipe.exec_async(&mut conn)
164            .await
165            .map_err(|e| self.map_redis_error(e, "create_relayer_pipeline"))?;
166
167        debug!(relayer_id = %entity.id, "created relayer");
168        Ok(entity)
169    }
170
171    async fn get_by_id(&self, id: String) -> Result<RelayerRepoModel, RepositoryError> {
172        if id.is_empty() {
173            return Err(RepositoryError::InvalidData(
174                "Relayer ID cannot be empty".to_string(),
175            ));
176        }
177
178        let mut conn = self
179            .get_connection(self.connections.reader(), "get_by_id")
180            .await?;
181        let relayer_key = self.relayer_key(&id);
182
183        debug!(relayer_id = %id, "fetching relayer");
184
185        let json: Option<String> = conn
186            .get(&relayer_key)
187            .await
188            .map_err(|e| self.map_redis_error(e, "get_relayer_by_id"))?;
189
190        match json {
191            Some(json) => {
192                debug!(relayer_id = %id, "found relayer");
193                self.deserialize_entity(&json, &id, "relayer")
194            }
195            None => {
196                debug!(relayer_id = %id, "relayer not found");
197                Err(RepositoryError::NotFound(format!(
198                    "Relayer with ID {id} not found"
199                )))
200            }
201        }
202    }
203
204    async fn list_all(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
205        let relayer_ids = {
206            let mut conn = self
207                .get_connection(self.connections.reader(), "list_all")
208                .await?;
209            let relayer_list_key = self.relayer_list_key();
210
211            debug!("listing all relayers");
212
213            let ids: Vec<String> = conn
214                .smembers(&relayer_list_key)
215                .await
216                .map_err(|e| self.map_redis_error(e, "list_all_relayers"))?;
217
218            debug!(count = %ids.len(), "found relayers in index");
219            ids
220            // Connection dropped here before nested call to avoid connection doubling
221        };
222
223        let relayers = self.get_relayers_by_ids(&relayer_ids).await?;
224        Ok(relayers.results)
225    }
226
227    async fn list_paginated(
228        &self,
229        query: PaginationQuery,
230    ) -> Result<PaginatedResult<RelayerRepoModel>, RepositoryError> {
231        if query.page == 0 {
232            return Err(RepositoryError::InvalidData(
233                "Page number must be greater than 0".to_string(),
234            ));
235        }
236
237        if query.per_page == 0 {
238            return Err(RepositoryError::InvalidData(
239                "Per page count must be greater than 0".to_string(),
240            ));
241        }
242
243        let (total, page_ids) = {
244            let mut conn = self
245                .get_connection(self.connections.reader(), "list_paginated")
246                .await?;
247            let relayer_list_key = self.relayer_list_key();
248
249            // Get total count
250            let total: u64 = conn
251                .scard(&relayer_list_key)
252                .await
253                .map_err(|e| self.map_redis_error(e, "list_paginated_count"))?;
254
255            if total == 0 {
256                return Ok(PaginatedResult {
257                    items: vec![],
258                    total: 0,
259                    page: query.page,
260                    per_page: query.per_page,
261                });
262            }
263
264            // Get all IDs and paginate in memory
265            let all_ids: Vec<String> = conn
266                .smembers(&relayer_list_key)
267                .await
268                .map_err(|e| self.map_redis_error(e, "list_paginated_members"))?;
269
270            let start = ((query.page - 1) * query.per_page) as usize;
271            let end = (start + query.per_page as usize).min(all_ids.len());
272
273            (total, all_ids[start..end].to_vec())
274            // Connection dropped here before nested call to avoid connection doubling
275        };
276
277        let items = self.get_relayers_by_ids(&page_ids).await?;
278
279        Ok(PaginatedResult {
280            items: items.results.clone(),
281            total,
282            page: query.page,
283            per_page: query.per_page,
284        })
285    }
286
287    async fn update(
288        &self,
289        id: String,
290        entity: RelayerRepoModel,
291    ) -> Result<RelayerRepoModel, RepositoryError> {
292        if id.is_empty() {
293            return Err(RepositoryError::InvalidData(
294                "Relayer ID cannot be empty".to_string(),
295            ));
296        }
297
298        if entity.name.is_empty() {
299            return Err(RepositoryError::InvalidData(
300                "Relayer name cannot be empty".to_string(),
301            ));
302        }
303
304        let mut conn = self
305            .get_connection(self.connections.primary(), "update")
306            .await?;
307        let relayer_key = self.relayer_key(&id);
308
309        // Check if relayer exists
310        let exists: bool = conn
311            .exists(&relayer_key)
312            .await
313            .map_err(|e| self.map_redis_error(e, "update_relayer_exists_check"))?;
314
315        if !exists {
316            return Err(RepositoryError::NotFound(format!(
317                "Relayer with ID {id} not found"
318            )));
319        }
320
321        // Ensure we preserve the original ID
322        let mut updated_entity = entity;
323        updated_entity.id = id.clone();
324
325        let serialized = self.serialize_entity(&updated_entity, |r| &r.id, "relayer")?;
326
327        // Use pipeline for atomic operations
328        let mut pipe = redis::pipe();
329        pipe.atomic();
330        pipe.set(&relayer_key, &serialized);
331        pipe.sadd(self.relayer_list_key(), &id);
332
333        pipe.exec_async(&mut conn)
334            .await
335            .map_err(|e| self.map_redis_error(e, "update_relayer_pipeline"))?;
336
337        debug!(relayer_id = %id, "updated relayer");
338        Ok(updated_entity)
339    }
340
341    async fn delete_by_id(&self, id: String) -> Result<(), RepositoryError> {
342        if id.is_empty() {
343            return Err(RepositoryError::InvalidData(
344                "Relayer ID cannot be empty".to_string(),
345            ));
346        }
347
348        let mut conn = self
349            .get_connection(self.connections.primary(), "delete_by_id")
350            .await?;
351        let relayer_key = self.relayer_key(&id);
352
353        // Check if relayer exists
354        let exists: bool = conn
355            .exists(&relayer_key)
356            .await
357            .map_err(|e| self.map_redis_error(e, "delete_relayer_exists_check"))?;
358
359        if !exists {
360            return Err(RepositoryError::NotFound(format!(
361                "Relayer with ID {id} not found"
362            )));
363        }
364
365        // Use pipeline for atomic operations
366        let mut pipe = redis::pipe();
367        pipe.atomic();
368        pipe.del(&relayer_key);
369        pipe.srem(self.relayer_list_key(), &id);
370
371        pipe.exec_async(&mut conn)
372            .await
373            .map_err(|e| self.map_redis_error(e, "delete_relayer_pipeline"))?;
374
375        debug!(relayer_id = %id, "deleted relayer");
376        Ok(())
377    }
378
379    async fn count(&self) -> Result<usize, RepositoryError> {
380        let mut conn = self
381            .get_connection(self.connections.reader(), "count")
382            .await?;
383        let relayer_list_key = self.relayer_list_key();
384
385        let count: u64 = conn
386            .scard(&relayer_list_key)
387            .await
388            .map_err(|e| self.map_redis_error(e, "count_relayers"))?;
389
390        Ok(count as usize)
391    }
392
393    async fn has_entries(&self) -> Result<bool, RepositoryError> {
394        let mut conn = self
395            .get_connection(self.connections.reader(), "has_entries")
396            .await?;
397        let relayer_list_key = self.relayer_list_key();
398
399        debug!("checking if relayer entries exist");
400
401        let exists: bool = conn
402            .exists(&relayer_list_key)
403            .await
404            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
405
406        debug!(exists = %exists, "relayer entries exist");
407        Ok(exists)
408    }
409
410    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
411        let mut conn = self
412            .get_connection(self.connections.primary(), "drop_all_entries")
413            .await?;
414        let relayer_list_key = self.relayer_list_key();
415
416        debug!("dropping all relayer entries");
417
418        // Get all relayer IDs first
419        let relayer_ids: Vec<String> = conn
420            .smembers(&relayer_list_key)
421            .await
422            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_ids"))?;
423
424        if relayer_ids.is_empty() {
425            debug!("no relayer entries to drop");
426            return Ok(());
427        }
428
429        // Use pipeline for atomic operations
430        let mut pipe = redis::pipe();
431        pipe.atomic();
432
433        // Delete all individual relayer entries
434        for relayer_id in &relayer_ids {
435            let relayer_key = self.relayer_key(relayer_id);
436            pipe.del(&relayer_key);
437        }
438
439        // Delete the relayer list key
440        pipe.del(&relayer_list_key);
441
442        pipe.exec_async(&mut conn)
443            .await
444            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
445
446        debug!(count = %relayer_ids.len(), "dropped relayer entries");
447        Ok(())
448    }
449}
450
451#[async_trait]
452impl RelayerRepository for RedisRelayerRepository {
453    async fn list_active(&self) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
454        let all_relayers = self.list_all().await?;
455        let active_relayers: Vec<RelayerRepoModel> = all_relayers
456            .into_iter()
457            .filter(|relayer| !relayer.paused)
458            .collect();
459
460        debug!(count = %active_relayers.len(), "found active relayers");
461        Ok(active_relayers)
462    }
463
464    async fn list_by_signer_id(
465        &self,
466        signer_id: &str,
467    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
468        let all_relayers = self.list_all().await?;
469        let relayers_with_signer: Vec<RelayerRepoModel> = all_relayers
470            .into_iter()
471            .filter(|relayer| relayer.signer_id == signer_id)
472            .collect();
473
474        debug!(count = %relayers_with_signer.len(), signer_id = %signer_id, "found relayers using signer");
475        Ok(relayers_with_signer)
476    }
477
478    async fn list_by_notification_id(
479        &self,
480        notification_id: &str,
481    ) -> Result<Vec<RelayerRepoModel>, RepositoryError> {
482        let all_relayers = self.list_all().await?;
483        let relayers_with_notification: Vec<RelayerRepoModel> = all_relayers
484            .into_iter()
485            .filter(|relayer| {
486                relayer
487                    .notification_id
488                    .as_ref()
489                    .is_some_and(|id| id == notification_id)
490            })
491            .collect();
492
493        debug!(count = %relayers_with_notification.len(), notification_id = %notification_id, "found relayers using notification");
494        Ok(relayers_with_notification)
495    }
496
497    async fn partial_update(
498        &self,
499        id: String,
500        update: UpdateRelayerRequest,
501    ) -> Result<RelayerRepoModel, RepositoryError> {
502        // First get the current relayer
503        let mut relayer = self.get_by_id(id.clone()).await?;
504
505        // Apply the partial update
506        if let Some(paused) = update.paused {
507            relayer.paused = paused;
508        }
509
510        // Update the relayer
511        self.update(id, relayer).await
512    }
513
514    async fn enable_relayer(
515        &self,
516        relayer_id: String,
517    ) -> Result<RelayerRepoModel, RepositoryError> {
518        // First get the current relayer
519        let mut relayer = self.get_by_id(relayer_id.clone()).await?;
520
521        // Update the system_disabled flag and clear reason
522        relayer.system_disabled = false;
523        relayer.disabled_reason = None;
524
525        // Update the relayer
526        self.update(relayer_id, relayer).await
527    }
528
529    async fn disable_relayer(
530        &self,
531        relayer_id: String,
532        reason: DisabledReason,
533    ) -> Result<RelayerRepoModel, RepositoryError> {
534        // First get the current relayer
535        let mut relayer = self.get_by_id(relayer_id.clone()).await?;
536
537        // Update the system_disabled flag and set reason
538        relayer.system_disabled = true;
539        relayer.disabled_reason = Some(reason);
540
541        // Update the relayer
542        self.update(relayer_id, relayer).await
543    }
544
545    async fn update_policy(
546        &self,
547        id: String,
548        policy: RelayerNetworkPolicy,
549    ) -> Result<RelayerRepoModel, RepositoryError> {
550        // First get the current relayer
551        let mut relayer = self.get_by_id(id.clone()).await?;
552
553        // Update the policy
554        relayer.policies = policy;
555
556        // Update the relayer
557        self.update(id, relayer).await
558    }
559
560    fn is_persistent_storage(&self) -> bool {
561        true
562    }
563}
564
565#[cfg(test)]
566mod tests {
567    use super::*;
568    use crate::models::{NetworkType, RelayerEvmPolicy, RelayerNetworkPolicy};
569    use deadpool_redis::{Config, Runtime};
570    use std::sync::Arc;
571
572    fn create_test_relayer(id: &str) -> RelayerRepoModel {
573        RelayerRepoModel {
574            id: id.to_string(),
575            name: format!("Test Relayer {id}"),
576            network: "ethereum".to_string(),
577            paused: false,
578            network_type: NetworkType::Evm,
579            signer_id: "test-signer".to_string(),
580            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy::default()),
581            address: "0x742d35Cc6634C0532925a3b844Bc454e4438f44e".to_string(),
582            notification_id: None,
583            system_disabled: false,
584            disabled_reason: None,
585            custom_rpc_urls: None,
586        }
587    }
588
589    fn create_test_relayer_with_pause(id: &str, paused: bool) -> RelayerRepoModel {
590        let mut relayer = create_test_relayer(id);
591        relayer.paused = paused;
592        relayer
593    }
594
595    async fn setup_test_repo() -> RedisRelayerRepository {
596        let redis_url =
597            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
598        let cfg = Config::from_url(&redis_url);
599        let pool = Arc::new(
600            cfg.builder()
601                .expect("Failed to create pool builder")
602                .max_size(16)
603                .runtime(Runtime::Tokio1)
604                .build()
605                .expect("Failed to build Redis pool"),
606        );
607        let connections = Arc::new(RedisConnections::new_single_pool(pool));
608
609        let random_id = uuid::Uuid::new_v4().to_string();
610        let key_prefix = format!("test_prefix:{random_id}");
611
612        RedisRelayerRepository::new(connections, key_prefix)
613            .expect("Failed to create Redis relayer repository")
614    }
615
616    #[ignore = "Requires active Redis instance"]
617    #[tokio::test]
618    async fn test_new_repository_creation() {
619        let repo = setup_test_repo().await;
620        assert!(repo.key_prefix.contains("test_prefix"));
621    }
622
623    #[ignore = "Requires active Redis instance"]
624    #[tokio::test]
625    async fn test_new_repository_empty_prefix_fails() {
626        let redis_url =
627            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
628        let cfg = Config::from_url(&redis_url);
629        let pool = Arc::new(
630            cfg.builder()
631                .expect("Failed to create pool builder")
632                .max_size(16)
633                .runtime(Runtime::Tokio1)
634                .build()
635                .expect("Failed to build Redis pool"),
636        );
637        let connections = Arc::new(RedisConnections::new_single_pool(pool));
638
639        let result = RedisRelayerRepository::new(connections, "".to_string());
640        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
641    }
642
643    #[ignore = "Requires active Redis instance"]
644    #[tokio::test]
645    async fn test_key_generation() {
646        let repo = setup_test_repo().await;
647
648        let relayer_key = repo.relayer_key("test-relayer");
649        assert!(relayer_key.contains(":relayer:test-relayer"));
650
651        let list_key = repo.relayer_list_key();
652        assert!(list_key.contains(":relayer_list"));
653    }
654
655    #[ignore = "Requires active Redis instance"]
656    #[tokio::test]
657    async fn test_serialize_deserialize_relayer() {
658        let repo = setup_test_repo().await;
659        let relayer = create_test_relayer("test-relayer");
660
661        let serialized = repo
662            .serialize_entity(&relayer, |r| &r.id, "relayer")
663            .unwrap();
664        let deserialized: RelayerRepoModel = repo
665            .deserialize_entity(&serialized, &relayer.id, "relayer")
666            .unwrap();
667
668        assert_eq!(relayer.id, deserialized.id);
669        assert_eq!(relayer.name, deserialized.name);
670        assert_eq!(relayer.network, deserialized.network);
671        assert_eq!(relayer.paused, deserialized.paused);
672        assert_eq!(relayer.network_type, deserialized.network_type);
673        assert_eq!(relayer.signer_id, deserialized.signer_id);
674        assert_eq!(relayer.address, deserialized.address);
675        assert_eq!(relayer.notification_id, deserialized.notification_id);
676        assert_eq!(relayer.system_disabled, deserialized.system_disabled);
677        assert_eq!(relayer.custom_rpc_urls, deserialized.custom_rpc_urls);
678    }
679
680    #[ignore = "Requires active Redis instance"]
681    #[tokio::test]
682    async fn test_create_relayer() {
683        let repo = setup_test_repo().await;
684        let relayer_id = uuid::Uuid::new_v4().to_string();
685        let relayer = create_test_relayer(&relayer_id);
686
687        let result = repo.create(relayer.clone()).await;
688        assert!(result.is_ok());
689
690        let created_relayer = result.unwrap();
691        assert_eq!(created_relayer.id, relayer_id);
692        assert_eq!(created_relayer.name, relayer.name);
693    }
694
695    #[ignore = "Requires active Redis instance"]
696    #[tokio::test]
697    async fn test_get_relayer() {
698        let repo = setup_test_repo().await;
699        let relayer_id = uuid::Uuid::new_v4().to_string();
700        let relayer = create_test_relayer(&relayer_id);
701
702        repo.create(relayer.clone()).await.unwrap();
703
704        let retrieved = repo.get_by_id(relayer_id).await.unwrap();
705        assert_eq!(retrieved.id, relayer.id);
706        assert_eq!(retrieved.name, relayer.name);
707    }
708
709    #[ignore = "Requires active Redis instance"]
710    #[tokio::test]
711    async fn test_list_all_relayers() {
712        let repo = setup_test_repo().await;
713        let relayer1_id = uuid::Uuid::new_v4().to_string();
714        let relayer2_id = uuid::Uuid::new_v4().to_string();
715        let relayer1 = create_test_relayer(&relayer1_id);
716        let relayer2 = create_test_relayer(&relayer2_id);
717
718        repo.create(relayer1).await.unwrap();
719        repo.create(relayer2).await.unwrap();
720
721        let all_relayers = repo.list_all().await.unwrap();
722        assert!(all_relayers.len() >= 2);
723    }
724
725    #[ignore = "Requires active Redis instance"]
726    #[tokio::test]
727    async fn test_list_active_relayers() {
728        let repo = setup_test_repo().await;
729        let relayer1_id = uuid::Uuid::new_v4().to_string();
730        let relayer2_id = uuid::Uuid::new_v4().to_string();
731        let relayer1 = create_test_relayer_with_pause(&relayer1_id, false);
732        let relayer2 = create_test_relayer_with_pause(&relayer2_id, true);
733
734        repo.create(relayer1).await.unwrap();
735        repo.create(relayer2).await.unwrap();
736
737        let active_relayers = repo.list_active().await.unwrap();
738        // Should have at least 1 active relayer
739        assert!(!active_relayers.is_empty());
740        // All returned relayers should be active
741        assert!(active_relayers.iter().all(|r| !r.paused));
742    }
743
744    #[ignore = "Requires active Redis instance"]
745    #[tokio::test]
746    async fn test_count_relayers() {
747        let repo = setup_test_repo().await;
748        let relayer_id = uuid::Uuid::new_v4().to_string();
749        let relayer = create_test_relayer(&relayer_id);
750
751        repo.create(relayer).await.unwrap();
752
753        let count = repo.count().await.unwrap();
754        assert!(count >= 1);
755    }
756
757    #[ignore = "Requires active Redis instance"]
758    #[tokio::test]
759    async fn test_get_nonexistent_relayer() {
760        let repo = setup_test_repo().await;
761
762        let result = repo.get_by_id("nonexistent-relayer".to_string()).await;
763        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
764    }
765
766    #[ignore = "Requires active Redis instance"]
767    #[tokio::test]
768    async fn test_duplicate_relayer_creation() {
769        let repo = setup_test_repo().await;
770        let relayer_id = uuid::Uuid::new_v4().to_string();
771        let relayer = create_test_relayer(&relayer_id);
772
773        repo.create(relayer.clone()).await.unwrap();
774
775        let duplicate_result = repo.create(relayer).await;
776        assert!(matches!(
777            duplicate_result,
778            Err(RepositoryError::ConstraintViolation(_))
779        ));
780    }
781
782    #[ignore = "Requires active Redis instance"]
783    #[tokio::test]
784    async fn test_update_relayer() {
785        let repo = setup_test_repo().await;
786        let relayer_id = uuid::Uuid::new_v4().to_string();
787        let relayer = create_test_relayer(&relayer_id);
788
789        repo.create(relayer.clone()).await.unwrap();
790
791        let mut updated_relayer = relayer.clone();
792        updated_relayer.name = "Updated Relayer Name".to_string();
793
794        let result = repo.update(relayer.id.clone(), updated_relayer).await;
795        assert!(result.is_ok());
796
797        let updated = result.unwrap();
798        assert_eq!(updated.name, "Updated Relayer Name");
799        assert_eq!(updated.id, relayer.id);
800    }
801
802    #[ignore = "Requires active Redis instance"]
803    #[tokio::test]
804    async fn test_delete_relayer() {
805        let repo = setup_test_repo().await;
806        let relayer_id = uuid::Uuid::new_v4().to_string();
807        let relayer = create_test_relayer(&relayer_id);
808
809        repo.create(relayer.clone()).await.unwrap();
810
811        let delete_result = repo.delete_by_id(relayer.id.clone()).await;
812        assert!(delete_result.is_ok());
813
814        let get_result = repo.get_by_id(relayer.id).await;
815        assert!(matches!(get_result, Err(RepositoryError::NotFound(_))));
816    }
817
818    #[ignore = "Requires active Redis instance"]
819    #[tokio::test]
820    async fn test_list_paginated() {
821        let repo = setup_test_repo().await;
822        let relayer1_id = uuid::Uuid::new_v4().to_string();
823        let relayer2_id = uuid::Uuid::new_v4().to_string();
824        let relayer1 = create_test_relayer(&relayer1_id);
825        let relayer2 = create_test_relayer(&relayer2_id);
826
827        repo.create(relayer1).await.unwrap();
828        repo.create(relayer2).await.unwrap();
829
830        let query = PaginationQuery {
831            page: 1,
832            per_page: 10,
833        };
834
835        let result = repo.list_paginated(query).await.unwrap();
836        assert!(result.total >= 2);
837        assert_eq!(result.page, 1);
838        assert_eq!(result.per_page, 10);
839    }
840
841    #[ignore = "Requires active Redis instance"]
842    #[tokio::test]
843    async fn test_partial_update_relayer() {
844        let repo = setup_test_repo().await;
845        let relayer_id = uuid::Uuid::new_v4().to_string();
846        let relayer = create_test_relayer(&relayer_id);
847
848        repo.create(relayer.clone()).await.unwrap();
849
850        let update = UpdateRelayerRequest {
851            paused: Some(true),
852            ..Default::default()
853        };
854        let result = repo.partial_update(relayer.id.clone(), update).await;
855        assert!(result.is_ok());
856
857        let updated = result.unwrap();
858        assert_eq!(updated.id, relayer.id);
859        assert!(updated.paused);
860    }
861
862    #[ignore = "Requires active Redis instance"]
863    #[tokio::test]
864    async fn test_enable_disable_relayer() {
865        let repo = setup_test_repo().await;
866        let relayer_id = uuid::Uuid::new_v4().to_string();
867        let relayer = create_test_relayer(&relayer_id);
868
869        repo.create(relayer.clone()).await.unwrap();
870
871        // Test disable
872        let disabled = repo
873            .disable_relayer(
874                relayer.id.clone(),
875                DisabledReason::BalanceCheckFailed("test reason".to_string()),
876            )
877            .await
878            .unwrap();
879        assert!(disabled.system_disabled);
880
881        // Test enable
882        let enabled = repo.enable_relayer(relayer.id.clone()).await.unwrap();
883        assert!(!enabled.system_disabled);
884    }
885
886    #[ignore = "Requires active Redis instance"]
887    #[tokio::test]
888    async fn test_update_policy() {
889        let repo = setup_test_repo().await;
890        let relayer_id = uuid::Uuid::new_v4().to_string();
891        let relayer = create_test_relayer(&relayer_id);
892
893        repo.create(relayer.clone()).await.unwrap();
894
895        let new_policy = RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
896            gas_price_cap: Some(50_000_000_000),
897            whitelist_receivers: Some(vec!["0x123".to_string()]),
898            eip1559_pricing: Some(true),
899            private_transactions: Some(true),
900            min_balance: Some(1000000000000000000),
901            gas_limit_estimation: Some(true),
902        });
903
904        let result = repo.update_policy(relayer.id.clone(), new_policy).await;
905        assert!(result.is_ok());
906
907        let updated = result.unwrap();
908        if let RelayerNetworkPolicy::Evm(evm_policy) = updated.policies {
909            assert_eq!(evm_policy.gas_price_cap, Some(50_000_000_000));
910            assert_eq!(
911                evm_policy.whitelist_receivers,
912                Some(vec!["0x123".to_string()])
913            );
914            assert_eq!(evm_policy.eip1559_pricing, Some(true));
915            assert!(evm_policy.private_transactions.unwrap_or(false));
916            assert_eq!(evm_policy.min_balance, Some(1000000000000000000));
917        } else {
918            panic!("Expected EVM policy");
919        }
920    }
921
922    #[ignore = "Requires active Redis instance"]
923    #[tokio::test]
924    async fn test_debug_implementation() {
925        let repo = setup_test_repo().await;
926        let debug_str = format!("{repo:?}");
927        assert!(debug_str.contains("RedisRelayerRepository"));
928        assert!(debug_str.contains("key_prefix"));
929    }
930
931    #[ignore = "Requires active Redis instance"]
932    #[tokio::test]
933    async fn test_error_handling_empty_id() {
934        let repo = setup_test_repo().await;
935
936        let create_result = repo
937            .create(RelayerRepoModel {
938                id: "".to_string(),
939                ..create_test_relayer("test")
940            })
941            .await;
942        assert!(matches!(
943            create_result,
944            Err(RepositoryError::InvalidData(_))
945        ));
946
947        let get_result = repo.get_by_id("".to_string()).await;
948        assert!(matches!(get_result, Err(RepositoryError::InvalidData(_))));
949
950        let update_result = repo
951            .update("".to_string(), create_test_relayer("test"))
952            .await;
953        assert!(matches!(
954            update_result,
955            Err(RepositoryError::InvalidData(_))
956        ));
957
958        let delete_result = repo.delete_by_id("".to_string()).await;
959        assert!(matches!(
960            delete_result,
961            Err(RepositoryError::InvalidData(_))
962        ));
963    }
964
965    #[ignore = "Requires active Redis instance"]
966    #[tokio::test]
967    async fn test_error_handling_empty_name() {
968        let repo = setup_test_repo().await;
969
970        let create_result = repo
971            .create(RelayerRepoModel {
972                name: "".to_string(),
973                ..create_test_relayer("test")
974            })
975            .await;
976        assert!(matches!(
977            create_result,
978            Err(RepositoryError::InvalidData(_))
979        ));
980    }
981
982    #[ignore = "Requires active Redis instance"]
983    #[tokio::test]
984    async fn test_pagination_validation() {
985        let repo = setup_test_repo().await;
986
987        let invalid_page = PaginationQuery {
988            page: 0,
989            per_page: 10,
990        };
991        let result = repo.list_paginated(invalid_page).await;
992        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
993
994        let invalid_per_page = PaginationQuery {
995            page: 1,
996            per_page: 0,
997        };
998        let result = repo.list_paginated(invalid_per_page).await;
999        assert!(matches!(result, Err(RepositoryError::InvalidData(_))));
1000    }
1001
1002    #[ignore = "Requires active Redis instance"]
1003    #[tokio::test]
1004    async fn test_update_nonexistent_relayer() {
1005        let repo = setup_test_repo().await;
1006        let relayer = create_test_relayer("nonexistent-relayer");
1007
1008        let result = repo
1009            .update("nonexistent-relayer".to_string(), relayer)
1010            .await;
1011        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1012    }
1013
1014    #[ignore = "Requires active Redis instance"]
1015    #[tokio::test]
1016    async fn test_delete_nonexistent_relayer() {
1017        let repo = setup_test_repo().await;
1018
1019        let result = repo.delete_by_id("nonexistent-relayer".to_string()).await;
1020        assert!(matches!(result, Err(RepositoryError::NotFound(_))));
1021    }
1022
1023    #[tokio::test]
1024    #[ignore = "Requires active Redis instance"]
1025    async fn test_has_entries() {
1026        let repo = setup_test_repo().await;
1027        assert!(!repo.has_entries().await.unwrap());
1028
1029        let relayer_id = uuid::Uuid::new_v4().to_string();
1030        let relayer = create_test_relayer(&relayer_id);
1031        repo.create(relayer.clone()).await.unwrap();
1032        assert!(repo.has_entries().await.unwrap());
1033    }
1034
1035    #[tokio::test]
1036    #[ignore = "Requires active Redis instance"]
1037    async fn test_drop_all_entries() {
1038        let repo = setup_test_repo().await;
1039        let relayer_id = uuid::Uuid::new_v4().to_string();
1040        let relayer = create_test_relayer(&relayer_id);
1041        repo.create(relayer.clone()).await.unwrap();
1042        assert!(repo.has_entries().await.unwrap());
1043
1044        repo.drop_all_entries().await.unwrap();
1045        assert!(!repo.has_entries().await.unwrap());
1046    }
1047
1048    #[ignore = "Requires active Redis instance"]
1049    #[tokio::test]
1050    async fn test_list_by_signer_id() {
1051        let repo = setup_test_repo().await;
1052
1053        let relayer1_id = uuid::Uuid::new_v4().to_string();
1054        let relayer2_id = uuid::Uuid::new_v4().to_string();
1055        let relayer3_id = uuid::Uuid::new_v4().to_string();
1056        let signer1_id = uuid::Uuid::new_v4().to_string();
1057        let signer2_id = uuid::Uuid::new_v4().to_string();
1058
1059        let mut relayer1 = create_test_relayer(&relayer1_id);
1060        relayer1.signer_id = signer1_id.clone();
1061        repo.create(relayer1).await.unwrap();
1062
1063        let mut relayer2 = create_test_relayer(&relayer2_id);
1064
1065        relayer2.signer_id = signer2_id.clone();
1066        repo.create(relayer2).await.unwrap();
1067
1068        let mut relayer3 = create_test_relayer(&relayer3_id);
1069        relayer3.signer_id = signer1_id.clone();
1070        repo.create(relayer3).await.unwrap();
1071
1072        let result = repo.list_by_signer_id(&signer1_id).await.unwrap();
1073        assert_eq!(result.len(), 2);
1074        let ids: Vec<_> = result.iter().map(|r| r.id.clone()).collect();
1075        assert!(ids.contains(&relayer1_id));
1076        assert!(ids.contains(&relayer3_id));
1077
1078        let result = repo.list_by_signer_id(&signer2_id).await.unwrap();
1079        assert_eq!(result.len(), 1);
1080
1081        let result = repo.list_by_signer_id("nonexistent").await.unwrap();
1082        assert_eq!(result.len(), 0);
1083    }
1084
1085    #[ignore = "Requires active Redis instance"]
1086    #[tokio::test]
1087    async fn test_list_by_notification_id() {
1088        let repo = setup_test_repo().await;
1089
1090        let relayer1_id = uuid::Uuid::new_v4().to_string();
1091        let mut relayer1 = create_test_relayer(&relayer1_id);
1092        relayer1.notification_id = Some("notif1".to_string());
1093        repo.create(relayer1).await.unwrap();
1094
1095        let relayer2_id = uuid::Uuid::new_v4().to_string();
1096        let mut relayer2 = create_test_relayer(&relayer2_id);
1097        relayer2.notification_id = Some("notif2".to_string());
1098        repo.create(relayer2).await.unwrap();
1099
1100        let relayer3_id = uuid::Uuid::new_v4().to_string();
1101        let mut relayer3 = create_test_relayer(&relayer3_id);
1102        relayer3.notification_id = Some("notif1".to_string());
1103        repo.create(relayer3).await.unwrap();
1104
1105        let relayer4_id = uuid::Uuid::new_v4().to_string();
1106        let mut relayer4 = create_test_relayer(&relayer4_id);
1107        relayer4.notification_id = None;
1108        repo.create(relayer4).await.unwrap();
1109
1110        let result = repo.list_by_notification_id("notif1").await.unwrap();
1111        assert_eq!(result.len(), 2);
1112        let ids: Vec<_> = result.iter().map(|r| r.id.clone()).collect();
1113        assert!(ids.contains(&relayer1_id));
1114        assert!(ids.contains(&relayer3_id));
1115
1116        let result = repo.list_by_notification_id("notif2").await.unwrap();
1117        assert_eq!(result.len(), 1);
1118
1119        let result = repo.list_by_notification_id("nonexistent").await.unwrap();
1120        assert_eq!(result.len(), 0);
1121    }
1122}