openzeppelin_relayer/bootstrap/
initialize_plugins.rs1use std::sync::Arc;
8use tracing::{info, warn};
9
10use crate::repositories::PluginRepositoryTrait;
11use crate::services::plugins::{get_pool_manager, PluginRunner, PluginService, PoolManager};
12
13pub async fn initialize_plugin_pool<PR: PluginRepositoryTrait>(
29 plugin_repository: &PR,
30) -> eyre::Result<Option<Arc<PoolManager>>> {
31 let has_plugins = plugin_repository
32 .has_entries()
33 .await
34 .map_err(|e| eyre::eyre!("Failed to check plugin repository: {}", e))?;
35
36 if !has_plugins {
37 info!("No plugins configured, skipping plugin pool initialization");
38 return Ok(None);
39 }
40
41 let plugin_count = plugin_repository.count().await.unwrap_or(0);
42 info!(
43 plugin_count = plugin_count,
44 "Plugins detected, initializing worker pool"
45 );
46
47 let pool_manager = get_pool_manager();
48
49 match pool_manager.ensure_started().await {
50 Ok(()) => {
51 info!("Plugin worker pool initialized successfully");
52 Ok(Some(pool_manager))
53 }
54 Err(e) => {
55 warn!(error = %e, "Failed to start plugin worker pool, falling back to ts-node execution");
56 Ok(None)
57 }
58 }
59}
60
61pub async fn precompile_plugins<PR: PluginRepositoryTrait>(
77 plugin_repository: &PR,
78 pool_manager: &PoolManager,
79) -> eyre::Result<usize> {
80 use crate::models::PaginationQuery;
81
82 let query = PaginationQuery {
83 page: 1,
84 per_page: 1000,
85 };
86
87 let plugins = plugin_repository
88 .list_paginated(query)
89 .await
90 .map_err(|e| eyre::eyre!("Failed to list plugins: {}", e))?;
91
92 let mut compiled_count = 0;
93
94 for plugin in plugins.items {
95 let plugin_path = PluginService::<PluginRunner>::resolve_plugin_path(&plugin.path);
96
97 match pool_manager
98 .precompile_plugin(plugin.id.clone(), Some(plugin_path), None)
99 .await
100 {
101 Ok(compiled_code) => {
102 if let Err(e) = pool_manager
103 .cache_compiled_code(plugin.id.clone(), compiled_code)
104 .await
105 {
106 warn!(
107 plugin_id = %plugin.id,
108 error = %e,
109 "Failed to cache compiled plugin code"
110 );
111 } else {
112 compiled_count += 1;
113 info!(plugin_id = %plugin.id, "Plugin precompiled successfully");
114 }
115 }
116 Err(e) => {
117 warn!(
118 plugin_id = %plugin.id,
119 error = %e,
120 "Failed to precompile plugin"
121 );
122 }
123 }
124 }
125
126 info!(
127 compiled_count = compiled_count,
128 total_plugins = plugins.total,
129 "Plugin precompilation complete"
130 );
131
132 Ok(compiled_count)
133}
134
135pub async fn shutdown_plugin_pool() -> eyre::Result<()> {
140 let pool_manager = get_pool_manager();
141 pool_manager
142 .shutdown()
143 .await
144 .map_err(|e| eyre::eyre!("Failed to shutdown plugin pool: {}", e))?;
145 info!("Plugin worker pool shutdown complete");
146 Ok(())
147}
148
149#[cfg(test)]
150mod tests {
151 use super::*;
152 use crate::models::RepositoryError;
153 use crate::repositories::MockPluginRepositoryTrait;
154
155 #[tokio::test]
160 async fn test_initialize_plugin_pool_no_plugins() {
161 let mut mock_repo = MockPluginRepositoryTrait::new();
162 mock_repo
163 .expect_has_entries()
164 .returning(|| Box::pin(async { Ok(false) }));
165
166 let result = initialize_plugin_pool(&mock_repo).await;
167 assert!(result.is_ok());
168 assert!(result.unwrap().is_none());
169 }
170
171 #[tokio::test]
172 async fn test_initialize_plugin_pool_has_entries_error() {
173 let mut mock_repo = MockPluginRepositoryTrait::new();
174 mock_repo.expect_has_entries().returning(|| {
175 Box::pin(async {
176 Err(RepositoryError::ConnectionError(
177 "Database unavailable".to_string(),
178 ))
179 })
180 });
181
182 let result = initialize_plugin_pool(&mock_repo).await;
183 assert!(result.is_err());
184
185 match result {
186 Err(e) => assert!(e.to_string().contains("Failed to check plugin repository")),
187 Ok(_) => panic!("Expected error"),
188 }
189 }
190
191 #[tokio::test]
192 async fn test_initialize_plugin_pool_has_entries_unknown_error() {
193 let mut mock_repo = MockPluginRepositoryTrait::new();
194 mock_repo.expect_has_entries().returning(|| {
195 Box::pin(async { Err(RepositoryError::Unknown("Unknown error".to_string())) })
196 });
197
198 let result = initialize_plugin_pool(&mock_repo).await;
199 assert!(result.is_err());
200 }
201
202 #[tokio::test]
203 async fn test_initialize_plugin_pool_with_plugins_count_fails() {
204 let mut mock_repo = MockPluginRepositoryTrait::new();
205
206 mock_repo
208 .expect_has_entries()
209 .returning(|| Box::pin(async { Ok(true) }));
210
211 mock_repo.expect_count().returning(|| {
213 Box::pin(async { Err(RepositoryError::Unknown("Count failed".to_string())) })
214 });
215
216 let result = initialize_plugin_pool(&mock_repo).await;
218 assert!(result.is_ok() || result.is_err());
221 }
222
223 #[tokio::test]
224 async fn test_initialize_plugin_pool_with_plugins_exists() {
225 let mut mock_repo = MockPluginRepositoryTrait::new();
226
227 mock_repo
228 .expect_has_entries()
229 .returning(|| Box::pin(async { Ok(true) }));
230
231 mock_repo
232 .expect_count()
233 .returning(|| Box::pin(async { Ok(5) }));
234
235 let result = initialize_plugin_pool(&mock_repo).await;
238 assert!(result.is_ok() || result.is_err());
240 }
241
242 #[tokio::test]
243 async fn test_initialize_plugin_pool_with_zero_count_but_has_entries() {
244 let mut mock_repo = MockPluginRepositoryTrait::new();
245
246 mock_repo
249 .expect_has_entries()
250 .returning(|| Box::pin(async { Ok(true) }));
251
252 mock_repo
253 .expect_count()
254 .returning(|| Box::pin(async { Ok(0) }));
255
256 let result = initialize_plugin_pool(&mock_repo).await;
257 assert!(result.is_ok() || result.is_err());
259 }
260
261 #[tokio::test]
266 async fn test_precompile_plugins_list_paginated_error() {
267 use crate::models::PaginationQuery;
268
269 let mut mock_repo = MockPluginRepositoryTrait::new();
270
271 mock_repo
272 .expect_list_paginated()
273 .withf(|query: &PaginationQuery| query.page == 1 && query.per_page == 1000)
274 .returning(|_| {
275 Box::pin(async {
276 Err(RepositoryError::ConnectionError(
277 "Database unavailable".to_string(),
278 ))
279 })
280 });
281
282 let pool_manager = get_pool_manager();
285
286 let result = precompile_plugins(&mock_repo, &pool_manager).await;
287 assert!(result.is_err());
288
289 match result {
290 Err(e) => assert!(e.to_string().contains("Failed to list plugins")),
291 Ok(_) => panic!("Expected error"),
292 }
293 }
294
295 #[tokio::test]
296 async fn test_precompile_plugins_empty_list() {
297 use crate::repositories::PaginatedResult;
298
299 let mut mock_repo = MockPluginRepositoryTrait::new();
300
301 mock_repo.expect_list_paginated().returning(|_| {
302 Box::pin(async {
303 Ok(PaginatedResult {
304 items: vec![],
305 total: 0,
306 page: 1,
307 per_page: 1000,
308 })
309 })
310 });
311
312 let pool_manager = get_pool_manager();
313
314 let result = precompile_plugins(&mock_repo, &pool_manager).await;
315 assert!(result.is_ok());
316 assert_eq!(result.unwrap(), 0);
317 }
318
319 #[tokio::test]
320 async fn test_precompile_plugins_pagination_query_params() {
321 use crate::models::PaginationQuery;
322 use crate::repositories::PaginatedResult;
323 use std::sync::atomic::{AtomicBool, Ordering};
324 use std::sync::Arc;
325
326 let was_called = Arc::new(AtomicBool::new(false));
327 let was_called_clone = was_called.clone();
328
329 let mut mock_repo = MockPluginRepositoryTrait::new();
330
331 mock_repo
332 .expect_list_paginated()
333 .withf(move |query: &PaginationQuery| {
334 let correct = query.page == 1 && query.per_page == 1000;
336 was_called_clone.store(true, Ordering::SeqCst);
337 correct
338 })
339 .returning(|_| {
340 Box::pin(async {
341 Ok(PaginatedResult {
342 items: vec![],
343 total: 0,
344 page: 1,
345 per_page: 1000,
346 })
347 })
348 });
349
350 let pool_manager = get_pool_manager();
351
352 let _ = precompile_plugins(&mock_repo, &pool_manager).await;
353
354 assert!(
355 was_called.load(Ordering::SeqCst),
356 "list_paginated should have been called"
357 );
358 }
359
360 #[test]
365 fn test_resolve_plugin_path_absolute() {
366 let path = "/absolute/path/to/plugin.ts";
369 let resolved = PluginService::<PluginRunner>::resolve_plugin_path(path);
370 assert!(resolved.contains("plugin.ts"));
371 }
372
373 #[test]
374 fn test_resolve_plugin_path_relative() {
375 let path = "relative/path/plugin.ts";
376 let resolved = PluginService::<PluginRunner>::resolve_plugin_path(path);
377 assert!(resolved.contains("plugin.ts"));
378 }
379
380 #[tokio::test]
385 async fn test_initialize_plugin_pool_not_found_error() {
386 let mut mock_repo = MockPluginRepositoryTrait::new();
387 mock_repo.expect_has_entries().returning(|| {
388 Box::pin(async { Err(RepositoryError::NotFound("not found".to_string())) })
389 });
390
391 let result = initialize_plugin_pool(&mock_repo).await;
392 assert!(result.is_err(), "Should fail for NotFound error");
393 }
394
395 #[tokio::test]
396 async fn test_initialize_plugin_pool_lock_error() {
397 let mut mock_repo = MockPluginRepositoryTrait::new();
398 mock_repo.expect_has_entries().returning(|| {
399 Box::pin(async { Err(RepositoryError::LockError("lock error".to_string())) })
400 });
401
402 let result = initialize_plugin_pool(&mock_repo).await;
403 assert!(result.is_err(), "Should fail for LockError");
404 }
405
406 #[tokio::test]
407 async fn test_initialize_plugin_pool_invalid_data_error() {
408 let mut mock_repo = MockPluginRepositoryTrait::new();
409 mock_repo.expect_has_entries().returning(|| {
410 Box::pin(async { Err(RepositoryError::InvalidData("invalid".to_string())) })
411 });
412
413 let result = initialize_plugin_pool(&mock_repo).await;
414 assert!(result.is_err(), "Should fail for InvalidData error");
415 }
416
417 #[tokio::test]
418 async fn test_precompile_plugins_not_found_error() {
419 let mut mock_repo = MockPluginRepositoryTrait::new();
420 mock_repo.expect_list_paginated().returning(|_| {
421 Box::pin(async { Err(RepositoryError::NotFound("not found".to_string())) })
422 });
423
424 let pool_manager = get_pool_manager();
425 let result = precompile_plugins(&mock_repo, &pool_manager).await;
426 assert!(result.is_err(), "Should fail for NotFound error");
427 }
428
429 #[tokio::test]
430 async fn test_precompile_plugins_unknown_error() {
431 let mut mock_repo = MockPluginRepositoryTrait::new();
432 mock_repo.expect_list_paginated().returning(|_| {
433 Box::pin(async { Err(RepositoryError::Unknown("unknown".to_string())) })
434 });
435
436 let pool_manager = get_pool_manager();
437 let result = precompile_plugins(&mock_repo, &pool_manager).await;
438 assert!(result.is_err(), "Should fail for Unknown error");
439 }
440
441 #[tokio::test]
442 async fn test_precompile_plugins_connection_error() {
443 let mut mock_repo = MockPluginRepositoryTrait::new();
444 mock_repo.expect_list_paginated().returning(|_| {
445 Box::pin(async { Err(RepositoryError::ConnectionError("connection".to_string())) })
446 });
447
448 let pool_manager = get_pool_manager();
449 let result = precompile_plugins(&mock_repo, &pool_manager).await;
450 assert!(result.is_err(), "Should fail for ConnectionError");
451 }
452
453 #[tokio::test]
458 async fn test_initialize_then_check_pool_state() {
459 let mut mock_repo = MockPluginRepositoryTrait::new();
460 mock_repo
461 .expect_has_entries()
462 .returning(|| Box::pin(async { Ok(false) }));
463
464 let result = initialize_plugin_pool(&mock_repo).await;
465 assert!(result.is_ok());
466
467 let pool_manager = result.unwrap();
469 assert!(pool_manager.is_none());
470 }
471
472 #[tokio::test]
473 async fn test_multiple_initialize_calls_no_plugins() {
474 for _ in 0..3 {
476 let mut mock_repo = MockPluginRepositoryTrait::new();
477 mock_repo
478 .expect_has_entries()
479 .returning(|| Box::pin(async { Ok(false) }));
480
481 let result = initialize_plugin_pool(&mock_repo).await;
482 assert!(result.is_ok());
483 assert!(result.unwrap().is_none());
484 }
485 }
486
487 #[tokio::test]
488 async fn test_precompile_with_large_plugin_count() {
489 use crate::repositories::PaginatedResult;
490
491 let mut mock_repo = MockPluginRepositoryTrait::new();
492
493 mock_repo.expect_list_paginated().returning(|_| {
495 Box::pin(async {
496 Ok(PaginatedResult {
497 items: vec![],
498 total: 500, page: 1,
500 per_page: 1000,
501 })
502 })
503 });
504
505 let pool_manager = get_pool_manager();
506
507 let result = precompile_plugins(&mock_repo, &pool_manager).await;
508 assert!(result.is_ok());
509 assert_eq!(result.unwrap(), 0); }
511}