openzeppelin_relayer/repositories/plugin/
plugin_redis.rs

1//! Redis-backed implementation of the PluginRepository.
2
3use crate::models::{PaginationQuery, PluginModel, RepositoryError};
4use crate::repositories::redis_base::RedisRepository;
5use crate::repositories::{BatchRetrievalResult, PaginatedResult, PluginRepositoryTrait};
6use crate::utils::RedisConnections;
7use async_trait::async_trait;
8use lru::LruCache;
9use parking_lot::RwLock;
10use redis::AsyncCommands;
11use std::fmt;
12use std::num::NonZeroUsize;
13use std::sync::Arc;
14use std::time::Instant;
15use tracing::{debug, error, warn};
16
/// Key segment under which each plugin's serialized data is stored.
const PLUGIN_PREFIX: &str = "plugin";
/// Key segment for the Redis set that holds every registered plugin ID.
const PLUGIN_LIST_KEY: &str = "plugin_list";
/// Key segment under which a plugin's compiled code is stored.
const COMPILED_CODE_PREFIX: &str = "compiled_code";
/// Key segment under which the hash of a plugin's source is stored.
const SOURCE_HASH_PREFIX: &str = "source_hash";
21
/// In-memory cache entry for one plugin's compiled code.
#[derive(Clone)]
struct CachedCompiledCode {
    /// The compiled code as it was read from Redis.
    code: String,
    /// When the entry was inserted; compared against the cache TTL on reads.
    cached_at: Instant,
}
27
/// Redis-backed plugin repository.
///
/// Plugin data, the plugin ID set, compiled code and source hashes are stored
/// under keys namespaced by `key_prefix`. Compiled code is additionally cached
/// in process memory in an LRU cache.
#[derive(Clone)]
pub struct RedisPluginRepository {
    /// Redis connection pools; this file reads via `reader()` and writes via `primary()`.
    pub connections: Arc<RedisConnections>,
    /// Namespace prepended to every Redis key built by this repository.
    pub key_prefix: String,
    /// LRU cache of compiled code keyed by plugin ID; the `Arc` means clones
    /// of the repository share one cache.
    compiled_code_cache: Arc<RwLock<LruCache<String, CachedCompiledCode>>>,
}
34
// Empty impl: nothing is overridden here, so the helpers used throughout this
// file (`get_connection`, `map_redis_error`, `serialize_entity`,
// `deserialize_entity`) presumably come from the trait's provided methods —
// TODO confirm against `redis_base`.
impl RedisRepository for RedisPluginRepository {}
36
37impl RedisPluginRepository {
38    pub fn new(
39        connections: Arc<RedisConnections>,
40        key_prefix: String,
41    ) -> Result<Self, RepositoryError> {
42        if key_prefix.is_empty() {
43            return Err(RepositoryError::InvalidData(
44                "Redis key prefix cannot be empty".to_string(),
45            ));
46        }
47
48        Ok(Self {
49            connections,
50            key_prefix,
51            compiled_code_cache: Arc::new(RwLock::new(LruCache::new(
52                NonZeroUsize::new(100).unwrap(),
53            ))),
54        })
55    }
56
57    /// Generate key for plugin data: plugin:{plugin_id}
58    fn plugin_key(&self, plugin_id: &str) -> String {
59        format!("{}:{}:{}", self.key_prefix, PLUGIN_PREFIX, plugin_id)
60    }
61
62    /// Generate key for plugin list: plugin_list (paginated list of plugin IDs)
63    fn plugin_list_key(&self) -> String {
64        format!("{}:{}", self.key_prefix, PLUGIN_LIST_KEY)
65    }
66
67    /// Generate key for compiled code: compiled_code:{plugin_id}
68    fn compiled_code_key(&self, plugin_id: &str) -> String {
69        format!("{}:{}:{}", self.key_prefix, COMPILED_CODE_PREFIX, plugin_id)
70    }
71
72    /// Generate key for source hash: source_hash:{plugin_id}
73    fn source_hash_key(&self, plugin_id: &str) -> String {
74        format!("{}:{}:{}", self.key_prefix, SOURCE_HASH_PREFIX, plugin_id)
75    }
76
77    /// Get plugin by ID using an existing connection.
78    /// This method is useful to prevent creating new connections for
79    /// getting individual plugins on list operations.
80    ///
81    /// # Arguments
82    ///
83    /// * `id` - The ID of the plugin to get.
84    /// * `conn` - The connection to use.
85    async fn get_by_id_with_connection(
86        &self,
87        id: &str,
88        conn: &mut deadpool_redis::Connection,
89    ) -> Result<Option<PluginModel>, RepositoryError> {
90        if id.is_empty() {
91            return Err(RepositoryError::InvalidData(
92                "Plugin ID cannot be empty".to_string(),
93            ));
94        }
95        let key = self.plugin_key(id);
96
97        debug!(plugin_id = %id, "fetching plugin data");
98
99        let json: Option<String> = conn
100            .get(&key)
101            .await
102            .map_err(|e| self.map_redis_error(e, "get_plugin_by_id"))?;
103
104        match json {
105            Some(json) => {
106                debug!(plugin_id = %id, "found plugin data");
107                let plugin = self.deserialize_entity::<PluginModel>(&json, id, "plugin")?;
108                Ok(Some(plugin))
109            }
110            None => {
111                debug!(plugin_id = %id, "no plugin found");
112                Ok(None)
113            }
114        }
115    }
116
117    async fn get_by_ids(
118        &self,
119        ids: &[String],
120    ) -> Result<BatchRetrievalResult<PluginModel>, RepositoryError> {
121        if ids.is_empty() {
122            debug!("no plugin IDs provided for batch fetch");
123            return Ok(BatchRetrievalResult {
124                results: vec![],
125                failed_ids: vec![],
126            });
127        }
128
129        let mut conn = self
130            .get_connection(self.connections.reader(), "get_by_ids")
131            .await?;
132        let keys: Vec<String> = ids.iter().map(|id| self.plugin_key(id)).collect();
133
134        let values: Vec<Option<String>> = conn
135            .mget(&keys)
136            .await
137            .map_err(|e| self.map_redis_error(e, "batch_fetch_plugins"))?;
138
139        let mut plugins = Vec::new();
140        let mut failed_count = 0;
141        let mut failed_ids = Vec::new();
142        for (i, value) in values.into_iter().enumerate() {
143            match value {
144                Some(json) => match self.deserialize_entity(&json, &ids[i], "plugin") {
145                    Ok(plugin) => plugins.push(plugin),
146                    Err(e) => {
147                        failed_count += 1;
148                        error!(plugin_id = %ids[i], error = %e, "failed to deserialize plugin");
149                        failed_ids.push(ids[i].clone());
150                    }
151                },
152                None => {
153                    warn!(plugin_id = %ids[i], "plugin not found in batch fetch");
154                }
155            }
156        }
157
158        if failed_count > 0 {
159            warn!(failed_count = %failed_count, total_count = %ids.len(), "failed to deserialize plugins in batch");
160        }
161
162        Ok(BatchRetrievalResult {
163            results: plugins,
164            failed_ids,
165        })
166    }
167}
168
169impl fmt::Debug for RedisPluginRepository {
170    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171        f.debug_struct("RedisPluginRepository")
172            .field("pool", &"<Pool>")
173            .field("key_prefix", &self.key_prefix)
174            .finish()
175    }
176}
177
178#[async_trait]
179impl PluginRepositoryTrait for RedisPluginRepository {
180    async fn get_by_id(&self, id: &str) -> Result<Option<PluginModel>, RepositoryError> {
181        let mut conn = self
182            .get_connection(self.connections.reader(), "get_by_id")
183            .await?;
184        self.get_by_id_with_connection(id, &mut conn).await
185    }
186
187    async fn add(&self, plugin: PluginModel) -> Result<(), RepositoryError> {
188        if plugin.id.is_empty() {
189            return Err(RepositoryError::InvalidData(
190                "Plugin ID cannot be empty".to_string(),
191            ));
192        }
193
194        if plugin.path.is_empty() {
195            return Err(RepositoryError::InvalidData(
196                "Plugin path cannot be empty".to_string(),
197            ));
198        }
199
200        let mut conn = self
201            .get_connection(self.connections.primary(), "add")
202            .await?;
203        let key = self.plugin_key(&plugin.id);
204        let list_key = self.plugin_list_key();
205
206        debug!(plugin_id = %plugin.id, "adding plugin");
207
208        // Check if plugin already exists
209        let exists: bool = conn
210            .exists(&key)
211            .await
212            .map_err(|e| self.map_redis_error(e, "check_plugin_exists"))?;
213
214        if exists {
215            return Err(RepositoryError::ConstraintViolation(format!(
216                "Plugin with ID {} already exists",
217                plugin.id
218            )));
219        }
220
221        // Serialize plugin
222        let json = self.serialize_entity(&plugin, |p| &p.id, "plugin")?;
223
224        // Use a pipeline to ensure atomicity
225        let mut pipe = redis::pipe();
226        pipe.atomic();
227        pipe.set(&key, &json);
228        pipe.sadd(&list_key, &plugin.id);
229
230        pipe.exec_async(&mut conn).await.map_err(|e| {
231            error!(plugin_id = %plugin.id, error = %e, "failed to add plugin");
232            self.map_redis_error(e, "add_plugin")
233        })?;
234
235        debug!(plugin_id = %plugin.id, "successfully added plugin");
236        Ok(())
237    }
238
239    async fn update(&self, plugin: PluginModel) -> Result<PluginModel, RepositoryError> {
240        if plugin.id.is_empty() {
241            return Err(RepositoryError::InvalidData(
242                "Plugin ID cannot be empty".to_string(),
243            ));
244        }
245
246        let mut conn = self
247            .get_connection(self.connections.primary(), "update")
248            .await?;
249        let key = self.plugin_key(&plugin.id);
250
251        debug!(plugin_id = %plugin.id, "updating plugin");
252
253        // Check if plugin exists
254        let exists: bool = conn
255            .exists(&key)
256            .await
257            .map_err(|e| self.map_redis_error(e, &format!("check_plugin_exists_{}", plugin.id)))?;
258
259        if !exists {
260            return Err(RepositoryError::NotFound(format!(
261                "Plugin with ID {} not found",
262                plugin.id
263            )));
264        }
265
266        // Serialize plugin
267        let json = self.serialize_entity(&plugin, |p| &p.id, "plugin")?;
268
269        // Update the plugin data
270        conn.set::<_, _, ()>(&key, &json)
271            .await
272            .map_err(|e| self.map_redis_error(e, &format!("update_plugin_{}", plugin.id)))?;
273
274        debug!(plugin_id = %plugin.id, "successfully updated plugin");
275        Ok(plugin)
276    }
277
278    async fn list_paginated(
279        &self,
280        query: PaginationQuery,
281    ) -> Result<PaginatedResult<PluginModel>, RepositoryError> {
282        if query.page == 0 {
283            return Err(RepositoryError::InvalidData(
284                "Page number must be greater than 0".to_string(),
285            ));
286        }
287
288        if query.per_page == 0 {
289            return Err(RepositoryError::InvalidData(
290                "Per page count must be greater than 0".to_string(),
291            ));
292        }
293
294        let (total, ids_to_query) = {
295            let mut conn = self
296                .get_connection(self.connections.reader(), "list_paginated")
297                .await?;
298            let plugin_list_key = self.plugin_list_key();
299
300            // Get total count
301            let total: u64 = conn
302                .scard(&plugin_list_key)
303                .await
304                .map_err(|e| self.map_redis_error(e, "list_paginated_count"))?;
305
306            if total == 0 {
307                return Ok(PaginatedResult {
308                    items: vec![],
309                    total: 0,
310                    page: query.page,
311                    per_page: query.per_page,
312                });
313            }
314
315            // Get all IDs and paginate in memory
316            let all_ids: Vec<String> = conn
317                .smembers(&plugin_list_key)
318                .await
319                .map_err(|e| self.map_redis_error(e, "list_paginated_members"))?;
320
321            let start = ((query.page - 1) * query.per_page) as usize;
322            let end = (start + query.per_page as usize).min(all_ids.len());
323
324            (total, all_ids[start..end].to_vec())
325            // Connection dropped here before nested call to avoid connection doubling
326        };
327
328        let items = self.get_by_ids(&ids_to_query).await?;
329
330        Ok(PaginatedResult {
331            items: items.results.clone(),
332            total,
333            page: query.page,
334            per_page: query.per_page,
335        })
336    }
337
338    async fn count(&self) -> Result<usize, RepositoryError> {
339        let mut conn = self
340            .get_connection(self.connections.reader(), "count")
341            .await?;
342        let plugin_list_key = self.plugin_list_key();
343
344        let count: u64 = conn
345            .scard(&plugin_list_key)
346            .await
347            .map_err(|e| self.map_redis_error(e, "count_plugins"))?;
348
349        Ok(count as usize)
350    }
351
352    async fn has_entries(&self) -> Result<bool, RepositoryError> {
353        let mut conn = self
354            .get_connection(self.connections.reader(), "has_entries")
355            .await?;
356        let plugin_list_key = self.plugin_list_key();
357
358        debug!("checking if plugin entries exist");
359
360        let exists: bool = conn
361            .exists(&plugin_list_key)
362            .await
363            .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
364
365        debug!(exists = %exists, "plugin entries exist");
366        Ok(exists)
367    }
368
369    async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
370        let mut conn = self
371            .get_connection(self.connections.primary(), "drop_all_entries")
372            .await?;
373        let plugin_list_key = self.plugin_list_key();
374
375        debug!("dropping all plugin entries");
376
377        // Get all plugin IDs first
378        let plugin_ids: Vec<String> = conn
379            .smembers(&plugin_list_key)
380            .await
381            .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_ids"))?;
382
383        if plugin_ids.is_empty() {
384            debug!("no plugin entries to drop");
385            return Ok(());
386        }
387
388        // Use pipeline for atomic operations
389        let mut pipe = redis::pipe();
390        pipe.atomic();
391
392        // Delete all individual plugin entries
393        for plugin_id in &plugin_ids {
394            let plugin_key = self.plugin_key(plugin_id);
395            pipe.del(&plugin_key);
396        }
397
398        // Delete the plugin list key
399        pipe.del(&plugin_list_key);
400
401        pipe.exec_async(&mut conn)
402            .await
403            .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
404
405        debug!(count = %plugin_ids.len(), "dropped plugin entries");
406        Ok(())
407    }
408
409    // Compiled code cache methods
410
411    async fn get_compiled_code(&self, plugin_id: &str) -> Result<Option<String>, RepositoryError> {
412        const CACHE_TTL_SECS: u64 = 300; // 5 minutes
413
414        // Check memory cache first - extract data without holding lock across await
415        // The cache is invalidated by store_compiled_code and invalidate_compiled_code,
416        // so we can trust it within the TTL without calling Redis to verify
417        let cached_code = {
418            let cache = self.compiled_code_cache.read();
419            cache.peek(plugin_id).and_then(|cached| {
420                if cached.cached_at.elapsed().as_secs() < CACHE_TTL_SECS {
421                    Some(cached.code.clone())
422                } else {
423                    None
424                }
425            })
426        };
427
428        if let Some(code) = cached_code {
429            return Ok(Some(code));
430        }
431
432        // Cache miss or expired - fetch from Redis
433        let mut conn = self
434            .get_connection(self.connections.reader(), "get_compiled_code")
435            .await?;
436        let key = self.compiled_code_key(plugin_id);
437
438        let code: Option<String> = conn
439            .get(&key)
440            .await
441            .map_err(|e| self.map_redis_error(e, &format!("get_compiled_code_{plugin_id}")))?;
442
443        // Update cache if found - fetch hash in same connection batch would be better
444        // but for now we avoid the extra Redis call since the hash is only used for
445        // external invalidation which already clears the in-memory cache
446        if let Some(ref code_str) = code {
447            let mut cache = self.compiled_code_cache.write();
448            cache.put(
449                plugin_id.to_string(),
450                CachedCompiledCode {
451                    code: code_str.clone(),
452                    cached_at: Instant::now(),
453                },
454            );
455        }
456
457        Ok(code)
458    }
459
460    async fn store_compiled_code(
461        &self,
462        plugin_id: &str,
463        compiled_code: &str,
464        source_hash: Option<&str>,
465    ) -> Result<(), RepositoryError> {
466        let mut conn = self
467            .get_connection(self.connections.primary(), "store_compiled_code")
468            .await?;
469        let code_key = self.compiled_code_key(plugin_id);
470
471        debug!(plugin_id = %plugin_id, "storing compiled code in Redis");
472
473        // Use pipeline to store both code and hash atomically
474        let mut pipe = redis::pipe();
475        pipe.atomic();
476        pipe.set(&code_key, compiled_code);
477
478        if let Some(hash) = source_hash {
479            let hash_key = self.source_hash_key(plugin_id);
480            pipe.set(&hash_key, hash);
481        }
482
483        pipe.exec_async(&mut conn)
484            .await
485            .map_err(|e| self.map_redis_error(e, &format!("store_compiled_code_{plugin_id}")))?;
486
487        // Invalidate memory cache
488        self.compiled_code_cache.write().pop(plugin_id);
489
490        Ok(())
491    }
492
493    async fn invalidate_compiled_code(&self, plugin_id: &str) -> Result<(), RepositoryError> {
494        let mut conn = self
495            .get_connection(self.connections.primary(), "invalidate_compiled_code")
496            .await?;
497        let code_key = self.compiled_code_key(plugin_id);
498        let hash_key = self.source_hash_key(plugin_id);
499
500        debug!(plugin_id = %plugin_id, "invalidating compiled code in Redis");
501
502        // Use pipeline to delete both keys atomically
503        let mut pipe = redis::pipe();
504        pipe.atomic();
505        pipe.del(&code_key);
506        pipe.del(&hash_key);
507
508        pipe.exec_async(&mut conn).await.map_err(|e| {
509            self.map_redis_error(e, &format!("invalidate_compiled_code_{plugin_id}"))
510        })?;
511
512        // Invalidate memory cache
513        self.compiled_code_cache.write().pop(plugin_id);
514
515        Ok(())
516    }
517
518    async fn invalidate_all_compiled_code(&self) -> Result<(), RepositoryError> {
519        let mut conn = self
520            .get_connection(self.connections.primary(), "invalidate_all_compiled_code")
521            .await?;
522        let plugin_list_key = self.plugin_list_key();
523
524        debug!("invalidating all compiled code in Redis");
525
526        // Get all plugin IDs from the list
527        let plugin_ids: Vec<String> = conn
528            .smembers(&plugin_list_key)
529            .await
530            .map_err(|e| self.map_redis_error(e, "get_plugin_list_for_invalidate"))?;
531
532        // Delete all compiled code and hash keys
533        for plugin_id in &plugin_ids {
534            let code_key = self.compiled_code_key(plugin_id);
535            let hash_key = self.source_hash_key(plugin_id);
536
537            let _ = conn.del::<_, ()>(&code_key).await;
538            let _ = conn.del::<_, ()>(&hash_key).await;
539        }
540
541        // Clear all cached compiled code
542        self.compiled_code_cache.write().clear();
543
544        Ok(())
545    }
546
547    async fn has_compiled_code(&self, plugin_id: &str) -> Result<bool, RepositoryError> {
548        let mut conn = self
549            .get_connection(self.connections.reader(), "has_compiled_code")
550            .await?;
551        let key = self.compiled_code_key(plugin_id);
552
553        let exists: bool = conn
554            .exists(&key)
555            .await
556            .map_err(|e| self.map_redis_error(e, &format!("exists_compiled_code_{plugin_id}")))?;
557
558        Ok(exists)
559    }
560
561    async fn get_source_hash(&self, plugin_id: &str) -> Result<Option<String>, RepositoryError> {
562        let mut conn = self
563            .get_connection(self.connections.reader(), "get_source_hash")
564            .await?;
565        let key = self.source_hash_key(plugin_id);
566
567        let hash: Option<String> = conn
568            .get(&key)
569            .await
570            .map_err(|e| self.map_redis_error(e, &format!("get_source_hash_{plugin_id}")))?;
571
572        Ok(hash)
573    }
574}
575
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS;
    use crate::models::PluginModel;
    use std::{sync::Arc, time::Duration};

    /// Builds a plugin model with the given ID and path and default settings.
    fn create_test_plugin(id: &str, path: &str) -> PluginModel {
        PluginModel {
            id: id.to_string(),
            path: path.to_string(),
            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
            emit_logs: false,
            emit_traces: false,
            raw_response: false,
            allow_get_invocation: false,
            config: None,
            forward_logs: false,
        }
    }

    /// Builds a single-pool connection set for tests.
    ///
    /// The server address comes from `REDIS_URL`, falling back to a local
    /// instance, so every test targets the same server.
    fn create_test_connections() -> Arc<RedisConnections> {
        let redis_url =
            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
        let cfg = deadpool_redis::Config::from_url(&redis_url);
        let pool = Arc::new(
            cfg.builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(deadpool_redis::Runtime::Tokio1)
                .build()
                .expect("Failed to build Redis pool"),
        );
        Arc::new(RedisConnections::new_single_pool(pool))
    }

    /// Creates a repository under a random key prefix so tests do not see
    /// each other's keys.
    async fn setup_test_repo() -> RedisPluginRepository {
        let random_id = uuid::Uuid::new_v4().to_string();
        let key_prefix = format!("test_prefix:{random_id}");

        RedisPluginRepository::new(create_test_connections(), key_prefix)
            .expect("Failed to create Redis plugin repository")
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_new_repository_creation() {
        let repo = setup_test_repo().await;
        assert!(repo.key_prefix.contains("test_prefix"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_new_repository_empty_prefix_fails() {
        let result = RedisPluginRepository::new(create_test_connections(), "".to_string());
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("key prefix cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_key_generation() {
        let repo = setup_test_repo().await;

        let plugin_key = repo.plugin_key("test-plugin");
        assert!(plugin_key.starts_with(&repo.key_prefix));
        assert!(plugin_key.contains(":plugin:test-plugin"));

        let list_key = repo.plugin_list_key();
        assert!(list_key.starts_with(&repo.key_prefix));
        assert!(list_key.contains(":plugin_list"));

        let code_key = repo.compiled_code_key("test-plugin");
        assert!(code_key.starts_with(&repo.key_prefix));
        assert!(code_key.contains(":compiled_code:test-plugin"));

        let hash_key = repo.source_hash_key("test-plugin");
        assert!(hash_key.starts_with(&repo.key_prefix));
        assert!(hash_key.contains(":source_hash:test-plugin"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_serialize_deserialize_plugin() {
        let repo = setup_test_repo().await;
        let plugin = create_test_plugin("test-plugin", "/path/to/plugin");

        let json = repo.serialize_entity(&plugin, |p| &p.id, "plugin").unwrap();
        let deserialized: PluginModel = repo
            .deserialize_entity(&json, &plugin.id, "plugin")
            .unwrap();

        assert_eq!(plugin.id, deserialized.id);
        assert_eq!(plugin.path, deserialized.path);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_add_plugin() {
        let repo = setup_test_repo().await;
        let plugin_name = uuid::Uuid::new_v4().to_string();
        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");

        let result = repo.add(plugin).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_plugin() {
        let repo = setup_test_repo().await;
        let plugin_name = uuid::Uuid::new_v4().to_string();
        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");

        // Add the plugin first
        repo.add(plugin.clone()).await.unwrap();

        // Get the plugin
        let retrieved = repo.get_by_id(&plugin_name).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.id, plugin.id);
        assert_eq!(retrieved.path, plugin.path);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_nonexistent_plugin() {
        let repo = setup_test_repo().await;

        let result = repo.get_by_id("nonexistent-plugin").await;
        assert!(matches!(result, Ok(None)));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_duplicate_plugin_addition() {
        let repo = setup_test_repo().await;
        let plugin_name = uuid::Uuid::new_v4().to_string();
        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");

        // Add the plugin first time
        repo.add(plugin.clone()).await.unwrap();

        // Try to add the same plugin again
        let result = repo.add(plugin).await;
        assert!(result.is_err());

        if let Err(RepositoryError::ConstraintViolation(msg)) = result {
            assert!(msg.contains("already exists"));
        } else {
            panic!("Expected ConstraintViolation error");
        }
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_debug_implementation() {
        let repo = setup_test_repo().await;
        let debug_str = format!("{repo:?}");
        assert!(debug_str.contains("RedisPluginRepository"));
        assert!(debug_str.contains("test_prefix"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_error_handling_empty_id() {
        let repo = setup_test_repo().await;

        let result = repo.get_by_id("").await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("ID cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_add_plugin_with_empty_id() {
        let repo = setup_test_repo().await;
        let plugin = create_test_plugin("", "/path/to/plugin");

        let result = repo.add(plugin).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("ID cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_add_plugin_with_empty_path() {
        let repo = setup_test_repo().await;
        let plugin = create_test_plugin("test-plugin", "");

        let result = repo.add(plugin).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("path cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_by_ids_plugins() {
        let repo = setup_test_repo().await;
        let plugin_name1 = uuid::Uuid::new_v4().to_string();
        let plugin_name2 = uuid::Uuid::new_v4().to_string();
        let plugin1 = create_test_plugin(&plugin_name1, "/path/to/plugin1");
        let plugin2 = create_test_plugin(&plugin_name2, "/path/to/plugin2");

        repo.add(plugin1.clone()).await.unwrap();
        repo.add(plugin2.clone()).await.unwrap();

        let retrieved = repo
            .get_by_ids(&[plugin1.id.clone(), plugin2.id.clone()])
            .await
            .unwrap();
        assert_eq!(retrieved.results.len(), 2);
        // Results order may vary, so check that both plugins are present
        let result_ids: Vec<String> = retrieved.results.iter().map(|p| p.id.clone()).collect();
        assert!(result_ids.contains(&plugin1.id));
        assert!(result_ids.contains(&plugin2.id));
        assert_eq!(retrieved.failed_ids.len(), 0);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_list_paginated_plugins() {
        let repo = setup_test_repo().await;

        let plugin_id1 = uuid::Uuid::new_v4().to_string();
        let plugin_id2 = uuid::Uuid::new_v4().to_string();
        let plugin_id3 = uuid::Uuid::new_v4().to_string();
        let plugin1 = create_test_plugin(&plugin_id1, "/path/to/plugin1");
        let plugin2 = create_test_plugin(&plugin_id2, "/path/to/plugin2");
        let plugin3 = create_test_plugin(&plugin_id3, "/path/to/plugin3");

        repo.add(plugin1.clone()).await.unwrap();
        repo.add(plugin2.clone()).await.unwrap();
        repo.add(plugin3.clone()).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 2,
        };

        let result = repo.list_paginated(query).await;
        assert!(result.is_ok());
        let result = result.unwrap();
        // `assert_eq!` prints both values on failure, unlike `assert!(a == b)`.
        assert_eq!(result.items.len(), 2);
        assert_eq!(result.total, 3);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_has_entries() {
        let repo = setup_test_repo().await;
        assert!(!repo.has_entries().await.unwrap());
        repo.add(create_test_plugin("test-plugin", "/path/to/plugin"))
            .await
            .unwrap();
        assert!(repo.has_entries().await.unwrap());
        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_drop_all_entries() {
        let repo = setup_test_repo().await;
        repo.add(create_test_plugin("test-plugin", "/path/to/plugin"))
            .await
            .unwrap();
        assert!(repo.has_entries().await.unwrap());
        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }
}