1use crate::models::{PaginationQuery, PluginModel, RepositoryError};
4use crate::repositories::redis_base::RedisRepository;
5use crate::repositories::{BatchRetrievalResult, PaginatedResult, PluginRepositoryTrait};
6use crate::utils::RedisConnections;
7use async_trait::async_trait;
8use lru::LruCache;
9use parking_lot::RwLock;
10use redis::AsyncCommands;
11use std::fmt;
12use std::num::NonZeroUsize;
13use std::sync::Arc;
14use std::time::Instant;
15use tracing::{debug, error, warn};
16
// Redis key-space segments. Full keys have the shape
// "<key_prefix>:<segment>[:<plugin_id>]" (see the *_key helpers below).
const PLUGIN_PREFIX: &str = "plugin";
// Set holding the IDs of all registered plugins (used for listing/counting).
const PLUGIN_LIST_KEY: &str = "plugin_list";
// Per-plugin compiled code blob.
const COMPILED_CODE_PREFIX: &str = "compiled_code";
// Per-plugin source hash — presumably used to detect when the stored
// compiled code is stale relative to the source; confirm with callers.
const SOURCE_HASH_PREFIX: &str = "source_hash";
21
/// In-process cache entry for a plugin's compiled code, carrying the time it
/// was inserted so reads can enforce a TTL (see `get_compiled_code`).
#[derive(Clone)]
struct CachedCompiledCode {
    // Compiled code blob as fetched from Redis.
    code: String,
    // Insertion time; entries older than the read-side TTL are ignored.
    cached_at: Instant,
}
27
/// Redis-backed implementation of `PluginRepositoryTrait`.
///
/// Every key is namespaced under `key_prefix`. Compiled plugin code is
/// additionally cached in a process-local LRU (shared across clones via
/// `Arc`) to avoid repeated Redis round-trips on hot paths.
#[derive(Clone)]
pub struct RedisPluginRepository {
    // Reader/primary Redis connection pools.
    pub connections: Arc<RedisConnections>,
    // Namespace prepended to every key this repository touches; validated
    // non-empty in `new`.
    pub key_prefix: String,
    // LRU of recently fetched compiled code, keyed by plugin ID.
    compiled_code_cache: Arc<RwLock<LruCache<String, CachedCompiledCode>>>,
}
34
// Marker impl: pulls in the shared default helpers (connection acquisition,
// error mapping, entity (de)serialization) provided by `RedisRepository`.
impl RedisRepository for RedisPluginRepository {}
36
37impl RedisPluginRepository {
38 pub fn new(
39 connections: Arc<RedisConnections>,
40 key_prefix: String,
41 ) -> Result<Self, RepositoryError> {
42 if key_prefix.is_empty() {
43 return Err(RepositoryError::InvalidData(
44 "Redis key prefix cannot be empty".to_string(),
45 ));
46 }
47
48 Ok(Self {
49 connections,
50 key_prefix,
51 compiled_code_cache: Arc::new(RwLock::new(LruCache::new(
52 NonZeroUsize::new(100).unwrap(),
53 ))),
54 })
55 }
56
57 fn plugin_key(&self, plugin_id: &str) -> String {
59 format!("{}:{}:{}", self.key_prefix, PLUGIN_PREFIX, plugin_id)
60 }
61
62 fn plugin_list_key(&self) -> String {
64 format!("{}:{}", self.key_prefix, PLUGIN_LIST_KEY)
65 }
66
67 fn compiled_code_key(&self, plugin_id: &str) -> String {
69 format!("{}:{}:{}", self.key_prefix, COMPILED_CODE_PREFIX, plugin_id)
70 }
71
72 fn source_hash_key(&self, plugin_id: &str) -> String {
74 format!("{}:{}:{}", self.key_prefix, SOURCE_HASH_PREFIX, plugin_id)
75 }
76
77 async fn get_by_id_with_connection(
86 &self,
87 id: &str,
88 conn: &mut deadpool_redis::Connection,
89 ) -> Result<Option<PluginModel>, RepositoryError> {
90 if id.is_empty() {
91 return Err(RepositoryError::InvalidData(
92 "Plugin ID cannot be empty".to_string(),
93 ));
94 }
95 let key = self.plugin_key(id);
96
97 debug!(plugin_id = %id, "fetching plugin data");
98
99 let json: Option<String> = conn
100 .get(&key)
101 .await
102 .map_err(|e| self.map_redis_error(e, "get_plugin_by_id"))?;
103
104 match json {
105 Some(json) => {
106 debug!(plugin_id = %id, "found plugin data");
107 let plugin = self.deserialize_entity::<PluginModel>(&json, id, "plugin")?;
108 Ok(Some(plugin))
109 }
110 None => {
111 debug!(plugin_id = %id, "no plugin found");
112 Ok(None)
113 }
114 }
115 }
116
117 async fn get_by_ids(
118 &self,
119 ids: &[String],
120 ) -> Result<BatchRetrievalResult<PluginModel>, RepositoryError> {
121 if ids.is_empty() {
122 debug!("no plugin IDs provided for batch fetch");
123 return Ok(BatchRetrievalResult {
124 results: vec![],
125 failed_ids: vec![],
126 });
127 }
128
129 let mut conn = self
130 .get_connection(self.connections.reader(), "get_by_ids")
131 .await?;
132 let keys: Vec<String> = ids.iter().map(|id| self.plugin_key(id)).collect();
133
134 let values: Vec<Option<String>> = conn
135 .mget(&keys)
136 .await
137 .map_err(|e| self.map_redis_error(e, "batch_fetch_plugins"))?;
138
139 let mut plugins = Vec::new();
140 let mut failed_count = 0;
141 let mut failed_ids = Vec::new();
142 for (i, value) in values.into_iter().enumerate() {
143 match value {
144 Some(json) => match self.deserialize_entity(&json, &ids[i], "plugin") {
145 Ok(plugin) => plugins.push(plugin),
146 Err(e) => {
147 failed_count += 1;
148 error!(plugin_id = %ids[i], error = %e, "failed to deserialize plugin");
149 failed_ids.push(ids[i].clone());
150 }
151 },
152 None => {
153 warn!(plugin_id = %ids[i], "plugin not found in batch fetch");
154 }
155 }
156 }
157
158 if failed_count > 0 {
159 warn!(failed_count = %failed_count, total_count = %ids.len(), "failed to deserialize plugins in batch");
160 }
161
162 Ok(BatchRetrievalResult {
163 results: plugins,
164 failed_ids,
165 })
166 }
167}
168
169impl fmt::Debug for RedisPluginRepository {
170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
171 f.debug_struct("RedisPluginRepository")
172 .field("pool", &"<Pool>")
173 .field("key_prefix", &self.key_prefix)
174 .finish()
175 }
176}
177
178#[async_trait]
179impl PluginRepositoryTrait for RedisPluginRepository {
180 async fn get_by_id(&self, id: &str) -> Result<Option<PluginModel>, RepositoryError> {
181 let mut conn = self
182 .get_connection(self.connections.reader(), "get_by_id")
183 .await?;
184 self.get_by_id_with_connection(id, &mut conn).await
185 }
186
187 async fn add(&self, plugin: PluginModel) -> Result<(), RepositoryError> {
188 if plugin.id.is_empty() {
189 return Err(RepositoryError::InvalidData(
190 "Plugin ID cannot be empty".to_string(),
191 ));
192 }
193
194 if plugin.path.is_empty() {
195 return Err(RepositoryError::InvalidData(
196 "Plugin path cannot be empty".to_string(),
197 ));
198 }
199
200 let mut conn = self
201 .get_connection(self.connections.primary(), "add")
202 .await?;
203 let key = self.plugin_key(&plugin.id);
204 let list_key = self.plugin_list_key();
205
206 debug!(plugin_id = %plugin.id, "adding plugin");
207
208 let exists: bool = conn
210 .exists(&key)
211 .await
212 .map_err(|e| self.map_redis_error(e, "check_plugin_exists"))?;
213
214 if exists {
215 return Err(RepositoryError::ConstraintViolation(format!(
216 "Plugin with ID {} already exists",
217 plugin.id
218 )));
219 }
220
221 let json = self.serialize_entity(&plugin, |p| &p.id, "plugin")?;
223
224 let mut pipe = redis::pipe();
226 pipe.atomic();
227 pipe.set(&key, &json);
228 pipe.sadd(&list_key, &plugin.id);
229
230 pipe.exec_async(&mut conn).await.map_err(|e| {
231 error!(plugin_id = %plugin.id, error = %e, "failed to add plugin");
232 self.map_redis_error(e, "add_plugin")
233 })?;
234
235 debug!(plugin_id = %plugin.id, "successfully added plugin");
236 Ok(())
237 }
238
239 async fn update(&self, plugin: PluginModel) -> Result<PluginModel, RepositoryError> {
240 if plugin.id.is_empty() {
241 return Err(RepositoryError::InvalidData(
242 "Plugin ID cannot be empty".to_string(),
243 ));
244 }
245
246 let mut conn = self
247 .get_connection(self.connections.primary(), "update")
248 .await?;
249 let key = self.plugin_key(&plugin.id);
250
251 debug!(plugin_id = %plugin.id, "updating plugin");
252
253 let exists: bool = conn
255 .exists(&key)
256 .await
257 .map_err(|e| self.map_redis_error(e, &format!("check_plugin_exists_{}", plugin.id)))?;
258
259 if !exists {
260 return Err(RepositoryError::NotFound(format!(
261 "Plugin with ID {} not found",
262 plugin.id
263 )));
264 }
265
266 let json = self.serialize_entity(&plugin, |p| &p.id, "plugin")?;
268
269 conn.set::<_, _, ()>(&key, &json)
271 .await
272 .map_err(|e| self.map_redis_error(e, &format!("update_plugin_{}", plugin.id)))?;
273
274 debug!(plugin_id = %plugin.id, "successfully updated plugin");
275 Ok(plugin)
276 }
277
278 async fn list_paginated(
279 &self,
280 query: PaginationQuery,
281 ) -> Result<PaginatedResult<PluginModel>, RepositoryError> {
282 if query.page == 0 {
283 return Err(RepositoryError::InvalidData(
284 "Page number must be greater than 0".to_string(),
285 ));
286 }
287
288 if query.per_page == 0 {
289 return Err(RepositoryError::InvalidData(
290 "Per page count must be greater than 0".to_string(),
291 ));
292 }
293
294 let (total, ids_to_query) = {
295 let mut conn = self
296 .get_connection(self.connections.reader(), "list_paginated")
297 .await?;
298 let plugin_list_key = self.plugin_list_key();
299
300 let total: u64 = conn
302 .scard(&plugin_list_key)
303 .await
304 .map_err(|e| self.map_redis_error(e, "list_paginated_count"))?;
305
306 if total == 0 {
307 return Ok(PaginatedResult {
308 items: vec![],
309 total: 0,
310 page: query.page,
311 per_page: query.per_page,
312 });
313 }
314
315 let all_ids: Vec<String> = conn
317 .smembers(&plugin_list_key)
318 .await
319 .map_err(|e| self.map_redis_error(e, "list_paginated_members"))?;
320
321 let start = ((query.page - 1) * query.per_page) as usize;
322 let end = (start + query.per_page as usize).min(all_ids.len());
323
324 (total, all_ids[start..end].to_vec())
325 };
327
328 let items = self.get_by_ids(&ids_to_query).await?;
329
330 Ok(PaginatedResult {
331 items: items.results.clone(),
332 total,
333 page: query.page,
334 per_page: query.per_page,
335 })
336 }
337
338 async fn count(&self) -> Result<usize, RepositoryError> {
339 let mut conn = self
340 .get_connection(self.connections.reader(), "count")
341 .await?;
342 let plugin_list_key = self.plugin_list_key();
343
344 let count: u64 = conn
345 .scard(&plugin_list_key)
346 .await
347 .map_err(|e| self.map_redis_error(e, "count_plugins"))?;
348
349 Ok(count as usize)
350 }
351
352 async fn has_entries(&self) -> Result<bool, RepositoryError> {
353 let mut conn = self
354 .get_connection(self.connections.reader(), "has_entries")
355 .await?;
356 let plugin_list_key = self.plugin_list_key();
357
358 debug!("checking if plugin entries exist");
359
360 let exists: bool = conn
361 .exists(&plugin_list_key)
362 .await
363 .map_err(|e| self.map_redis_error(e, "has_entries_check"))?;
364
365 debug!(exists = %exists, "plugin entries exist");
366 Ok(exists)
367 }
368
369 async fn drop_all_entries(&self) -> Result<(), RepositoryError> {
370 let mut conn = self
371 .get_connection(self.connections.primary(), "drop_all_entries")
372 .await?;
373 let plugin_list_key = self.plugin_list_key();
374
375 debug!("dropping all plugin entries");
376
377 let plugin_ids: Vec<String> = conn
379 .smembers(&plugin_list_key)
380 .await
381 .map_err(|e| self.map_redis_error(e, "drop_all_entries_get_ids"))?;
382
383 if plugin_ids.is_empty() {
384 debug!("no plugin entries to drop");
385 return Ok(());
386 }
387
388 let mut pipe = redis::pipe();
390 pipe.atomic();
391
392 for plugin_id in &plugin_ids {
394 let plugin_key = self.plugin_key(plugin_id);
395 pipe.del(&plugin_key);
396 }
397
398 pipe.del(&plugin_list_key);
400
401 pipe.exec_async(&mut conn)
402 .await
403 .map_err(|e| self.map_redis_error(e, "drop_all_entries_pipeline"))?;
404
405 debug!(count = %plugin_ids.len(), "dropped plugin entries");
406 Ok(())
407 }
408
409 async fn get_compiled_code(&self, plugin_id: &str) -> Result<Option<String>, RepositoryError> {
412 const CACHE_TTL_SECS: u64 = 300; let cached_code = {
418 let cache = self.compiled_code_cache.read();
419 cache.peek(plugin_id).and_then(|cached| {
420 if cached.cached_at.elapsed().as_secs() < CACHE_TTL_SECS {
421 Some(cached.code.clone())
422 } else {
423 None
424 }
425 })
426 };
427
428 if let Some(code) = cached_code {
429 return Ok(Some(code));
430 }
431
432 let mut conn = self
434 .get_connection(self.connections.reader(), "get_compiled_code")
435 .await?;
436 let key = self.compiled_code_key(plugin_id);
437
438 let code: Option<String> = conn
439 .get(&key)
440 .await
441 .map_err(|e| self.map_redis_error(e, &format!("get_compiled_code_{plugin_id}")))?;
442
443 if let Some(ref code_str) = code {
447 let mut cache = self.compiled_code_cache.write();
448 cache.put(
449 plugin_id.to_string(),
450 CachedCompiledCode {
451 code: code_str.clone(),
452 cached_at: Instant::now(),
453 },
454 );
455 }
456
457 Ok(code)
458 }
459
460 async fn store_compiled_code(
461 &self,
462 plugin_id: &str,
463 compiled_code: &str,
464 source_hash: Option<&str>,
465 ) -> Result<(), RepositoryError> {
466 let mut conn = self
467 .get_connection(self.connections.primary(), "store_compiled_code")
468 .await?;
469 let code_key = self.compiled_code_key(plugin_id);
470
471 debug!(plugin_id = %plugin_id, "storing compiled code in Redis");
472
473 let mut pipe = redis::pipe();
475 pipe.atomic();
476 pipe.set(&code_key, compiled_code);
477
478 if let Some(hash) = source_hash {
479 let hash_key = self.source_hash_key(plugin_id);
480 pipe.set(&hash_key, hash);
481 }
482
483 pipe.exec_async(&mut conn)
484 .await
485 .map_err(|e| self.map_redis_error(e, &format!("store_compiled_code_{plugin_id}")))?;
486
487 self.compiled_code_cache.write().pop(plugin_id);
489
490 Ok(())
491 }
492
493 async fn invalidate_compiled_code(&self, plugin_id: &str) -> Result<(), RepositoryError> {
494 let mut conn = self
495 .get_connection(self.connections.primary(), "invalidate_compiled_code")
496 .await?;
497 let code_key = self.compiled_code_key(plugin_id);
498 let hash_key = self.source_hash_key(plugin_id);
499
500 debug!(plugin_id = %plugin_id, "invalidating compiled code in Redis");
501
502 let mut pipe = redis::pipe();
504 pipe.atomic();
505 pipe.del(&code_key);
506 pipe.del(&hash_key);
507
508 pipe.exec_async(&mut conn).await.map_err(|e| {
509 self.map_redis_error(e, &format!("invalidate_compiled_code_{plugin_id}"))
510 })?;
511
512 self.compiled_code_cache.write().pop(plugin_id);
514
515 Ok(())
516 }
517
518 async fn invalidate_all_compiled_code(&self) -> Result<(), RepositoryError> {
519 let mut conn = self
520 .get_connection(self.connections.primary(), "invalidate_all_compiled_code")
521 .await?;
522 let plugin_list_key = self.plugin_list_key();
523
524 debug!("invalidating all compiled code in Redis");
525
526 let plugin_ids: Vec<String> = conn
528 .smembers(&plugin_list_key)
529 .await
530 .map_err(|e| self.map_redis_error(e, "get_plugin_list_for_invalidate"))?;
531
532 for plugin_id in &plugin_ids {
534 let code_key = self.compiled_code_key(plugin_id);
535 let hash_key = self.source_hash_key(plugin_id);
536
537 let _ = conn.del::<_, ()>(&code_key).await;
538 let _ = conn.del::<_, ()>(&hash_key).await;
539 }
540
541 self.compiled_code_cache.write().clear();
543
544 Ok(())
545 }
546
547 async fn has_compiled_code(&self, plugin_id: &str) -> Result<bool, RepositoryError> {
548 let mut conn = self
549 .get_connection(self.connections.reader(), "has_compiled_code")
550 .await?;
551 let key = self.compiled_code_key(plugin_id);
552
553 let exists: bool = conn
554 .exists(&key)
555 .await
556 .map_err(|e| self.map_redis_error(e, &format!("exists_compiled_code_{plugin_id}")))?;
557
558 Ok(exists)
559 }
560
561 async fn get_source_hash(&self, plugin_id: &str) -> Result<Option<String>, RepositoryError> {
562 let mut conn = self
563 .get_connection(self.connections.reader(), "get_source_hash")
564 .await?;
565 let key = self.source_hash_key(plugin_id);
566
567 let hash: Option<String> = conn
568 .get(&key)
569 .await
570 .map_err(|e| self.map_redis_error(e, &format!("get_source_hash_{plugin_id}")))?;
571
572 Ok(hash)
573 }
574}
575
#[cfg(test)]
mod tests {
    //! Integration tests against a live Redis instance (hence `#[ignore]`).
    //! Each repository gets a unique random key prefix so tests are isolated
    //! and can run concurrently against a shared server.
    use super::*;
    use crate::constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS;
    use crate::models::PluginModel;
    use std::{sync::Arc, time::Duration};

