openzeppelin_relayer/bootstrap/
initialize_plugins.rs

1//! Plugin Pool Initialization
2//!
3//! This module handles conditional initialization of the plugin worker pool.
4//! The pool is only started if plugins are configured, avoiding overhead
5//! when the plugin system is not in use.
6
7use std::sync::Arc;
8use tracing::{info, warn};
9
10use crate::repositories::PluginRepositoryTrait;
11use crate::services::plugins::{get_pool_manager, PluginRunner, PluginService, PoolManager};
12
13/// Initialize the plugin worker pool if plugins are configured.
14///
15/// This function checks if any plugins are registered in the repository.
16/// If plugins exist, it starts the Piscina worker pool for efficient
17/// plugin execution. If no plugins are configured, it skips initialization.
18///
19/// # Arguments
20///
21/// * `plugin_repository` - Reference to the plugin repository
22///
23/// # Returns
24///
25/// * `Ok(Some(Arc<PoolManager>))` - Pool manager if plugins are configured
26/// * `Ok(None)` - If no plugins are configured
27/// * `Err` - If pool initialization fails
28pub async fn initialize_plugin_pool<PR: PluginRepositoryTrait>(
29    plugin_repository: &PR,
30) -> eyre::Result<Option<Arc<PoolManager>>> {
31    let has_plugins = plugin_repository
32        .has_entries()
33        .await
34        .map_err(|e| eyre::eyre!("Failed to check plugin repository: {}", e))?;
35
36    if !has_plugins {
37        info!("No plugins configured, skipping plugin pool initialization");
38        return Ok(None);
39    }
40
41    let plugin_count = plugin_repository.count().await.unwrap_or(0);
42    info!(
43        plugin_count = plugin_count,
44        "Plugins detected, initializing worker pool"
45    );
46
47    let pool_manager = get_pool_manager();
48
49    match pool_manager.ensure_started().await {
50        Ok(()) => {
51            info!("Plugin worker pool initialized successfully");
52            Ok(Some(pool_manager))
53        }
54        Err(e) => {
55            warn!(error = %e, "Failed to start plugin worker pool, falling back to ts-node execution");
56            Ok(None)
57        }
58    }
59}
60
61/// Precompile all configured plugins.
62///
63/// This function loads all plugins from the repository and triggers
64/// precompilation via the worker pool. Compiled code is cached in
65/// the pool for fast execution.
66///
67/// # Arguments
68///
69/// * `plugin_repository` - Reference to the plugin repository
70/// * `pool_manager` - The pool manager to use for compilation
71///
72/// # Returns
73///
74/// * `Ok(usize)` - Number of plugins successfully precompiled
75/// * `Err` - If precompilation fails critically
76pub async fn precompile_plugins<PR: PluginRepositoryTrait>(
77    plugin_repository: &PR,
78    pool_manager: &PoolManager,
79) -> eyre::Result<usize> {
80    use crate::models::PaginationQuery;
81
82    let query = PaginationQuery {
83        page: 1,
84        per_page: 1000,
85    };
86
87    let plugins = plugin_repository
88        .list_paginated(query)
89        .await
90        .map_err(|e| eyre::eyre!("Failed to list plugins: {}", e))?;
91
92    let mut compiled_count = 0;
93
94    for plugin in plugins.items {
95        let plugin_path = PluginService::<PluginRunner>::resolve_plugin_path(&plugin.path);
96
97        match pool_manager
98            .precompile_plugin(plugin.id.clone(), Some(plugin_path), None)
99            .await
100        {
101            Ok(compiled_code) => {
102                if let Err(e) = pool_manager
103                    .cache_compiled_code(plugin.id.clone(), compiled_code)
104                    .await
105                {
106                    warn!(
107                        plugin_id = %plugin.id,
108                        error = %e,
109                        "Failed to cache compiled plugin code"
110                    );
111                } else {
112                    compiled_count += 1;
113                    info!(plugin_id = %plugin.id, "Plugin precompiled successfully");
114                }
115            }
116            Err(e) => {
117                warn!(
118                    plugin_id = %plugin.id,
119                    error = %e,
120                    "Failed to precompile plugin"
121                );
122            }
123        }
124    }
125
126    info!(
127        compiled_count = compiled_count,
128        total_plugins = plugins.total,
129        "Plugin precompilation complete"
130    );
131
132    Ok(compiled_count)
133}
134
135/// Shutdown the plugin pool gracefully.
136///
137/// This should be called during application shutdown to properly
138/// terminate the worker pool and clean up resources.
139pub async fn shutdown_plugin_pool() -> eyre::Result<()> {
140    let pool_manager = get_pool_manager();
141    pool_manager
142        .shutdown()
143        .await
144        .map_err(|e| eyre::eyre!("Failed to shutdown plugin pool: {}", e))?;
145    info!("Plugin worker pool shutdown complete");
146    Ok(())
147}
148
149#[cfg(test)]
150mod tests {
151    use super::*;
152    use crate::models::RepositoryError;
153    use crate::repositories::MockPluginRepositoryTrait;
154
155    // ============================================
156    // initialize_plugin_pool tests
157    // ============================================
158
159    #[tokio::test]
160    async fn test_initialize_plugin_pool_no_plugins() {
161        let mut mock_repo = MockPluginRepositoryTrait::new();
162        mock_repo
163            .expect_has_entries()
164            .returning(|| Box::pin(async { Ok(false) }));
165
166        let result = initialize_plugin_pool(&mock_repo).await;
167        assert!(result.is_ok());
168        assert!(result.unwrap().is_none());
169    }
170
171    #[tokio::test]
172    async fn test_initialize_plugin_pool_has_entries_error() {
173        let mut mock_repo = MockPluginRepositoryTrait::new();
174        mock_repo.expect_has_entries().returning(|| {
175            Box::pin(async {
176                Err(RepositoryError::ConnectionError(
177                    "Database unavailable".to_string(),
178                ))
179            })
180        });
181
182        let result = initialize_plugin_pool(&mock_repo).await;
183        assert!(result.is_err());
184
185        match result {
186            Err(e) => assert!(e.to_string().contains("Failed to check plugin repository")),
187            Ok(_) => panic!("Expected error"),
188        }
189    }
190
191    #[tokio::test]
192    async fn test_initialize_plugin_pool_has_entries_unknown_error() {
193        let mut mock_repo = MockPluginRepositoryTrait::new();
194        mock_repo.expect_has_entries().returning(|| {
195            Box::pin(async { Err(RepositoryError::Unknown("Unknown error".to_string())) })
196        });
197
198        let result = initialize_plugin_pool(&mock_repo).await;
199        assert!(result.is_err());
200    }
201
202    #[tokio::test]
203    async fn test_initialize_plugin_pool_with_plugins_count_fails() {
204        let mut mock_repo = MockPluginRepositoryTrait::new();
205
206        // has_entries returns true (plugins exist)
207        mock_repo
208            .expect_has_entries()
209            .returning(|| Box::pin(async { Ok(true) }));
210
211        // count fails but we handle it gracefully (unwrap_or(0))
212        mock_repo.expect_count().returning(|| {
213            Box::pin(async { Err(RepositoryError::Unknown("Count failed".to_string())) })
214        });
215
216        // The function should still proceed even if count fails
217        let result = initialize_plugin_pool(&mock_repo).await;
218        // Result depends on whether pool can be started, but we at least verify
219        // that the function doesn't panic when count fails
220        assert!(result.is_ok() || result.is_err());
221    }
222
223    #[tokio::test]
224    async fn test_initialize_plugin_pool_with_plugins_exists() {
225        let mut mock_repo = MockPluginRepositoryTrait::new();
226
227        mock_repo
228            .expect_has_entries()
229            .returning(|| Box::pin(async { Ok(true) }));
230
231        mock_repo
232            .expect_count()
233            .returning(|| Box::pin(async { Ok(5) }));
234
235        // This test verifies the function proceeds when plugins exist
236        // The actual pool startup may succeed or fail depending on environment
237        let result = initialize_plugin_pool(&mock_repo).await;
238        // We just verify it doesn't panic and returns a result
239        assert!(result.is_ok() || result.is_err());
240    }
241
242    #[tokio::test]
243    async fn test_initialize_plugin_pool_with_zero_count_but_has_entries() {
244        let mut mock_repo = MockPluginRepositoryTrait::new();
245
246        // Edge case: has_entries returns true but count returns 0
247        // This shouldn't happen in practice but tests defensive coding
248        mock_repo
249            .expect_has_entries()
250            .returning(|| Box::pin(async { Ok(true) }));
251
252        mock_repo
253            .expect_count()
254            .returning(|| Box::pin(async { Ok(0) }));
255
256        let result = initialize_plugin_pool(&mock_repo).await;
257        // Should still attempt to start the pool
258        assert!(result.is_ok() || result.is_err());
259    }
260
261    // ============================================
262    // precompile_plugins tests
263    // ============================================
264
265    #[tokio::test]
266    async fn test_precompile_plugins_list_paginated_error() {
267        use crate::models::PaginationQuery;
268
269        let mut mock_repo = MockPluginRepositoryTrait::new();
270
271        mock_repo
272            .expect_list_paginated()
273            .withf(|query: &PaginationQuery| query.page == 1 && query.per_page == 1000)
274            .returning(|_| {
275                Box::pin(async {
276                    Err(RepositoryError::ConnectionError(
277                        "Database unavailable".to_string(),
278                    ))
279                })
280            });
281
282        // We need a real pool manager for this test, but since we're testing
283        // the error path, we can use the global one
284        let pool_manager = get_pool_manager();
285
286        let result = precompile_plugins(&mock_repo, &pool_manager).await;
287        assert!(result.is_err());
288
289        match result {
290            Err(e) => assert!(e.to_string().contains("Failed to list plugins")),
291            Ok(_) => panic!("Expected error"),
292        }
293    }
294
295    #[tokio::test]
296    async fn test_precompile_plugins_empty_list() {
297        use crate::repositories::PaginatedResult;
298
299        let mut mock_repo = MockPluginRepositoryTrait::new();
300
301        mock_repo.expect_list_paginated().returning(|_| {
302            Box::pin(async {
303                Ok(PaginatedResult {
304                    items: vec![],
305                    total: 0,
306                    page: 1,
307                    per_page: 1000,
308                })
309            })
310        });
311
312        let pool_manager = get_pool_manager();
313
314        let result = precompile_plugins(&mock_repo, &pool_manager).await;
315        assert!(result.is_ok());
316        assert_eq!(result.unwrap(), 0);
317    }
318
319    #[tokio::test]
320    async fn test_precompile_plugins_pagination_query_params() {
321        use crate::models::PaginationQuery;
322        use crate::repositories::PaginatedResult;
323        use std::sync::atomic::{AtomicBool, Ordering};
324        use std::sync::Arc;
325
326        let was_called = Arc::new(AtomicBool::new(false));
327        let was_called_clone = was_called.clone();
328
329        let mut mock_repo = MockPluginRepositoryTrait::new();
330
331        mock_repo
332            .expect_list_paginated()
333            .withf(move |query: &PaginationQuery| {
334                // Verify the correct pagination parameters are used
335                let correct = query.page == 1 && query.per_page == 1000;
336                was_called_clone.store(true, Ordering::SeqCst);
337                correct
338            })
339            .returning(|_| {
340                Box::pin(async {
341                    Ok(PaginatedResult {
342                        items: vec![],
343                        total: 0,
344                        page: 1,
345                        per_page: 1000,
346                    })
347                })
348            });
349
350        let pool_manager = get_pool_manager();
351
352        let _ = precompile_plugins(&mock_repo, &pool_manager).await;
353
354        assert!(
355            was_called.load(Ordering::SeqCst),
356            "list_paginated should have been called"
357        );
358    }
359
360    // ============================================
361    // Helper function tests
362    // ============================================
363
364    #[test]
365    fn test_resolve_plugin_path_absolute() {
366        // Test that PluginService::resolve_plugin_path handles paths correctly
367        // This is an indirect test of the path resolution used in precompile_plugins
368        let path = "/absolute/path/to/plugin.ts";
369        let resolved = PluginService::<PluginRunner>::resolve_plugin_path(path);
370        assert!(resolved.contains("plugin.ts"));
371    }
372
373    #[test]
374    fn test_resolve_plugin_path_relative() {
375        let path = "relative/path/plugin.ts";
376        let resolved = PluginService::<PluginRunner>::resolve_plugin_path(path);
377        assert!(resolved.contains("plugin.ts"));
378    }
379
380    // ============================================
381    // Error handling tests
382    // ============================================
383
384    #[tokio::test]
385    async fn test_initialize_plugin_pool_not_found_error() {
386        let mut mock_repo = MockPluginRepositoryTrait::new();
387        mock_repo.expect_has_entries().returning(|| {
388            Box::pin(async { Err(RepositoryError::NotFound("not found".to_string())) })
389        });
390
391        let result = initialize_plugin_pool(&mock_repo).await;
392        assert!(result.is_err(), "Should fail for NotFound error");
393    }
394
395    #[tokio::test]
396    async fn test_initialize_plugin_pool_lock_error() {
397        let mut mock_repo = MockPluginRepositoryTrait::new();
398        mock_repo.expect_has_entries().returning(|| {
399            Box::pin(async { Err(RepositoryError::LockError("lock error".to_string())) })
400        });
401
402        let result = initialize_plugin_pool(&mock_repo).await;
403        assert!(result.is_err(), "Should fail for LockError");
404    }
405
406    #[tokio::test]
407    async fn test_initialize_plugin_pool_invalid_data_error() {
408        let mut mock_repo = MockPluginRepositoryTrait::new();
409        mock_repo.expect_has_entries().returning(|| {
410            Box::pin(async { Err(RepositoryError::InvalidData("invalid".to_string())) })
411        });
412
413        let result = initialize_plugin_pool(&mock_repo).await;
414        assert!(result.is_err(), "Should fail for InvalidData error");
415    }
416
417    #[tokio::test]
418    async fn test_precompile_plugins_not_found_error() {
419        let mut mock_repo = MockPluginRepositoryTrait::new();
420        mock_repo.expect_list_paginated().returning(|_| {
421            Box::pin(async { Err(RepositoryError::NotFound("not found".to_string())) })
422        });
423
424        let pool_manager = get_pool_manager();
425        let result = precompile_plugins(&mock_repo, &pool_manager).await;
426        assert!(result.is_err(), "Should fail for NotFound error");
427    }
428
429    #[tokio::test]
430    async fn test_precompile_plugins_unknown_error() {
431        let mut mock_repo = MockPluginRepositoryTrait::new();
432        mock_repo.expect_list_paginated().returning(|_| {
433            Box::pin(async { Err(RepositoryError::Unknown("unknown".to_string())) })
434        });
435
436        let pool_manager = get_pool_manager();
437        let result = precompile_plugins(&mock_repo, &pool_manager).await;
438        assert!(result.is_err(), "Should fail for Unknown error");
439    }
440
441    #[tokio::test]
442    async fn test_precompile_plugins_connection_error() {
443        let mut mock_repo = MockPluginRepositoryTrait::new();
444        mock_repo.expect_list_paginated().returning(|_| {
445            Box::pin(async { Err(RepositoryError::ConnectionError("connection".to_string())) })
446        });
447
448        let pool_manager = get_pool_manager();
449        let result = precompile_plugins(&mock_repo, &pool_manager).await;
450        assert!(result.is_err(), "Should fail for ConnectionError");
451    }
452
453    // ============================================
454    // Integration-style tests
455    // ============================================
456
457    #[tokio::test]
458    async fn test_initialize_then_check_pool_state() {
459        let mut mock_repo = MockPluginRepositoryTrait::new();
460        mock_repo
461            .expect_has_entries()
462            .returning(|| Box::pin(async { Ok(false) }));
463
464        let result = initialize_plugin_pool(&mock_repo).await;
465        assert!(result.is_ok());
466
467        // When no plugins, should return None
468        let pool_manager = result.unwrap();
469        assert!(pool_manager.is_none());
470    }
471
472    #[tokio::test]
473    async fn test_multiple_initialize_calls_no_plugins() {
474        // Verify that multiple calls don't cause issues
475        for _ in 0..3 {
476            let mut mock_repo = MockPluginRepositoryTrait::new();
477            mock_repo
478                .expect_has_entries()
479                .returning(|| Box::pin(async { Ok(false) }));
480
481            let result = initialize_plugin_pool(&mock_repo).await;
482            assert!(result.is_ok());
483            assert!(result.unwrap().is_none());
484        }
485    }
486
487    #[tokio::test]
488    async fn test_precompile_with_large_plugin_count() {
489        use crate::repositories::PaginatedResult;
490
491        let mut mock_repo = MockPluginRepositoryTrait::new();
492
493        // Simulate having many plugins (but empty list for simplicity)
494        mock_repo.expect_list_paginated().returning(|_| {
495            Box::pin(async {
496                Ok(PaginatedResult {
497                    items: vec![],
498                    total: 500, // Large total but empty items
499                    page: 1,
500                    per_page: 1000,
501                })
502            })
503        });
504
505        let pool_manager = get_pool_manager();
506
507        let result = precompile_plugins(&mock_repo, &pool_manager).await;
508        assert!(result.is_ok());
509        assert_eq!(result.unwrap(), 0); // No items to compile
510    }
511}