    /// Builds a minimal plugin model with all optional behavior disabled.
    fn create_test_plugin(id: &str, path: &str) -> PluginModel {
        PluginModel {
            id: id.to_string(),
            path: path.to_string(),
            timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
            emit_logs: false,
            emit_traces: false,
            raw_response: false,
            allow_get_invocation: false,
            config: None,
            forward_logs: false,
        }
    }

    /// Creates a repository pointed at `REDIS_URL` (default: local Redis)
    /// with a unique random key prefix for test isolation.
    async fn setup_test_repo() -> RedisPluginRepository {
        let redis_url =
            std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379/".to_string());
        let cfg = deadpool_redis::Config::from_url(&redis_url);
        let pool = Arc::new(
            cfg.builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(deadpool_redis::Runtime::Tokio1)
                .build()
                .expect("Failed to build Redis pool"),
        );
        let connections = Arc::new(RedisConnections::new_single_pool(pool));

        let random_id = uuid::Uuid::new_v4().to_string();
        let key_prefix = format!("test_prefix:{random_id}");

        RedisPluginRepository::new(connections, key_prefix)
            .expect("Failed to create Redis plugin repository")
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_new_repository_creation() {
        let repo = setup_test_repo().await;
        assert!(repo.key_prefix.contains("test_prefix"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_new_repository_empty_prefix_fails() {
        let redis_url = "redis://127.0.0.1:6379/";
        let cfg = deadpool_redis::Config::from_url(redis_url);
        let pool = Arc::new(
            cfg.builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(deadpool_redis::Runtime::Tokio1)
                .build()
                .expect("Failed to build Redis pool"),
        );
        let connections = Arc::new(RedisConnections::new_single_pool(pool));

        let result = RedisPluginRepository::new(connections, "".to_string());
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("key prefix cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_key_generation() {
        let repo = setup_test_repo().await;

        let plugin_key = repo.plugin_key("test-plugin");
        assert!(plugin_key.contains(":plugin:test-plugin"));

        let list_key = repo.plugin_list_key();
        assert!(list_key.contains(":plugin_list"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_serialize_deserialize_plugin() {
        let repo = setup_test_repo().await;
        let plugin = create_test_plugin("test-plugin", "/path/to/plugin");

        let json = repo.serialize_entity(&plugin, |p| &p.id, "plugin").unwrap();
        let deserialized: PluginModel = repo
            .deserialize_entity(&json, &plugin.id, "plugin")
            .unwrap();

        assert_eq!(plugin.id, deserialized.id);
        assert_eq!(plugin.path, deserialized.path);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_add_plugin() {
        let repo = setup_test_repo().await;
        let plugin_name = uuid::Uuid::new_v4().to_string();
        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");

        let result = repo.add(plugin).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_plugin() {
        let repo = setup_test_repo().await;
        let plugin_name = uuid::Uuid::new_v4().to_string();
        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");

        repo.add(plugin.clone()).await.unwrap();

        let retrieved = repo.get_by_id(&plugin_name).await.unwrap();
        assert!(retrieved.is_some());
        let retrieved = retrieved.unwrap();
        assert_eq!(retrieved.id, plugin.id);
        assert_eq!(retrieved.path, plugin.path);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_nonexistent_plugin() {
        let repo = setup_test_repo().await;

        let result = repo.get_by_id("nonexistent-plugin").await;
        assert!(matches!(result, Ok(None)));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_duplicate_plugin_addition() {
        let repo = setup_test_repo().await;
        let plugin_name = uuid::Uuid::new_v4().to_string();
        let plugin = create_test_plugin(&plugin_name, "/path/to/plugin");

        repo.add(plugin.clone()).await.unwrap();

        // Second add with the same ID must be rejected.
        let result = repo.add(plugin).await;
        assert!(result.is_err());

        if let Err(RepositoryError::ConstraintViolation(msg)) = result {
            assert!(msg.contains("already exists"));
        } else {
            panic!("Expected ConstraintViolation error");
        }
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_debug_implementation() {
        let repo = setup_test_repo().await;
        let debug_str = format!("{repo:?}");
        assert!(debug_str.contains("RedisPluginRepository"));
        assert!(debug_str.contains("test_prefix"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_error_handling_empty_id() {
        let repo = setup_test_repo().await;

        let result = repo.get_by_id("").await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("ID cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_add_plugin_with_empty_id() {
        let repo = setup_test_repo().await;
        let plugin = create_test_plugin("", "/path/to/plugin");

        let result = repo.add(plugin).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("ID cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_add_plugin_with_empty_path() {
        let repo = setup_test_repo().await;
        let plugin = create_test_plugin("test-plugin", "");

        let result = repo.add(plugin).await;
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("path cannot be empty"));
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_get_by_ids_plugins() {
        let repo = setup_test_repo().await;
        let plugin_name1 = uuid::Uuid::new_v4().to_string();
        let plugin_name2 = uuid::Uuid::new_v4().to_string();
        let plugin1 = create_test_plugin(&plugin_name1, "/path/to/plugin1");
        let plugin2 = create_test_plugin(&plugin_name2, "/path/to/plugin2");

        repo.add(plugin1.clone()).await.unwrap();
        repo.add(plugin2.clone()).await.unwrap();

        let retrieved = repo
            .get_by_ids(&[plugin1.id.clone(), plugin2.id.clone()])
            .await
            .unwrap();
        assert_eq!(retrieved.results.len(), 2);
        // SMEMBERS/MGET order is not guaranteed, so check membership.
        let result_ids: Vec<String> = retrieved.results.iter().map(|p| p.id.clone()).collect();
        assert!(result_ids.contains(&plugin1.id));
        assert!(result_ids.contains(&plugin2.id));
        assert_eq!(retrieved.failed_ids.len(), 0);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_list_paginated_plugins() {
        let repo = setup_test_repo().await;

        let plugin_id1 = uuid::Uuid::new_v4().to_string();
        let plugin_id2 = uuid::Uuid::new_v4().to_string();
        let plugin_id3 = uuid::Uuid::new_v4().to_string();
        let plugin1 = create_test_plugin(&plugin_id1, "/path/to/plugin1");
        let plugin2 = create_test_plugin(&plugin_id2, "/path/to/plugin2");
        let plugin3 = create_test_plugin(&plugin_id3, "/path/to/plugin3");

        repo.add(plugin1.clone()).await.unwrap();
        repo.add(plugin2.clone()).await.unwrap();
        repo.add(plugin3.clone()).await.unwrap();

        let query = PaginationQuery {
            page: 1,
            per_page: 2,
        };

        let result = repo.list_paginated(query).await;
        assert!(result.is_ok());
        let result = result.unwrap();
        // First page of 3 plugins at 2 per page holds exactly 2 items.
        assert_eq!(result.items.len(), 2);
        assert_eq!(result.total, 3);
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_has_entries() {
        let repo = setup_test_repo().await;
        assert!(!repo.has_entries().await.unwrap());
        repo.add(create_test_plugin("test-plugin", "/path/to/plugin"))
            .await
            .unwrap();
        assert!(repo.has_entries().await.unwrap());
        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_drop_all_entries() {
        let repo = setup_test_repo().await;
        repo.add(create_test_plugin("test-plugin", "/path/to/plugin"))
            .await
            .unwrap();
        assert!(repo.has_entries().await.unwrap());
        repo.drop_all_entries().await.unwrap();
        assert!(!repo.has_entries().await.unwrap());
    }
}