1use std::collections::HashMap;
10use std::process::Stdio;
11use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio::process::{Child, Command};
16use tokio::sync::oneshot;
17use uuid::Uuid;
18
19use super::config::get_config;
20use super::connection::{ConnectionPool, PoolConnection};
21use super::health::{
22 CircuitBreaker, CircuitState, DeadServerIndicator, HealthStatus, ProcessStatus,
23};
24use super::protocol::{PoolError, PoolRequest, PoolResponse};
25use super::shared_socket::get_shared_socket_service;
26use super::{LogEntry, PluginError, PluginHandlerPayload, ScriptResult};
27
/// A plugin execution request parked in the bounded queue until a worker picks it up.
///
/// Built by `PoolManager::execute_plugin` when no connection permit is immediately
/// available, and consumed by the tasks started in `PoolManager::spawn_queue_workers`.
/// Every field except `response_tx` is forwarded unchanged to the execution call.
struct QueuedRequest {
    /// Identifier of the plugin to execute (also used in worker log output).
    plugin_id: String,
    /// Optional compiled plugin code, forwarded into the execute request.
    compiled_code: Option<String>,
    /// Optional plugin path, forwarded into the execute request.
    plugin_path: Option<String>,
    /// JSON parameters handed to the plugin.
    params: serde_json::Value,
    /// Optional header map (name -> list of values), forwarded into the execute request.
    headers: Option<HashMap<String, Vec<String>>>,
    /// Socket path forwarded into the execute request.
    // NOTE(review): presumably the shared socket the plugin talks back on — confirm.
    socket_path: String,
    /// Optional HTTP request id, forwarded into the execute request.
    http_request_id: Option<String>,
    /// Per-request timeout in seconds; `None` falls back to `pool_request_timeout_secs`.
    timeout_secs: Option<u64>,
    /// Optional route, forwarded into the execute request.
    route: Option<String>,
    /// Optional plugin configuration JSON, forwarded into the execute request.
    config: Option<serde_json::Value>,
    /// Optional method string, forwarded into the execute request.
    method: Option<String>,
    /// Optional query JSON, forwarded into the execute request.
    query: Option<serde_json::Value>,
    /// One-shot channel the worker uses to hand the execution result back to the caller.
    response_tx: oneshot::Sender<Result<ScriptResult, PluginError>>,
}
44
/// Fields extracted from a pool-server health JSON payload by
/// `PoolManager::parse_health_result`.
///
/// Every optional field is `None` when the corresponding JSON key is missing or has an
/// unexpected type.
#[derive(Debug, Default, PartialEq)]
pub struct ParsedHealthResult {
    /// Value of the top-level `status` key; `"unknown"` when absent or not a string.
    pub status: String,
    /// Value of the top-level `uptime` key.
    pub uptime_ms: Option<u64>,
    /// Value of `memory.heapUsed`.
    pub memory: Option<u64>,
    /// Value of `pool.completed`.
    pub pool_completed: Option<u64>,
    /// Value of `pool.queued`.
    pub pool_queued: Option<u64>,
    /// Value of `execution.successRate`.
    pub success_rate: Option<f64>,
}
58
/// Owns the external plugin pool-server process and routes plugin executions to it.
///
/// Requests run directly when a connection permit is free; otherwise they go through a
/// bounded queue drained by background worker tasks. Background tasks also raise the
/// periodic health-check flag and ramp the recovery allowance.
pub struct PoolManager {
    /// Path of the Unix socket the pool server is started with and the pool connects to.
    socket_path: String,
    /// Handle of the spawned pool-server child process; `None` when none is tracked.
    process: tokio::sync::Mutex<Option<Child>>,
    /// Set once the pool server has been started successfully.
    initialized: Arc<AtomicBool>,
    /// Serializes start-up and restart of the pool server.
    restart_lock: tokio::sync::Mutex<()>,
    /// Connection pool used to talk to the pool server.
    connection_pool: Arc<ConnectionPool>,
    /// Sending side of the bounded request queue consumed by the queue workers.
    request_tx: async_channel::Sender<QueuedRequest>,
    /// Capacity the request queue was created with (used for warnings and error text).
    max_queue_size: usize,
    /// Raised periodically and on dead-server errors; the next request that observes it
    /// performs a process/socket check.
    health_check_needed: Arc<AtomicBool>,
    /// Incremented before a restart attempt and reset to 0 after a successful restart.
    consecutive_failures: Arc<AtomicU32>,
    /// Circuit breaker consulted before each execution and fed with the outcomes.
    circuit_breaker: Arc<CircuitBreaker>,
    /// Initialized to 0; not touched in the visible part of this file.
    // NOTE(review): presumably the timestamp (ms) of the last restart — confirm.
    last_restart_time_ms: Arc<AtomicU64>,
    /// While `true`, the recovery task raises `recovery_allowance` step by step.
    recovery_mode: Arc<AtomicBool>,
    /// Allowance (0..=100) passed to the circuit breaker while in recovery mode.
    recovery_allowance: Arc<AtomicU32>,
    /// Notification the background tasks wait on to stop.
    shutdown_signal: Arc<tokio::sync::Notify>,
}
87
/// `Default` delegates to `PoolManager::new`, which generates a unique socket path.
impl Default for PoolManager {
    fn default() -> Self {
        Self::new()
    }
}
93
94impl PoolManager {
    /// Heap size (MB) the pool server gets before any concurrency-based increment.
    const BASE_HEAP_MB: usize = 512;

    /// `max_concurrency` is integer-divided by this to get the number of heap increments.
    const CONCURRENCY_DIVISOR: usize = 10;

    /// Heap (MB) added per full `CONCURRENCY_DIVISOR` units of concurrency.
    const HEAP_INCREMENT_PER_DIVISOR_MB: usize = 32;

    /// Upper bound (MB) for the computed pool-server heap (8 GB).
    const MAX_HEAP_MB: usize = 8192;
112
113 pub fn calculate_heap_size(max_concurrency: usize) -> usize {
120 let calculated = Self::BASE_HEAP_MB
121 + ((max_concurrency / Self::CONCURRENCY_DIVISOR) * Self::HEAP_INCREMENT_PER_DIVISOR_MB);
122 calculated.min(Self::MAX_HEAP_MB)
123 }
124
125 pub fn format_return_value(value: Option<serde_json::Value>) -> String {
130 value
131 .map(|v| {
132 if v.is_string() {
133 v.as_str().unwrap_or("").to_string()
134 } else {
135 serde_json::to_string(&v).unwrap_or_default()
136 }
137 })
138 .unwrap_or_default()
139 }
140
141 pub fn parse_success_response(response: PoolResponse) -> ScriptResult {
145 let logs: Vec<LogEntry> = response
146 .logs
147 .map(|logs| logs.into_iter().map(|l| l.into()).collect())
148 .unwrap_or_default();
149
150 ScriptResult {
151 logs,
152 error: String::new(),
153 return_value: Self::format_return_value(response.result),
154 trace: Vec::new(),
155 }
156 }
157
158 pub fn parse_error_response(response: PoolResponse) -> PluginError {
162 let logs: Vec<LogEntry> = response
163 .logs
164 .map(|logs| logs.into_iter().map(|l| l.into()).collect())
165 .unwrap_or_default();
166
167 let error = response.error.unwrap_or(PoolError {
168 message: "Unknown error".to_string(),
169 code: None,
170 status: None,
171 details: None,
172 });
173
174 PluginError::HandlerError(Box::new(PluginHandlerPayload {
175 message: error.message,
176 status: error.status.unwrap_or(500),
177 code: error.code,
178 details: error.details,
179 logs: Some(logs),
180 traces: None,
181 }))
182 }
183
184 pub fn parse_pool_response(response: PoolResponse) -> Result<ScriptResult, PluginError> {
189 if response.success {
190 Ok(Self::parse_success_response(response))
191 } else {
192 Err(Self::parse_error_response(response))
193 }
194 }
195
196 pub fn parse_health_result(result: &serde_json::Value) -> ParsedHealthResult {
201 ParsedHealthResult {
202 status: result
203 .get("status")
204 .and_then(|v| v.as_str())
205 .unwrap_or("unknown")
206 .to_string(),
207 uptime_ms: result.get("uptime").and_then(|v| v.as_u64()),
208 memory: result
209 .get("memory")
210 .and_then(|v| v.get("heapUsed"))
211 .and_then(|v| v.as_u64()),
212 pool_completed: result
213 .get("pool")
214 .and_then(|v| v.get("completed"))
215 .and_then(|v| v.as_u64()),
216 pool_queued: result
217 .get("pool")
218 .and_then(|v| v.get("queued"))
219 .and_then(|v| v.as_u64()),
220 success_rate: result
221 .get("execution")
222 .and_then(|v| v.get("successRate"))
223 .and_then(|v| v.as_f64()),
224 }
225 }
226
    /// Creates a manager with a freshly generated socket path of the form
    /// `/tmp/relayer-plugin-pool-<uuid v4>.sock`.
    ///
    /// Delegates to `init`, which spawns background tasks with `tokio::spawn`.
    pub fn new() -> Self {
        Self::init(format!("/tmp/relayer-plugin-pool-{}.sock", Uuid::new_v4()))
    }
231
    /// Creates a manager that uses the caller-supplied `socket_path` instead of a
    /// generated one.
    ///
    /// Delegates to `init`, which spawns background tasks with `tokio::spawn`.
    pub fn with_socket_path(socket_path: String) -> Self {
        Self::init(socket_path)
    }
236
237 fn init(socket_path: String) -> Self {
239 let config = get_config();
240 let max_connections = config.pool_max_connections;
241 let max_queue_size = config.pool_max_queue_size;
242
243 let (tx, rx) = async_channel::bounded(max_queue_size);
244
245 let connection_pool = Arc::new(ConnectionPool::new(socket_path.clone(), max_connections));
246 let connection_pool_clone = connection_pool.clone();
247
248 let shutdown_signal = Arc::new(tokio::sync::Notify::new());
249
250 Self::spawn_queue_workers(
251 rx,
252 connection_pool_clone,
253 config.pool_workers,
254 shutdown_signal.clone(),
255 );
256
257 let health_check_needed = Arc::new(AtomicBool::new(false));
258 let consecutive_failures = Arc::new(AtomicU32::new(0));
259 let circuit_breaker = Arc::new(CircuitBreaker::new());
260 let last_restart_time_ms = Arc::new(AtomicU64::new(0));
261 let recovery_mode = Arc::new(AtomicBool::new(false));
262 let recovery_allowance = Arc::new(AtomicU32::new(0));
263
264 Self::spawn_health_check_task(
265 health_check_needed.clone(),
266 config.health_check_interval_secs,
267 shutdown_signal.clone(),
268 );
269
270 Self::spawn_recovery_task(
271 recovery_mode.clone(),
272 recovery_allowance.clone(),
273 shutdown_signal.clone(),
274 );
275
276 Self {
277 connection_pool,
278 socket_path,
279 process: tokio::sync::Mutex::new(None),
280 initialized: Arc::new(AtomicBool::new(false)),
281 restart_lock: tokio::sync::Mutex::new(()),
282 request_tx: tx,
283 max_queue_size,
284 health_check_needed,
285 consecutive_failures,
286 circuit_breaker,
287 last_restart_time_ms,
288 recovery_mode,
289 recovery_allowance,
290 shutdown_signal,
291 }
292 }
293
294 fn spawn_recovery_task(
296 recovery_mode: Arc<AtomicBool>,
297 recovery_allowance: Arc<AtomicU32>,
298 shutdown_signal: Arc<tokio::sync::Notify>,
299 ) {
300 tokio::spawn(async move {
301 let mut interval = tokio::time::interval(Duration::from_millis(500));
302 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
303
304 loop {
305 tokio::select! {
306 biased;
307
308 _ = shutdown_signal.notified() => {
309 tracing::debug!("Recovery task received shutdown signal");
310 break;
311 }
312
313 _ = interval.tick() => {
314 if recovery_mode.load(Ordering::Relaxed) {
315 let current = recovery_allowance.load(Ordering::Relaxed);
316 if current < 100 {
317 let new_allowance = (current + 10).min(100);
318 recovery_allowance.store(new_allowance, Ordering::Relaxed);
319 tracing::debug!(
320 allowance = new_allowance,
321 "Recovery mode: increasing request allowance"
322 );
323 } else {
324 recovery_mode.store(false, Ordering::Relaxed);
325 tracing::info!("Recovery mode complete - full capacity restored");
326 }
327 }
328 }
329 }
330 }
331 });
332 }
333
334 fn spawn_health_check_task(
336 health_check_needed: Arc<AtomicBool>,
337 interval_secs: u64,
338 shutdown_signal: Arc<tokio::sync::Notify>,
339 ) {
340 tokio::spawn(async move {
341 let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
342 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
343
344 loop {
345 tokio::select! {
346 biased;
347
348 _ = shutdown_signal.notified() => {
349 tracing::debug!("Health check task received shutdown signal");
350 break;
351 }
352
353 _ = interval.tick() => {
354 health_check_needed.store(true, Ordering::Relaxed);
355 }
356 }
357 }
358 });
359 }
360
361 fn spawn_queue_workers(
363 rx: async_channel::Receiver<QueuedRequest>,
364 connection_pool: Arc<ConnectionPool>,
365 configured_workers: usize,
366 shutdown_signal: Arc<tokio::sync::Notify>,
367 ) {
368 let num_workers = if configured_workers > 0 {
369 configured_workers
370 } else {
371 std::thread::available_parallelism()
372 .map(|n| n.get().clamp(4, 32))
373 .unwrap_or(8)
374 };
375
376 tracing::info!(num_workers = num_workers, "Starting request queue workers");
377
378 for worker_id in 0..num_workers {
379 let rx_clone = rx.clone();
380 let pool_clone = connection_pool.clone();
381 let shutdown = shutdown_signal.clone();
382
383 tokio::spawn(async move {
384 loop {
385 tokio::select! {
386 biased;
387
388 _ = shutdown.notified() => {
389 tracing::debug!(worker_id = worker_id, "Request queue worker received shutdown signal");
390 break;
391 }
392
393 request_result = rx_clone.recv() => {
394 let request = match request_result {
395 Ok(r) => r,
396 Err(_) => break,
397 };
398
399 let start = std::time::Instant::now();
400 let plugin_id = request.plugin_id.clone();
401
402 let result = Self::execute_plugin_internal(
403 &pool_clone,
404 request.plugin_id,
405 request.compiled_code,
406 request.plugin_path,
407 request.params,
408 request.headers,
409 request.socket_path,
410 request.http_request_id,
411 request.timeout_secs,
412 request.route,
413 request.config,
414 request.method,
415 request.query,
416 )
417 .await;
418
419 let elapsed = start.elapsed();
420 if let Err(ref e) = result {
421 let error_str = format!("{e:?}");
422 if error_str.contains("shutdown") || error_str.contains("Shutdown") {
423 tracing::debug!(
424 worker_id = worker_id,
425 plugin_id = %plugin_id,
426 "Plugin execution cancelled during shutdown"
427 );
428 } else {
429 tracing::warn!(
430 worker_id = worker_id,
431 plugin_id = %plugin_id,
432 elapsed_ms = elapsed.as_millis() as u64,
433 error = ?e,
434 "Plugin execution failed"
435 );
436 }
437 } else if elapsed.as_secs() > 1 {
438 tracing::debug!(
439 worker_id = worker_id,
440 plugin_id = %plugin_id,
441 elapsed_ms = elapsed.as_millis() as u64,
442 "Slow plugin execution"
443 );
444 }
445
446 let _ = request.response_tx.send(result);
447 }
448 }
449 }
450
451 tracing::debug!(worker_id = worker_id, "Request queue worker exited");
452 });
453 }
454 }
455
456 fn spawn_rate_limited_stderr_reader(stderr: tokio::process::ChildStderr) {
458 tokio::spawn(async move {
459 let reader = BufReader::new(stderr);
460 let mut lines = reader.lines();
461
462 let mut last_log_time = std::time::Instant::now();
463 let mut suppressed_count = 0u64;
464 let min_interval = Duration::from_millis(100);
465
466 while let Ok(Some(line)) = lines.next_line().await {
467 let now = std::time::Instant::now();
468 let elapsed = now.duration_since(last_log_time);
469
470 if elapsed >= min_interval {
471 if suppressed_count > 0 {
472 tracing::warn!(
473 target: "pool_server",
474 suppressed = suppressed_count,
475 "... ({} lines suppressed due to rate limiting)",
476 suppressed_count
477 );
478 suppressed_count = 0;
479 }
480 tracing::error!(target: "pool_server", "{}", line);
481 last_log_time = now;
482 } else {
483 suppressed_count += 1;
484 if suppressed_count % 100 == 0 {
485 tracing::warn!(
486 target: "pool_server",
487 suppressed = suppressed_count,
488 "Pool server producing excessive stderr output"
489 );
490 }
491 }
492 }
493
494 if suppressed_count > 0 {
495 tracing::warn!(
496 target: "pool_server",
497 suppressed = suppressed_count,
498 "Pool server stderr closed ({} final lines suppressed)",
499 suppressed_count
500 );
501 }
502 });
503 }
504
505 #[allow(clippy::too_many_arguments)]
507 async fn execute_with_permit(
508 connection_pool: &Arc<ConnectionPool>,
509 permit: Option<tokio::sync::OwnedSemaphorePermit>,
510 plugin_id: String,
511 compiled_code: Option<String>,
512 plugin_path: Option<String>,
513 params: serde_json::Value,
514 headers: Option<HashMap<String, Vec<String>>>,
515 socket_path: String,
516 http_request_id: Option<String>,
517 timeout_secs: Option<u64>,
518 route: Option<String>,
519 config: Option<serde_json::Value>,
520 method: Option<String>,
521 query: Option<serde_json::Value>,
522 ) -> Result<ScriptResult, PluginError> {
523 let mut conn = connection_pool.acquire_with_permit(permit).await?;
524
525 let request = PoolRequest::Execute(Box::new(super::protocol::ExecuteRequest {
526 task_id: Uuid::new_v4().to_string(),
527 plugin_id: plugin_id.clone(),
528 compiled_code,
529 plugin_path,
530 params,
531 headers,
532 socket_path,
533 http_request_id,
534 timeout: timeout_secs.map(|s| s * 1000),
535 route,
536 config,
537 method,
538 query,
539 }));
540
541 let timeout = timeout_secs.unwrap_or(get_config().pool_request_timeout_secs);
542 let response = conn.send_request_with_timeout(&request, timeout).await?;
543
544 Self::parse_pool_response(response)
546 }
547
    /// Executes a plugin request without a pre-acquired connection permit.
    ///
    /// Thin wrapper around `execute_with_permit` that passes `None` as the permit and
    /// forwards every other argument unchanged. Used by the queue workers.
    #[allow(clippy::too_many_arguments)]
    async fn execute_plugin_internal(
        connection_pool: &Arc<ConnectionPool>,
        plugin_id: String,
        compiled_code: Option<String>,
        plugin_path: Option<String>,
        params: serde_json::Value,
        headers: Option<HashMap<String, Vec<String>>>,
        socket_path: String,
        http_request_id: Option<String>,
        timeout_secs: Option<u64>,
        route: Option<String>,
        config: Option<serde_json::Value>,
        method: Option<String>,
        query: Option<serde_json::Value>,
    ) -> Result<ScriptResult, PluginError> {
        Self::execute_with_permit(
            connection_pool,
            // No permit yet: the connection pool acquires one itself.
            None,
            plugin_id,
            compiled_code,
            plugin_path,
            params,
            headers,
            socket_path,
            http_request_id,
            timeout_secs,
            route,
            config,
            method,
            query,
        )
        .await
    }
583
    /// Returns whether the pool server has been started successfully.
    ///
    /// Reads the `initialized` flag only; it does not check whether the process is
    /// still alive.
    pub async fn is_initialized(&self) -> bool {
        self.initialized.load(Ordering::Acquire)
    }
591
592 pub async fn ensure_started(&self) -> Result<(), PluginError> {
594 if self.initialized.load(Ordering::Acquire) {
595 return Ok(());
596 }
597
598 let _startup_guard = self.restart_lock.lock().await;
599
600 if self.initialized.load(Ordering::Acquire) {
601 return Ok(());
602 }
603
604 self.start_pool_server().await?;
605 self.initialized.store(true, Ordering::Release);
606 Ok(())
607 }
608
609 async fn ensure_started_and_healthy(&self) -> Result<(), PluginError> {
611 self.ensure_started().await?;
612
613 if !self.health_check_needed.load(Ordering::Relaxed) {
614 return Ok(());
615 }
616
617 if self
618 .health_check_needed
619 .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
620 .is_err()
621 {
622 return Ok(());
623 }
624
625 self.check_and_restart_if_needed().await
626 }
627
628 async fn check_and_restart_if_needed(&self) -> Result<(), PluginError> {
630 let process_status = {
632 let mut process_guard = self.process.lock().await;
633 if let Some(child) = process_guard.as_mut() {
634 match child.try_wait() {
635 Ok(Some(exit_status)) => {
636 tracing::warn!(
637 exit_status = ?exit_status,
638 "Pool server process has exited"
639 );
640 *process_guard = None;
641 ProcessStatus::Exited
642 }
643 Ok(None) => ProcessStatus::Running,
644 Err(e) => {
645 tracing::warn!(
646 error = %e,
647 "Failed to check pool server process status, assuming dead"
648 );
649 *process_guard = None;
650 ProcessStatus::Unknown
651 }
652 }
653 } else {
654 ProcessStatus::NoProcess
655 }
656 };
657
658 let needs_restart = match process_status {
660 ProcessStatus::Running => {
661 let socket_exists = std::path::Path::new(&self.socket_path).exists();
662 if !socket_exists {
663 tracing::warn!(
664 socket_path = %self.socket_path,
665 "Pool server socket file missing, needs restart"
666 );
667 true
668 } else {
669 false
670 }
671 }
672 ProcessStatus::Exited | ProcessStatus::Unknown | ProcessStatus::NoProcess => {
673 tracing::warn!("Pool server not running, needs restart");
674 true
675 }
676 };
677
678 if needs_restart {
680 let _restart_guard = self.restart_lock.lock().await;
681 self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
682 self.restart_internal().await?;
683 self.consecutive_failures.store(0, Ordering::Relaxed);
684 }
685
686 Ok(())
687 }
688
689 async fn cleanup_socket_file(socket_path: &str) {
691 let max_cleanup_attempts = 5;
692 let mut attempts = 0;
693
694 while attempts < max_cleanup_attempts {
695 match std::fs::remove_file(socket_path) {
696 Ok(_) => break,
697 Err(e) if e.kind() == std::io::ErrorKind::NotFound => break,
698 Err(e) => {
699 attempts += 1;
700 if attempts >= max_cleanup_attempts {
701 tracing::warn!(
702 socket_path = %socket_path,
703 error = %e,
704 "Failed to remove socket file after {} attempts, proceeding anyway",
705 max_cleanup_attempts
706 );
707 break;
708 }
709 let delay_ms = 10 * (1 << attempts.min(3));
710 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
711 }
712 }
713 }
714
715 tokio::time::sleep(Duration::from_millis(50)).await;
716 }
717
718 async fn spawn_pool_server_process(
720 socket_path: &str,
721 context: &str,
722 ) -> Result<Child, PluginError> {
723 let pool_server_path = std::env::current_dir()
724 .map(|cwd| cwd.join("plugins/lib/pool-server.ts").display().to_string())
725 .unwrap_or_else(|_| "plugins/lib/pool-server.ts".to_string());
726
727 let config = get_config();
728
729 let pool_server_heap_mb = Self::calculate_heap_size(config.max_concurrency);
731
732 let uncapped_heap = Self::BASE_HEAP_MB
734 + ((config.max_concurrency / Self::CONCURRENCY_DIVISOR)
735 * Self::HEAP_INCREMENT_PER_DIVISOR_MB);
736 if uncapped_heap > Self::MAX_HEAP_MB {
737 tracing::warn!(
738 calculated_heap_mb = uncapped_heap,
739 capped_heap_mb = pool_server_heap_mb,
740 max_concurrency = config.max_concurrency,
741 "Pool server heap calculation exceeded 8GB cap"
742 );
743 }
744
745 tracing::info!(
746 socket_path = %socket_path,
747 heap_mb = pool_server_heap_mb,
748 max_concurrency = config.max_concurrency,
749 context = context,
750 "Spawning plugin pool server"
751 );
752
753 let node_options = format!("--max-old-space-size={pool_server_heap_mb} --expose-gc");
754
755 let mut child = Command::new("ts-node")
756 .arg("--transpile-only")
757 .arg(&pool_server_path)
758 .arg(socket_path)
759 .env("NODE_OPTIONS", node_options)
760 .env("PLUGIN_MAX_CONCURRENCY", config.max_concurrency.to_string())
761 .env(
762 "PLUGIN_POOL_MIN_THREADS",
763 config.nodejs_pool_min_threads.to_string(),
764 )
765 .env(
766 "PLUGIN_POOL_MAX_THREADS",
767 config.nodejs_pool_max_threads.to_string(),
768 )
769 .env(
770 "PLUGIN_POOL_CONCURRENT_TASKS",
771 config.nodejs_pool_concurrent_tasks.to_string(),
772 )
773 .env(
774 "PLUGIN_POOL_IDLE_TIMEOUT",
775 config.nodejs_pool_idle_timeout_ms.to_string(),
776 )
777 .env(
778 "PLUGIN_WORKER_HEAP_MB",
779 config.nodejs_worker_heap_mb.to_string(),
780 )
781 .env(
782 "PLUGIN_POOL_SOCKET_BACKLOG",
783 config.pool_socket_backlog.to_string(),
784 )
785 .stdin(Stdio::null())
786 .stdout(Stdio::piped())
787 .stderr(Stdio::piped())
788 .spawn()
789 .map_err(|e| {
790 PluginError::PluginExecutionError(format!("Failed to {context} pool server: {e}"))
791 })?;
792
793 if let Some(stderr) = child.stderr.take() {
794 Self::spawn_rate_limited_stderr_reader(stderr);
795 }
796
797 if let Some(stdout) = child.stdout.take() {
798 let reader = BufReader::new(stdout);
799 let mut lines = reader.lines();
800
801 let timeout_result = tokio::time::timeout(Duration::from_secs(10), async {
802 while let Ok(Some(line)) = lines.next_line().await {
803 if line.contains("POOL_SERVER_READY") {
804 return Ok(());
805 }
806 }
807 Err(PluginError::PluginExecutionError(
808 "Pool server did not send ready signal".to_string(),
809 ))
810 })
811 .await;
812
813 match timeout_result {
814 Ok(Ok(())) => {
815 tracing::info!(context = context, "Plugin pool server ready");
816 }
817 Ok(Err(e)) => return Err(e),
818 Err(_) => {
819 return Err(PluginError::PluginExecutionError(format!(
820 "Timeout waiting for pool server to {context}"
821 )))
822 }
823 }
824 }
825
826 Ok(child)
827 }
828
829 async fn start_pool_server(&self) -> Result<(), PluginError> {
830 let mut process_guard = self.process.lock().await;
831
832 if process_guard.is_some() {
833 return Ok(());
834 }
835
836 Self::cleanup_socket_file(&self.socket_path).await;
837
838 let child = Self::spawn_pool_server_process(&self.socket_path, "start").await?;
839
840 *process_guard = Some(child);
841 Ok(())
842 }
843
844 #[allow(clippy::too_many_arguments)]
846 pub async fn execute_plugin(
847 &self,
848 plugin_id: String,
849 compiled_code: Option<String>,
850 plugin_path: Option<String>,
851 params: serde_json::Value,
852 headers: Option<HashMap<String, Vec<String>>>,
853 socket_path: String,
854 http_request_id: Option<String>,
855 timeout_secs: Option<u64>,
856 route: Option<String>,
857 config: Option<serde_json::Value>,
858 method: Option<String>,
859 query: Option<serde_json::Value>,
860 ) -> Result<ScriptResult, PluginError> {
861 let rid = http_request_id.as_deref().unwrap_or("unknown");
862 let effective_timeout =
863 timeout_secs.unwrap_or_else(|| get_config().pool_request_timeout_secs);
864 tracing::debug!(
865 plugin_id = %plugin_id,
866 http_request_id = %rid,
867 timeout_secs = effective_timeout,
868 "Pool execute request received"
869 );
870 let recovery_allowance = if self.recovery_mode.load(Ordering::Relaxed) {
871 Some(self.recovery_allowance.load(Ordering::Relaxed))
872 } else {
873 None
874 };
875
876 if !self
877 .circuit_breaker
878 .should_allow_request(recovery_allowance)
879 {
880 let state = self.circuit_breaker.state();
881 tracing::warn!(
882 plugin_id = %plugin_id,
883 circuit_state = ?state,
884 recovery_allowance = ?recovery_allowance,
885 "Request rejected by circuit breaker"
886 );
887 return Err(PluginError::PluginExecutionError(
888 "Plugin system temporarily unavailable due to high load. Please retry shortly."
889 .to_string(),
890 ));
891 }
892
893 let start_time = Instant::now();
894
895 self.ensure_started_and_healthy().await?;
896 tracing::debug!(
897 plugin_id = %plugin_id,
898 http_request_id = %rid,
899 "Pool execute start (healthy/started)"
900 );
901
902 let circuit_breaker = self.circuit_breaker.clone();
903 match self.connection_pool.semaphore.clone().try_acquire_owned() {
904 Ok(permit) => {
905 tracing::debug!(
906 plugin_id = %plugin_id,
907 http_request_id = %rid,
908 "Pool execute acquired connection permit (fast path)"
909 );
910 let result = Self::execute_with_permit(
911 &self.connection_pool,
912 Some(permit),
913 plugin_id,
914 compiled_code,
915 plugin_path,
916 params,
917 headers,
918 socket_path,
919 http_request_id,
920 timeout_secs,
921 route,
922 config,
923 method,
924 query,
925 )
926 .await;
927
928 let elapsed_ms = start_time.elapsed().as_millis() as u32;
929 match &result {
930 Ok(_) => circuit_breaker.record_success(elapsed_ms),
931 Err(e) => {
932 if Self::is_dead_server_error(e) {
935 circuit_breaker.record_failure();
936 tracing::warn!(
937 error = %e,
938 "Detected dead pool server error, triggering health check for restart"
939 );
940 self.health_check_needed.store(true, Ordering::Relaxed);
941 } else {
942 circuit_breaker.record_success(elapsed_ms);
944 }
945 }
946 }
947
948 tracing::debug!(
949 elapsed_ms = elapsed_ms,
950 result_ok = result.is_ok(),
951 "Pool execute finished (fast path)"
952 );
953 result
954 }
955 Err(_) => {
956 tracing::debug!(
957 plugin_id = %plugin_id,
958 http_request_id = %rid,
959 "Pool execute queueing (no permits)"
960 );
961 let (response_tx, response_rx) = oneshot::channel();
962
963 let queued_request = QueuedRequest {
964 plugin_id,
965 compiled_code,
966 plugin_path,
967 params,
968 headers,
969 socket_path,
970 http_request_id,
971 timeout_secs,
972 route,
973 config,
974 method,
975 query,
976 response_tx,
977 };
978
979 let result = match self.request_tx.try_send(queued_request) {
980 Ok(()) => {
981 let queue_len = self.request_tx.len();
982 if queue_len > self.max_queue_size / 2 {
983 tracing::warn!(
984 queue_len = queue_len,
985 max_queue_size = self.max_queue_size,
986 "Plugin queue is over 50% capacity"
987 );
988 }
989 let response_timeout = timeout_secs
991 .map(Duration::from_secs)
992 .unwrap_or(Duration::from_secs(get_config().pool_request_timeout_secs))
993 + Duration::from_secs(5); match tokio::time::timeout(response_timeout, response_rx).await {
996 Ok(Ok(result)) => result,
997 Ok(Err(_)) => Err(PluginError::PluginExecutionError(
998 "Request queue processor closed".to_string(),
999 )),
1000 Err(_) => Err(PluginError::PluginExecutionError(format!(
1001 "Request timed out after {}s waiting for worker response",
1002 response_timeout.as_secs()
1003 ))),
1004 }
1005 }
1006 Err(async_channel::TrySendError::Full(req)) => {
1007 let queue_timeout_ms = get_config().pool_queue_send_timeout_ms;
1008 let queue_timeout = Duration::from_millis(queue_timeout_ms);
1009 match tokio::time::timeout(queue_timeout, self.request_tx.send(req)).await {
1010 Ok(Ok(())) => {
1011 let queue_len = self.request_tx.len();
1012 tracing::debug!(
1013 queue_len = queue_len,
1014 "Request queued after waiting for queue space"
1015 );
1016 let response_timeout =
1018 timeout_secs.map(Duration::from_secs).unwrap_or(
1019 Duration::from_secs(get_config().pool_request_timeout_secs),
1020 ) + Duration::from_secs(5); match tokio::time::timeout(response_timeout, response_rx).await {
1023 Ok(Ok(result)) => result,
1024 Ok(Err(_)) => Err(PluginError::PluginExecutionError(
1025 "Request queue processor closed".to_string(),
1026 )),
1027 Err(_) => Err(PluginError::PluginExecutionError(format!(
1028 "Request timed out after {}s waiting for worker response",
1029 response_timeout.as_secs()
1030 ))),
1031 }
1032 }
1033 Ok(Err(async_channel::SendError(_))) => {
1034 Err(PluginError::PluginExecutionError(
1035 "Plugin execution queue is closed".to_string(),
1036 ))
1037 }
1038 Err(_) => {
1039 let queue_len = self.request_tx.len();
1040 tracing::error!(
1041 queue_len = queue_len,
1042 max_queue_size = self.max_queue_size,
1043 timeout_ms = queue_timeout.as_millis(),
1044 "Plugin execution queue is FULL - timeout waiting for space"
1045 );
1046 Err(PluginError::PluginExecutionError(format!(
1047 "Plugin execution queue is full (max: {}) and timeout waiting for space. \
1048 Consider increasing PLUGIN_POOL_MAX_QUEUE_SIZE or PLUGIN_POOL_MAX_CONNECTIONS.",
1049 self.max_queue_size
1050 )))
1051 }
1052 }
1053 }
1054 Err(async_channel::TrySendError::Closed(_)) => {
1055 Err(PluginError::PluginExecutionError(
1056 "Plugin execution queue is closed".to_string(),
1057 ))
1058 }
1059 };
1060
1061 let elapsed_ms = start_time.elapsed().as_millis() as u32;
1062 match &result {
1063 Ok(_) => circuit_breaker.record_success(elapsed_ms),
1064 Err(e) => {
1065 if Self::is_dead_server_error(e) {
1067 circuit_breaker.record_failure();
1068 tracing::warn!(
1069 error = %e,
1070 "Detected dead pool server error (queued path), triggering health check for restart"
1071 );
1072 self.health_check_needed.store(true, Ordering::Relaxed);
1073 } else {
1074 circuit_breaker.record_success(elapsed_ms);
1076 }
1077 }
1078 }
1079
1080 tracing::debug!(
1081 elapsed_ms = elapsed_ms,
1082 result_ok = result.is_ok(),
1083 "Pool execute finished (queued path)"
1084 );
1085 result
1086 }
1087 }
1088 }
1089
1090 pub fn is_dead_server_error(err: &PluginError) -> bool {
1092 let error_str = err.to_string();
1093 let lower = error_str.to_lowercase();
1094
1095 if lower.contains("handler timed out")
1096 || (lower.contains("plugin") && lower.contains("timed out"))
1097 {
1098 return false;
1099 }
1100
1101 DeadServerIndicator::from_error_str(&error_str).is_some()
1102 }
1103
1104 pub async fn precompile_plugin(
1106 &self,
1107 plugin_id: String,
1108 plugin_path: Option<String>,
1109 source_code: Option<String>,
1110 ) -> Result<String, PluginError> {
1111 self.ensure_started().await?;
1112
1113 let mut conn = self.connection_pool.acquire().await?;
1114
1115 let request = PoolRequest::Precompile {
1116 task_id: Uuid::new_v4().to_string(),
1117 plugin_id: plugin_id.clone(),
1118 plugin_path,
1119 source_code,
1120 };
1121
1122 let response = conn
1123 .send_request_with_timeout(&request, get_config().pool_request_timeout_secs)
1124 .await?;
1125
1126 if response.success {
1127 response
1128 .result
1129 .and_then(|v| {
1130 v.get("code")
1131 .and_then(|c| c.as_str())
1132 .map(|s| s.to_string())
1133 })
1134 .ok_or_else(|| {
1135 PluginError::PluginExecutionError("No compiled code in response".to_string())
1136 })
1137 } else {
1138 let error = response.error.unwrap_or(PoolError {
1139 message: "Compilation failed".to_string(),
1140 code: None,
1141 status: None,
1142 details: None,
1143 });
1144 Err(PluginError::PluginExecutionError(error.message))
1145 }
1146 }
1147
1148 pub async fn cache_compiled_code(
1150 &self,
1151 plugin_id: String,
1152 compiled_code: String,
1153 ) -> Result<(), PluginError> {
1154 self.ensure_started().await?;
1155
1156 let mut conn = self.connection_pool.acquire().await?;
1157
1158 let request = PoolRequest::Cache {
1159 task_id: Uuid::new_v4().to_string(),
1160 plugin_id: plugin_id.clone(),
1161 compiled_code,
1162 };
1163
1164 let response = conn
1165 .send_request_with_timeout(&request, get_config().pool_request_timeout_secs)
1166 .await?;
1167
1168 if response.success {
1169 Ok(())
1170 } else {
1171 let error = response.error.unwrap_or(PoolError {
1172 message: "Cache failed".to_string(),
1173 code: None,
1174 status: None,
1175 details: None,
1176 });
1177 Err(PluginError::PluginError(error.message))
1178 }
1179 }
1180
1181 pub async fn invalidate_plugin(&self, plugin_id: String) -> Result<(), PluginError> {
1183 if !self.initialized.load(Ordering::Acquire) {
1184 return Ok(());
1185 }
1186
1187 let mut conn = self.connection_pool.acquire().await?;
1188
1189 let request = PoolRequest::Invalidate {
1190 task_id: Uuid::new_v4().to_string(),
1191 plugin_id,
1192 };
1193
1194 let _ = conn
1195 .send_request_with_timeout(&request, get_config().pool_request_timeout_secs)
1196 .await?;
1197 Ok(())
1198 }
1199
1200 async fn collect_socket_stats(
1203 &self,
1204 ) -> (
1205 Option<usize>,
1206 Option<usize>,
1207 Option<usize>,
1208 Option<usize>,
1209 Option<usize>,
1210 ) {
1211 let (shared_available, shared_active, shared_executions) = match get_shared_socket_service()
1213 {
1214 Ok(service) => {
1215 let available = service.available_connection_slots();
1216 let active = service.active_connection_count();
1217 let executions = service.registered_executions_count().await;
1218 (Some(available), Some(active), Some(executions))
1219 }
1220 Err(_) => (None, None, None),
1221 };
1222
1223 let pool_available = self.connection_pool.semaphore.available_permits();
1225 let pool_max = get_config().pool_max_connections;
1226 let pool_active = pool_max.saturating_sub(pool_available);
1227
1228 (
1229 shared_available,
1230 shared_active,
1231 shared_executions,
1232 Some(pool_available),
1233 Some(pool_active),
1234 )
1235 }
1236
1237 pub async fn health_check(&self) -> Result<HealthStatus, PluginError> {
1238 let circuit_info = || {
1239 let state = match self.circuit_breaker.state() {
1240 CircuitState::Closed => "closed",
1241 CircuitState::HalfOpen => "half_open",
1242 CircuitState::Open => "open",
1243 };
1244 (
1245 Some(state.to_string()),
1246 Some(self.circuit_breaker.avg_response_time()),
1247 Some(self.recovery_mode.load(Ordering::Relaxed)),
1248 Some(self.recovery_allowance.load(Ordering::Relaxed)),
1249 )
1250 };
1251
1252 let socket_stats = self.collect_socket_stats().await;
1253
1254 if !self.initialized.load(Ordering::Acquire) {
1255 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1256 let (shared_available, shared_active, shared_executions, pool_available, pool_active) =
1257 socket_stats;
1258 return Ok(HealthStatus {
1259 healthy: false,
1260 status: "not_initialized".to_string(),
1261 uptime_ms: None,
1262 memory: None,
1263 pool_completed: None,
1264 pool_queued: None,
1265 success_rate: None,
1266 circuit_state,
1267 avg_response_time_ms: avg_rt,
1268 recovering,
1269 recovery_percent: recovery_pct,
1270 shared_socket_available_slots: shared_available,
1271 shared_socket_active_connections: shared_active,
1272 shared_socket_registered_executions: shared_executions,
1273 connection_pool_available_slots: pool_available,
1274 connection_pool_active_connections: pool_active,
1275 });
1276 }
1277
1278 if !std::path::Path::new(&self.socket_path).exists() {
1279 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1280 let (shared_available, shared_active, shared_executions, pool_available, pool_active) =
1281 socket_stats;
1282 return Ok(HealthStatus {
1283 healthy: false,
1284 status: "socket_missing".to_string(),
1285 uptime_ms: None,
1286 memory: None,
1287 pool_completed: None,
1288 pool_queued: None,
1289 success_rate: None,
1290 circuit_state,
1291 avg_response_time_ms: avg_rt,
1292 recovering,
1293 recovery_percent: recovery_pct,
1294 shared_socket_available_slots: shared_available,
1295 shared_socket_active_connections: shared_active,
1296 shared_socket_registered_executions: shared_executions,
1297 connection_pool_available_slots: pool_available,
1298 connection_pool_active_connections: pool_active,
1299 });
1300 }
1301
1302 let mut conn =
1303 match tokio::time::timeout(Duration::from_millis(100), self.connection_pool.acquire())
1304 .await
1305 {
1306 Ok(Ok(c)) => c,
1307 Ok(Err(e)) => {
1308 let err_str = e.to_string();
1309 let is_pool_exhausted =
1310 err_str.contains("semaphore") || err_str.contains("Connection refused");
1311
1312 let process_status = match self.process.try_lock() {
1314 Ok(guard) => {
1315 if let Some(child) = guard.as_ref() {
1316 format!("process_pid_{}", child.id().unwrap_or(0))
1317 } else {
1318 "no_process".to_string()
1319 }
1320 }
1321 Err(_) => "process_lock_busy".to_string(),
1322 };
1323
1324 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1325 let (
1326 shared_available,
1327 shared_active,
1328 shared_executions,
1329 pool_available,
1330 pool_active,
1331 ) = socket_stats;
1332 return Ok(HealthStatus {
1333 healthy: is_pool_exhausted,
1334 status: if is_pool_exhausted {
1335 format!("pool_exhausted: {e} ({process_status})")
1336 } else {
1337 format!("connection_failed: {e} ({process_status})")
1338 },
1339 uptime_ms: None,
1340 memory: None,
1341 pool_completed: None,
1342 pool_queued: None,
1343 success_rate: None,
1344 circuit_state,
1345 avg_response_time_ms: avg_rt,
1346 recovering,
1347 recovery_percent: recovery_pct,
1348 shared_socket_available_slots: shared_available,
1349 shared_socket_active_connections: shared_active,
1350 shared_socket_registered_executions: shared_executions,
1351 connection_pool_available_slots: pool_available,
1352 connection_pool_active_connections: pool_active,
1353 });
1354 }
1355 Err(_) => {
1356 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1357 let (
1358 shared_available,
1359 shared_active,
1360 shared_executions,
1361 pool_available,
1362 pool_active,
1363 ) = socket_stats;
1364 return Ok(HealthStatus {
1365 healthy: true,
1366 status: "pool_busy".to_string(),
1367 uptime_ms: None,
1368 memory: None,
1369 pool_completed: None,
1370 pool_queued: None,
1371 success_rate: None,
1372 circuit_state,
1373 avg_response_time_ms: avg_rt,
1374 recovering,
1375 recovery_percent: recovery_pct,
1376 shared_socket_available_slots: shared_available,
1377 shared_socket_active_connections: shared_active,
1378 shared_socket_registered_executions: shared_executions,
1379 connection_pool_available_slots: pool_available,
1380 connection_pool_active_connections: pool_active,
1381 });
1382 }
1383 };
1384
1385 let request = PoolRequest::Health {
1386 task_id: Uuid::new_v4().to_string(),
1387 };
1388
1389 let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1390
1391 match conn.send_request_with_timeout(&request, 5).await {
1392 Ok(response) => {
1393 if response.success {
1394 let result = response.result.unwrap_or_default();
1395 let parsed = Self::parse_health_result(&result);
1397
1398 {
1399 let (
1400 shared_available,
1401 shared_active,
1402 shared_executions,
1403 pool_available,
1404 pool_active,
1405 ) = socket_stats;
1406 Ok(HealthStatus {
1407 healthy: true,
1408 status: parsed.status,
1409 uptime_ms: parsed.uptime_ms,
1410 memory: parsed.memory,
1411 pool_completed: parsed.pool_completed,
1412 pool_queued: parsed.pool_queued,
1413 success_rate: parsed.success_rate,
1414 circuit_state,
1415 avg_response_time_ms: avg_rt,
1416 recovering,
1417 recovery_percent: recovery_pct,
1418 shared_socket_available_slots: shared_available,
1419 shared_socket_active_connections: shared_active,
1420 shared_socket_registered_executions: shared_executions,
1421 connection_pool_available_slots: pool_available,
1422 connection_pool_active_connections: pool_active,
1423 })
1424 }
1425 } else {
1426 let (
1427 shared_available,
1428 shared_active,
1429 shared_executions,
1430 pool_available,
1431 pool_active,
1432 ) = socket_stats;
1433 Ok(HealthStatus {
1434 healthy: false,
1435 status: response
1436 .error
1437 .map(|e| e.message)
1438 .unwrap_or_else(|| "unknown_error".to_string()),
1439 uptime_ms: None,
1440 memory: None,
1441 pool_completed: None,
1442 pool_queued: None,
1443 success_rate: None,
1444 circuit_state,
1445 avg_response_time_ms: avg_rt,
1446 recovering,
1447 recovery_percent: recovery_pct,
1448 shared_socket_available_slots: shared_available,
1449 shared_socket_active_connections: shared_active,
1450 shared_socket_registered_executions: shared_executions,
1451 connection_pool_available_slots: pool_available,
1452 connection_pool_active_connections: pool_active,
1453 })
1454 }
1455 }
1456 Err(e) => {
1457 let (
1458 shared_available,
1459 shared_active,
1460 shared_executions,
1461 pool_available,
1462 pool_active,
1463 ) = socket_stats;
1464 Ok(HealthStatus {
1465 healthy: false,
1466 status: format!("request_failed: {e}"),
1467 uptime_ms: None,
1468 memory: None,
1469 pool_completed: None,
1470 pool_queued: None,
1471 success_rate: None,
1472 circuit_state,
1473 avg_response_time_ms: avg_rt,
1474 recovering,
1475 recovery_percent: recovery_pct,
1476 shared_socket_available_slots: shared_available,
1477 shared_socket_active_connections: shared_active,
1478 shared_socket_registered_executions: shared_executions,
1479 connection_pool_available_slots: pool_available,
1480 connection_pool_active_connections: pool_active,
1481 })
1482 }
1483 }
1484 }
1485
1486 pub async fn ensure_healthy(&self) -> Result<bool, PluginError> {
1488 let health = self.health_check().await?;
1489
1490 if health.healthy {
1491 return Ok(true);
1492 }
1493
1494 match self.restart_lock.try_lock() {
1495 Ok(_guard) => {
1496 let health_recheck = self.health_check().await?;
1497 if health_recheck.healthy {
1498 return Ok(true);
1499 }
1500
1501 tracing::warn!(status = %health.status, "Pool server unhealthy, attempting restart");
1502 self.restart_internal().await?;
1503 }
1504 Err(_) => {
1505 tracing::debug!("Waiting for another task to complete pool server restart");
1506 let _guard = self.restart_lock.lock().await;
1507 }
1508 }
1509
1510 let health_after = self.health_check().await?;
1511 Ok(health_after.healthy)
1512 }
1513
1514 pub async fn restart(&self) -> Result<(), PluginError> {
1516 let _guard = self.restart_lock.lock().await;
1517 self.restart_internal().await
1518 }
1519
1520 async fn restart_internal(&self) -> Result<(), PluginError> {
1522 tracing::info!("Restarting plugin pool server");
1523
1524 {
1525 let mut process_guard = self.process.lock().await;
1526 if let Some(mut child) = process_guard.take() {
1527 let _ = child.kill().await;
1528 tokio::time::sleep(Duration::from_millis(100)).await;
1529 }
1530 }
1531
1532 Self::cleanup_socket_file(&self.socket_path).await;
1533
1534 self.initialized.store(false, Ordering::Release);
1535
1536 let mut process_guard = self.process.lock().await;
1537 if process_guard.is_some() {
1538 return Ok(());
1539 }
1540
1541 let child = Self::spawn_pool_server_process(&self.socket_path, "restart").await?;
1542 *process_guard = Some(child);
1543
1544 self.initialized.store(true, Ordering::Release);
1545
1546 self.recovery_allowance.store(10, Ordering::Relaxed);
1547 self.recovery_mode.store(true, Ordering::Relaxed);
1548
1549 self.circuit_breaker.force_close();
1550
1551 let now = std::time::SystemTime::now()
1552 .duration_since(std::time::UNIX_EPOCH)
1553 .unwrap_or_default()
1554 .as_millis() as u64;
1555 self.last_restart_time_ms.store(now, Ordering::Relaxed);
1556
1557 tracing::info!("Recovery mode enabled - requests will gradually increase from 10%");
1558
1559 Ok(())
1560 }
1561
1562 pub fn circuit_state(&self) -> CircuitState {
1564 self.circuit_breaker.state()
1565 }
1566
1567 pub fn avg_response_time_ms(&self) -> u32 {
1569 self.circuit_breaker.avg_response_time()
1570 }
1571
1572 pub fn is_recovering(&self) -> bool {
1574 self.recovery_mode.load(Ordering::Relaxed)
1575 }
1576
1577 pub fn recovery_allowance_percent(&self) -> u32 {
1579 self.recovery_allowance.load(Ordering::Relaxed)
1580 }
1581
1582 pub async fn shutdown(&self) -> Result<(), PluginError> {
1584 if !self.initialized.load(Ordering::Acquire) {
1585 return Ok(());
1586 }
1587
1588 tracing::info!("Initiating graceful shutdown of plugin pool server");
1589
1590 self.shutdown_signal.notify_waiters();
1591
1592 let shutdown_timeout = std::time::Duration::from_secs(35);
1593 let shutdown_result = self.send_shutdown_request(shutdown_timeout).await;
1594
1595 match &shutdown_result {
1596 Ok(response) => {
1597 tracing::info!(
1598 response = ?response,
1599 "Pool server acknowledged shutdown, waiting for graceful exit"
1600 );
1601 }
1602 Err(e) => {
1603 tracing::warn!(
1604 error = %e,
1605 "Failed to send shutdown request, will force kill"
1606 );
1607 }
1608 }
1609
1610 let mut process_guard = self.process.lock().await;
1611 if let Some(ref mut child) = *process_guard {
1612 let graceful_wait = std::time::Duration::from_secs(35);
1613 let start = std::time::Instant::now();
1614
1615 loop {
1616 match child.try_wait() {
1617 Ok(Some(status)) => {
1618 tracing::info!(
1619 exit_status = ?status,
1620 elapsed_ms = start.elapsed().as_millis(),
1621 "Pool server exited gracefully"
1622 );
1623 break;
1624 }
1625 Ok(None) => {
1626 if start.elapsed() >= graceful_wait {
1627 tracing::warn!(
1628 "Pool server did not exit within graceful timeout, force killing"
1629 );
1630 let _ = child.kill().await;
1631 break;
1632 }
1633 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1634 }
1635 Err(e) => {
1636 tracing::warn!(error = %e, "Error checking pool server status");
1637 let _ = child.kill().await;
1638 break;
1639 }
1640 }
1641 }
1642 }
1643 *process_guard = None;
1644
1645 let _ = std::fs::remove_file(&self.socket_path);
1646
1647 self.initialized.store(false, Ordering::Release);
1648 tracing::info!("Plugin pool server shutdown complete");
1649 Ok(())
1650 }
1651
1652 async fn send_shutdown_request(
1654 &self,
1655 timeout: std::time::Duration,
1656 ) -> Result<PoolResponse, PluginError> {
1657 let request = PoolRequest::Shutdown {
1658 task_id: Uuid::new_v4().to_string(),
1659 };
1660
1661 let connection_id = self.connection_pool.next_connection_id();
1664 let mut conn = match PoolConnection::new(&self.socket_path, connection_id).await {
1665 Ok(c) => c,
1666 Err(e) => {
1667 return Err(PluginError::PluginExecutionError(format!(
1668 "Failed to connect for shutdown: {e}"
1669 )));
1670 }
1671 };
1672
1673 conn.send_request_with_timeout(&request, timeout.as_secs())
1674 .await
1675 }
1676}
1677
1678impl Drop for PoolManager {
1679 fn drop(&mut self) {
1680 let _ = std::fs::remove_file(&self.socket_path);
1681 }
1682}
1683
1684static POOL_MANAGER: std::sync::OnceLock<Arc<PoolManager>> = std::sync::OnceLock::new();
1686
1687pub fn get_pool_manager() -> Arc<PoolManager> {
1689 POOL_MANAGER
1690 .get_or_init(|| Arc::new(PoolManager::new()))
1691 .clone()
1692}
1693
1694#[cfg(test)]
1695mod tests {
1696 use super::*;
1697 use crate::services::plugins::script_executor::LogLevel;
1698
/// Connection-level failures are classified as dead-server errors.
#[test]
fn test_is_dead_server_error_detects_dead_server() {
    for message in ["Connection refused", "Broken pipe"] {
        let err = PluginError::PluginExecutionError(message.to_string());
        assert!(PoolManager::is_dead_server_error(&err));
    }
}
1707
/// Plugin and handler timeouts are not classified as dead-server errors.
#[test]
fn test_is_dead_server_error_excludes_plugin_timeouts() {
    for message in ["Plugin timed out after 30s", "Handler timed out"] {
        let err = PluginError::PluginExecutionError(message.to_string());
        assert!(!PoolManager::is_dead_server_error(&err));
    }
}
1716
/// Ordinary plugin failures are not classified as dead-server errors.
#[test]
fn test_is_dead_server_error_normal_errors() {
    let ordinary_failures = [
        "TypeError: undefined is not a function",
        "Plugin returned invalid JSON",
    ];

    for message in ordinary_failures {
        let err = PluginError::PluginExecutionError(message.to_string());
        assert!(!PoolManager::is_dead_server_error(&err));
    }
}
1726
/// Each listed dead-server message is detected by `is_dead_server_error`.
#[test]
fn test_is_dead_server_error_detects_all_dead_server_indicators() {
    const DEAD_SERVER_MESSAGES: [&str; 8] = [
        "EOF while parsing JSON response",
        "Broken pipe when writing to socket",
        "Connection refused: server not running",
        "Connection reset by peer",
        "Socket not connected",
        "Failed to connect to pool server",
        "Socket file missing: /tmp/test.sock",
        "No such file or directory",
    ];

    for error_msg in DEAD_SERVER_MESSAGES {
        let wrapped = PluginError::PluginExecutionError(error_msg.to_string());
        let detected = PoolManager::is_dead_server_error(&wrapped);
        assert!(
            detected,
            "Expected '{error_msg}' to be detected as dead server error"
        );
    }
}
1749
/// `DeadServerIndicator::from_error_str` matches the connection-failure
/// patterns and rejects handler timeouts and business errors.
#[test]
fn test_dead_server_indicator_patterns() {
    use super::super::health::DeadServerIndicator;

    let is_indicator = |message: &str| DeadServerIndicator::from_error_str(message).is_some();

    // Patterns that must be recognised.
    assert!(is_indicator("eof while parsing"));
    assert!(is_indicator("broken pipe"));
    assert!(is_indicator("connection refused"));
    assert!(is_indicator("connection reset"));
    assert!(is_indicator("not connected"));
    assert!(is_indicator("failed to connect"));
    assert!(is_indicator("socket file missing"));
    assert!(is_indicator("no such file"));
    assert!(is_indicator("connection timed out"));
    assert!(is_indicator("connect timed out"));

    // Patterns that must not be recognised.
    assert!(!is_indicator("handler timed out"));
    assert!(!is_indicator("validation error"));
    assert!(!is_indicator("TypeError: undefined"));
}
1772
/// A plugin-level "connection timed out" message is not a dead-server error.
#[test]
fn test_is_dead_server_error_excludes_plugin_timeouts_with_connection() {
    let message = "plugin connection timed out".to_string();
    let err = PluginError::PluginExecutionError(message);

    let detected = PoolManager::is_dead_server_error(&err);
    assert!(!detected);
}
1781
/// Detection does not depend on the letter case of the message.
#[test]
fn test_is_dead_server_error_case_insensitive() {
    let mixed_case_messages = [
        "CONNECTION REFUSED",
        "BROKEN PIPE",
        "Connection Reset By Peer",
    ];

    for message in mixed_case_messages {
        let err = PluginError::PluginExecutionError(message.to_string());
        assert!(PoolManager::is_dead_server_error(&err));
    }
}
1794
/// No variation of a handler or plugin timeout message is treated as a
/// dead-server error.
#[test]
fn test_is_dead_server_error_handler_timeout_variations() {
    const TIMEOUT_MESSAGES: [&str; 5] = [
        "Handler timed out",
        "handler timed out after 30000ms",
        "Plugin handler timed out",
        "plugin timed out",
        "Plugin execution timed out after 60s",
    ];

    for error_msg in TIMEOUT_MESSAGES {
        let wrapped = PluginError::PluginExecutionError(error_msg.to_string());
        let detected = PoolManager::is_dead_server_error(&wrapped);
        assert!(
            !detected,
            "Expected '{error_msg}' to NOT be detected as dead server error"
        );
    }
}
1814
/// Errors raised by plugin code or business rules are not dead-server errors.
#[test]
fn test_is_dead_server_error_business_errors_not_detected() {
    const BUSINESS_MESSAGES: [&str; 8] = [
        "ReferenceError: x is not defined",
        "SyntaxError: Unexpected token",
        "TypeError: Cannot read property 'foo' of undefined",
        "Plugin returned status 400: Bad Request",
        "Validation error: missing required field",
        "Authorization failed",
        "Rate limit exceeded",
        "Plugin threw an error: Invalid input",
    ];

    for error_msg in BUSINESS_MESSAGES {
        let wrapped = PluginError::PluginExecutionError(error_msg.to_string());
        let detected = PoolManager::is_dead_server_error(&wrapped);
        assert!(
            !detected,
            "Expected '{error_msg}' to NOT be detected as dead server error"
        );
    }
}
1837
/// A `HandlerError` whose message is "Connection refused" is detected as a
/// dead-server error.
#[test]
fn test_is_dead_server_error_with_handler_error_type() {
    let err = PluginError::HandlerError(Box::new(PluginHandlerPayload {
        message: "Connection refused".to_string(),
        status: 500,
        code: None,
        details: None,
        logs: None,
        traces: None,
    }));

    let detected = PoolManager::is_dead_server_error(&err);
    assert!(detected);
}
1854
/// Heap formula with a concurrency of 100 yields 832.
#[test]
fn test_heap_calculation_base_case() {
    let steps = 100 / PoolManager::CONCURRENCY_DIVISOR;
    let heap_mb = PoolManager::BASE_HEAP_MB + steps * PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;

    assert_eq!(heap_mb, 832);
}
1872
/// Heap formula with a concurrency of 5 yields 512 (the quotient of the
/// integer division is zero, so only the base remains).
#[test]
fn test_heap_calculation_minimum() {
    let steps = 5 / PoolManager::CONCURRENCY_DIVISOR;
    let heap_mb = PoolManager::BASE_HEAP_MB + steps * PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;

    assert_eq!(heap_mb, 512);
}
1886
/// Heap formula with a concurrency of 500 yields 2112.
#[test]
fn test_heap_calculation_high_concurrency() {
    let steps = 500 / PoolManager::CONCURRENCY_DIVISOR;
    let heap_mb = PoolManager::BASE_HEAP_MB + steps * PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;

    assert_eq!(heap_mb, 2112);
}
1900
/// With a concurrency of 3000 the uncapped formula gives 10112, which the
/// 8192 maximum then caps.
#[test]
fn test_heap_calculation_max_cap() {
    let ceiling = PoolManager::MAX_HEAP_MB;
    assert_eq!(ceiling, 8192);

    let steps = 3000 / PoolManager::CONCURRENCY_DIVISOR;
    let uncapped = PoolManager::BASE_HEAP_MB + steps * PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
    let limited = std::cmp::min(uncapped, ceiling);

    assert_eq!(uncapped, 10112);
    assert_eq!(limited, 8192);
}
1921
/// Pins the heap-sizing constants to their expected values.
#[test]
fn test_pool_manager_constants() {
    let base = PoolManager::BASE_HEAP_MB;
    let divisor = PoolManager::CONCURRENCY_DIVISOR;
    let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
    let ceiling = PoolManager::MAX_HEAP_MB;

    assert_eq!(base, 512);
    assert_eq!(divisor, 10);
    assert_eq!(increment, 32);
    assert_eq!(ceiling, 8192);
}
1934
/// Concurrency below the divisor yields the 512 base heap.
#[test]
fn test_calculate_heap_size_low_concurrency() {
    for (concurrency, expected_mb) in [(5, 512), (9, 512)] {
        assert_eq!(PoolManager::calculate_heap_size(concurrency), expected_mb);
    }
}
1945
/// Heap size for concurrency 10, 50 and 100.
#[test]
fn test_calculate_heap_size_medium_concurrency() {
    for (concurrency, expected_mb) in [(10, 544), (50, 672), (100, 832)] {
        assert_eq!(PoolManager::calculate_heap_size(concurrency), expected_mb);
    }
}
1955
/// Heap size for concurrency 500 and 1000.
#[test]
fn test_calculate_heap_size_high_concurrency() {
    for (concurrency, expected_mb) in [(500, 2112), (1000, 3712)] {
        assert_eq!(PoolManager::calculate_heap_size(concurrency), expected_mb);
    }
}
1963
/// Very high concurrency is capped at 8192.
#[test]
fn test_calculate_heap_size_capped_at_max() {
    for (concurrency, expected_mb) in [(3000, 8192), (10000, 8192)] {
        assert_eq!(PoolManager::calculate_heap_size(concurrency), expected_mb);
    }
}
1971
/// Zero concurrency yields the 512 base heap.
#[test]
fn test_calculate_heap_size_zero_concurrency() {
    let heap_mb = PoolManager::calculate_heap_size(0);
    assert_eq!(heap_mb, 512);
}
1977
/// `None` is rendered as the empty string.
#[test]
fn test_format_return_value_none() {
    let rendered = PoolManager::format_return_value(None);
    assert_eq!(rendered, "");
}
1986
/// A JSON string is rendered as its content, without quotes.
#[test]
fn test_format_return_value_string() {
    let rendered = PoolManager::format_return_value(Some(serde_json::json!("hello world")));
    assert_eq!(rendered, "hello world");
}
1992
/// An empty JSON string is rendered as the empty string.
#[test]
fn test_format_return_value_empty_string() {
    let rendered = PoolManager::format_return_value(Some(serde_json::json!("")));
    assert_eq!(rendered, "");
}
1998
/// A JSON object is rendered as text containing its keys and values.
#[test]
fn test_format_return_value_object() {
    let payload = serde_json::json!({"key": "value", "num": 42});
    let rendered = PoolManager::format_return_value(Some(payload));

    for fragment in ["key", "value", "42"] {
        assert!(rendered.contains(fragment));
    }
}
2008
/// A JSON array is rendered in compact form.
#[test]
fn test_format_return_value_array() {
    let rendered = PoolManager::format_return_value(Some(serde_json::json!([1, 2, 3])));
    assert_eq!(rendered, "[1,2,3]");
}
2014
/// A JSON number is rendered as its decimal text.
#[test]
fn test_format_return_value_number() {
    let rendered = PoolManager::format_return_value(Some(serde_json::json!(42)));
    assert_eq!(rendered, "42");
}
2020
/// JSON booleans are rendered as `true` / `false`.
#[test]
fn test_format_return_value_boolean() {
    let rendered_true = PoolManager::format_return_value(Some(serde_json::json!(true)));
    assert_eq!(rendered_true, "true");

    let rendered_false = PoolManager::format_return_value(Some(serde_json::json!(false)));
    assert_eq!(rendered_false, "false");
}
2032
/// JSON `null` is rendered as the text `null`.
#[test]
fn test_format_return_value_null() {
    let rendered = PoolManager::format_return_value(Some(serde_json::json!(null)));
    assert_eq!(rendered, "null");
}
2038
/// A successful response with a string result and one log entry is parsed
/// into a `ScriptResult` carrying both.
#[test]
fn test_parse_pool_response_success_with_string_result() {
    use super::super::protocol::{PoolLogEntry, PoolResponse};

    let single_log = PoolLogEntry {
        level: "info".to_string(),
        message: "test log".to_string(),
    };
    let response = PoolResponse {
        task_id: "test-123".to_string(),
        success: true,
        result: Some(serde_json::json!("success result")),
        error: None,
        logs: Some(vec![single_log]),
    };

    let parsed = PoolManager::parse_pool_response(response).unwrap();

    assert_eq!(parsed.return_value, "success result");
    assert!(parsed.error.is_empty());
    assert_eq!(parsed.logs.len(), 1);
    let entry = &parsed.logs[0];
    assert_eq!(entry.level, LogLevel::Info);
    assert_eq!(entry.message, "test log");
}
2065
/// A successful response with an object result is rendered into the return
/// value, and missing logs become an empty list.
#[test]
fn test_parse_pool_response_success_with_object_result() {
    use super::super::protocol::PoolResponse;

    let parsed = PoolManager::parse_pool_response(PoolResponse {
        task_id: "test-456".to_string(),
        success: true,
        result: Some(serde_json::json!({"data": "value"})),
        error: None,
        logs: None,
    })
    .unwrap();

    for fragment in ["data", "value"] {
        assert!(parsed.return_value.contains(fragment));
    }
    assert!(parsed.logs.is_empty());
}
2083
/// A successful response without a result yields an empty return value and
/// an empty error.
#[test]
fn test_parse_pool_response_success_no_result() {
    use super::super::protocol::PoolResponse;

    let parsed = PoolManager::parse_pool_response(PoolResponse {
        task_id: "test-789".to_string(),
        success: true,
        result: None,
        error: None,
        logs: None,
    })
    .unwrap();

    assert_eq!(parsed.return_value, "");
    assert!(parsed.error.is_empty());
}
2100
/// A failed response with full error details becomes a `HandlerError`
/// carrying the message, status and code.
#[test]
fn test_parse_pool_response_failure_with_error() {
    use super::super::protocol::{PoolError, PoolResponse};

    let failure = PoolError {
        message: "Something went wrong".to_string(),
        code: Some("ERR_001".to_string()),
        status: Some(400),
        details: Some(serde_json::json!({"field": "name"})),
    };
    let response = PoolResponse {
        task_id: "test-error".to_string(),
        success: false,
        result: None,
        error: Some(failure),
        logs: None,
    };

    let err = PoolManager::parse_pool_response(response).unwrap_err();
    let PluginError::HandlerError(payload) = err else {
        panic!("Expected HandlerError")
    };

    assert_eq!(payload.message, "Something went wrong");
    assert_eq!(payload.status, 400);
    assert_eq!(payload.code, Some("ERR_001".to_string()));
}
2128
/// A failed response without error details falls back to "Unknown error"
/// with status 500.
#[test]
fn test_parse_pool_response_failure_no_error_details() {
    use super::super::protocol::PoolResponse;

    let err = PoolManager::parse_pool_response(PoolResponse {
        task_id: "test-unknown".to_string(),
        success: false,
        result: None,
        error: None,
        logs: None,
    })
    .unwrap_err();

    let PluginError::HandlerError(payload) = err else {
        panic!("Expected HandlerError")
    };

    assert_eq!(payload.message, "Unknown error");
    assert_eq!(payload.status, 500);
}
2150
/// Logs attached to a failed response are kept on the resulting
/// `HandlerError`, in order and with their levels mapped.
#[test]
fn test_parse_pool_response_failure_preserves_logs() {
    use super::super::protocol::{PoolError, PoolLogEntry, PoolResponse};

    let server_logs = vec![
        PoolLogEntry {
            level: "debug".to_string(),
            message: "debug message".to_string(),
        },
        PoolLogEntry {
            level: "error".to_string(),
            message: "error message".to_string(),
        },
    ];
    let response = PoolResponse {
        task_id: "test-logs".to_string(),
        success: false,
        result: None,
        error: Some(PoolError {
            message: "Error with logs".to_string(),
            code: None,
            status: None,
            details: None,
        }),
        logs: Some(server_logs),
    };

    let err = PoolManager::parse_pool_response(response).unwrap_err();
    let PluginError::HandlerError(payload) = err else {
        panic!("Expected HandlerError")
    };

    let kept_logs = payload.logs.unwrap();
    assert_eq!(kept_logs.len(), 2);
    assert_eq!(kept_logs[0].level, LogLevel::Debug);
    assert_eq!(kept_logs[1].level, LogLevel::Error);
}
2188
/// `parse_success_response` keeps the result and maps the `log` and
/// `result` log levels.
#[test]
fn test_parse_success_response_complete() {
    use super::super::protocol::{PoolLogEntry, PoolResponse};

    let server_logs = vec![
        PoolLogEntry {
            level: "log".to_string(),
            message: "starting".to_string(),
        },
        PoolLogEntry {
            level: "result".to_string(),
            message: "finished".to_string(),
        },
    ];
    let response = PoolResponse {
        task_id: "task-1".to_string(),
        success: true,
        result: Some(serde_json::json!("completed")),
        error: None,
        logs: Some(server_logs),
    };

    let parsed = PoolManager::parse_success_response(response);

    assert_eq!(parsed.return_value, "completed");
    assert!(parsed.error.is_empty());
    assert_eq!(parsed.logs.len(), 2);
    assert_eq!(parsed.logs[0].level, LogLevel::Log);
    assert_eq!(parsed.logs[1].level, LogLevel::Result);
}
2221
/// `parse_error_response` carries message, status, code, details and logs
/// over to the `HandlerError` payload.
#[test]
fn test_parse_error_response_with_all_fields() {
    use super::super::protocol::{PoolError, PoolLogEntry, PoolResponse};

    let failure = PoolError {
        message: "Validation failed".to_string(),
        code: Some("VALIDATION_ERROR".to_string()),
        status: Some(422),
        details: Some(serde_json::json!({"fields": ["email"]})),
    };
    let warning = PoolLogEntry {
        level: "warn".to_string(),
        message: "validation warning".to_string(),
    };
    let response = PoolResponse {
        task_id: "err-task".to_string(),
        success: false,
        result: None,
        error: Some(failure),
        logs: Some(vec![warning]),
    };

    let err = PoolManager::parse_error_response(response);
    let PluginError::HandlerError(payload) = err else {
        panic!("Expected HandlerError")
    };

    assert_eq!(payload.message, "Validation failed");
    assert_eq!(payload.status, 422);
    assert_eq!(payload.code, Some("VALIDATION_ERROR".to_string()));
    assert!(payload.details.is_some());

    let kept_logs = payload.logs.unwrap();
    assert_eq!(kept_logs.len(), 1);
    assert_eq!(kept_logs[0].level, LogLevel::Warn);
}
2260
/// A fully populated health payload is parsed field by field.
#[test]
fn test_parse_health_result_complete() {
    let payload = serde_json::json!({
        "status": "healthy",
        "uptime": 123456,
        "memory": {
            "heapUsed": 50000000,
            "heapTotal": 100000000
        },
        "pool": {
            "completed": 1000,
            "queued": 5
        },
        "execution": {
            "successRate": 0.99
        }
    });

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "healthy");
    assert_eq!(parsed.uptime_ms, Some(123456));
    assert_eq!(parsed.memory, Some(50000000));
    assert_eq!(parsed.pool_completed, Some(1000));
    assert_eq!(parsed.pool_queued, Some(5));

    // Floating point: compare with a tolerance.
    let deviation = (parsed.success_rate.unwrap() - 0.99).abs();
    assert!(deviation < 0.001);
}
2292
/// An empty payload yields status "unknown" and no metrics.
#[test]
fn test_parse_health_result_minimal() {
    let parsed = PoolManager::parse_health_result(&serde_json::json!({}));

    let expected = ParsedHealthResult {
        status: "unknown".to_string(),
        ..Default::default()
    };
    assert_eq!(parsed, expected);
}
2306
/// A payload with only status, uptime and `memory.heapTotal` yields those
/// two fields; `memory` stays `None` because `heapUsed` is absent.
#[test]
fn test_parse_health_result_partial() {
    let payload = serde_json::json!({
        "status": "degraded",
        "uptime": 5000,
        "memory": {
            "heapTotal": 100000000
        }
    });

    let ParsedHealthResult {
        status,
        uptime_ms,
        memory,
        pool_completed,
        pool_queued,
        success_rate,
    } = PoolManager::parse_health_result(&payload);

    assert_eq!(status, "degraded");
    assert_eq!(uptime_ms, Some(5000));
    assert_eq!(memory, None);
    assert_eq!(pool_completed, None);
    assert_eq!(pool_queued, None);
    assert_eq!(success_rate, None);
}
2327
/// Fields with unexpected JSON types are ignored: status falls back to
/// "unknown" and every metric is `None`.
#[test]
fn test_parse_health_result_wrong_types() {
    let payload = serde_json::json!({
        "status": 123,
        "uptime": "not a number",
        "memory": "invalid"
    });

    let parsed = PoolManager::parse_health_result(&payload);

    let expected = ParsedHealthResult {
        status: "unknown".to_string(),
        ..Default::default()
    };
    assert_eq!(parsed, expected);
}
2345
/// Nested `pool` and `execution` values are read even when the top-level
/// fields are missing; zero counts are kept as `Some(0)`.
#[test]
fn test_parse_health_result_nested_values() {
    let payload = serde_json::json!({
        "pool": {
            "completed": 0,
            "queued": 0
        },
        "execution": {
            "successRate": 1.0
        }
    });

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "unknown");
    assert_eq!(parsed.uptime_ms, None);
    assert_eq!(parsed.memory, None);
    assert_eq!(parsed.pool_completed, Some(0));
    assert_eq!(parsed.pool_queued, Some(0));

    // Floating point: compare with a tolerance.
    let deviation = (parsed.success_rate.unwrap() - 1.0).abs();
    assert!(deviation < 0.001);
}
2367
2368 #[tokio::test]
2373 async fn test_pool_manager_new_creates_unique_socket_path() {
2374 let manager1 = PoolManager::new();
2376 let manager2 = PoolManager::new();
2377
2378 assert_ne!(manager1.socket_path, manager2.socket_path);
2379 assert!(manager1
2380 .socket_path
2381 .starts_with("/tmp/relayer-plugin-pool-"));
2382 assert!(manager2
2383 .socket_path
2384 .starts_with("/tmp/relayer-plugin-pool-"));
2385 }
2386
2387 #[tokio::test]
2388 async fn test_pool_manager_with_custom_socket_path() {
2389 let custom_path = "/tmp/custom-test-pool.sock".to_string();
2390 let manager = PoolManager::with_socket_path(custom_path.clone());
2391
2392 assert_eq!(manager.socket_path, custom_path);
2393 }
2394
2395 #[tokio::test]
2396 async fn test_pool_manager_default_trait() {
2397 let manager = PoolManager::default();
2399 assert!(manager.socket_path.starts_with("/tmp/relayer-plugin-pool-"));
2400 }
2401
2402 #[tokio::test]
2407 async fn test_circuit_state_initial() {
2408 let manager = PoolManager::new();
2409
2410 assert_eq!(manager.circuit_state(), CircuitState::Closed);
2412 }
2413
2414 #[tokio::test]
2415 async fn test_avg_response_time_initial() {
2416 let manager = PoolManager::new();
2417
2418 assert_eq!(manager.avg_response_time_ms(), 0);
2420 }
2421
2422 #[tokio::test]
2427 async fn test_recovery_mode_initial() {
2428 let manager = PoolManager::new();
2429
2430 assert!(!manager.is_recovering());
2432 assert_eq!(manager.recovery_allowance_percent(), 0);
2433 }
2434
/// Builds a successful `ScriptResult` with a single info log and checks the fields read
/// back as constructed.
#[test]
fn test_script_result_success_construction() {
    let info_entry = LogEntry {
        level: LogLevel::Info,
        message: String::from("Test log"),
    };
    let outcome = ScriptResult {
        logs: vec![info_entry],
        error: String::new(),
        return_value: r#"{"success": true}"#.to_string(),
        trace: Vec::new(),
    };

    assert!(outcome.error.is_empty());
    assert_eq!(outcome.logs.len(), 1);
    assert_eq!(outcome.logs[0].level, LogLevel::Info);
}
2455
/// A `ScriptResult` holding four log entries of different levels must keep them in the
/// order they were supplied.
#[test]
fn test_script_result_with_multiple_logs() {
    let logs: Vec<LogEntry> = vec![
        (LogLevel::Log, "Starting execution"),
        (LogLevel::Debug, "Processing data"),
        (LogLevel::Warn, "Deprecated API used"),
        (LogLevel::Error, "Non-fatal error"),
    ]
    .into_iter()
    .map(|(level, message)| LogEntry {
        level,
        message: message.to_string(),
    })
    .collect();

    let outcome = ScriptResult {
        logs,
        error: String::new(),
        return_value: "done".to_string(),
        trace: Vec::new(),
    };

    assert_eq!(outcome.logs.len(), 4);
    let expected_levels = vec![
        LogLevel::Log,
        LogLevel::Debug,
        LogLevel::Warn,
        LogLevel::Error,
    ];
    for (entry, expected) in outcome.logs.iter().zip(expected_levels) {
        assert_eq!(entry.level, expected);
    }
}
2488
/// Constructs a `QueuedRequest` with most optional fields populated and spot-checks a few
/// of them.
#[test]
fn test_queued_request_required_fields() {
    // The receiver is kept alive but unused; only the sender is stored in the request.
    let (response_tx, _response_rx) = oneshot::channel();

    let queued = QueuedRequest {
        plugin_id: String::from("test-plugin"),
        compiled_code: Some(String::from("module.exports.handler = () => {}")),
        plugin_path: None,
        params: serde_json::json!({"key": "value"}),
        headers: None,
        socket_path: String::from("/tmp/test.sock"),
        http_request_id: Some(String::from("req-123")),
        timeout_secs: Some(30),
        route: Some(String::from("/api/test")),
        config: Some(serde_json::json!({"setting": true})),
        method: Some(String::from("POST")),
        query: Some(serde_json::json!({"page": "1"})),
        response_tx,
    };

    assert_eq!(queued.plugin_id, "test-plugin");
    assert!(queued.compiled_code.is_some());
    assert!(queued.plugin_path.is_none());
    assert_eq!(queued.timeout_secs, Some(30));
}
2518
/// Constructs a `QueuedRequest` with only a plugin path and null params; all remaining
/// optional fields are `None`.
#[test]
fn test_queued_request_minimal() {
    let (response_tx, _response_rx) = oneshot::channel();

    let queued = QueuedRequest {
        plugin_id: String::from("minimal"),
        compiled_code: None,
        plugin_path: Some(String::from("/path/to/plugin.ts")),
        params: serde_json::json!(null),
        headers: None,
        socket_path: String::from("/tmp/min.sock"),
        http_request_id: None,
        timeout_secs: None,
        route: None,
        config: None,
        method: None,
        query: None,
        response_tx,
    };

    assert_eq!(queued.plugin_id, "minimal");
    assert!(queued.compiled_code.is_none());
    assert!(queued.plugin_path.is_some());
}
2543
/// The `Display` output of a `SocketError` must contain both the "Socket error" label and
/// the wrapped message.
#[test]
fn test_plugin_error_socket_error() {
    let rendered = PluginError::SocketError("Connection failed".to_string()).to_string();

    for needle in ["Socket error", "Connection failed"] {
        assert!(rendered.contains(needle));
    }
}
2555
/// The `Display` output of a `PluginExecutionError` must include the wrapped message.
#[test]
fn test_plugin_error_plugin_execution_error() {
    let rendered = PluginError::PluginExecutionError("Execution failed".to_string()).to_string();
    assert!(rendered.contains("Execution failed"));
}
2562
/// The `Debug` output of a `HandlerError` must name the variant.
#[test]
fn test_plugin_error_handler_error() {
    let boxed_payload = Box::new(PluginHandlerPayload {
        message: String::from("Handler error"),
        status: 400,
        code: Some(String::from("BAD_REQUEST")),
        details: Some(serde_json::json!({"field": "name"})),
        logs: None,
        traces: None,
    });
    let err = PluginError::HandlerError(boxed_payload);

    let rendered = format!("{err:?}");
    assert!(rendered.contains("HandlerError"));
}
2579
/// Builds a `PluginHandlerPayload` with every optional field populated and checks the
/// values read back.
#[test]
fn test_plugin_handler_payload_full() {
    let details = serde_json::json!({
        "errors": [
            {"field": "email", "message": "Invalid format"}
        ]
    });
    let error_log = LogEntry {
        level: LogLevel::Error,
        message: String::from("Validation failed for email"),
    };
    let trace = serde_json::json!({"stack": "Error at line 10"});

    let full = PluginHandlerPayload {
        message: String::from("Validation failed"),
        status: 422,
        code: Some(String::from("VALIDATION_ERROR")),
        details: Some(details),
        logs: Some(vec![error_log]),
        traces: Some(vec![trace]),
    };

    assert_eq!(full.status, 422);
    assert_eq!(full.code.as_deref(), Some("VALIDATION_ERROR"));
    assert!(full.logs.is_some());
    assert!(full.traces.is_some());
}
2607
/// Builds a `PluginHandlerPayload` with only the mandatory fields and checks that the
/// optional ones remain `None`.
#[test]
fn test_plugin_handler_payload_minimal() {
    let bare = PluginHandlerPayload {
        message: String::from("Error"),
        status: 500,
        code: None,
        details: None,
        logs: None,
        traces: None,
    };

    assert_eq!(bare.status, 500);
    assert_eq!(bare.code, None);
    assert_eq!(bare.details, None);
}
2623
2624 #[tokio::test]
2629 async fn test_pool_manager_not_initialized_health_check() {
2630 let manager = PoolManager::with_socket_path("/tmp/test-health.sock".to_string());
2631
2632 let health = manager.health_check().await.unwrap();
2634
2635 assert!(!health.healthy);
2636 assert_eq!(health.status, "not_initialized");
2637 assert!(health.uptime_ms.is_none());
2638 assert!(health.memory.is_none());
2639 }
2640
2641 #[tokio::test]
2642 async fn test_pool_manager_circuit_info_in_health_status() {
2643 let manager = PoolManager::with_socket_path("/tmp/test-circuit.sock".to_string());
2644
2645 let health = manager.health_check().await.unwrap();
2646
2647 assert!(health.circuit_state.is_some());
2649 assert_eq!(health.circuit_state, Some("closed".to_string()));
2650 assert!(health.avg_response_time_ms.is_some());
2651 assert!(health.recovering.is_some());
2652 assert!(health.recovery_percent.is_some());
2653 }
2654
2655 #[tokio::test]
2656 async fn test_invalidate_plugin_when_not_initialized() {
2657 let manager = PoolManager::with_socket_path("/tmp/test-invalidate.sock".to_string());
2658
2659 let result = manager.invalidate_plugin("test-plugin".to_string()).await;
2661
2662 assert!(result.is_ok());
2664 }
2665
2666 #[tokio::test]
2667 async fn test_shutdown_when_not_initialized() {
2668 let manager = PoolManager::with_socket_path("/tmp/test-shutdown.sock".to_string());
2669
2670 let result = manager.shutdown().await;
2672
2673 assert!(result.is_ok());
2675 }
2676
/// The derived `Default` for `ParsedHealthResult` must be an empty status with every
/// optional field unset.
#[test]
fn test_parsed_health_result_default() {
    let blank = ParsedHealthResult::default();

    assert!(blank.status.is_empty());
    assert!(blank.uptime_ms.is_none());
    assert!(blank.memory.is_none());
    assert!(blank.pool_completed.is_none());
    assert!(blank.pool_queued.is_none());
    assert!(blank.success_rate.is_none());
}
2691
/// Two `ParsedHealthResult` values built from identical field values must compare equal.
#[test]
fn test_parsed_health_result_equality() {
    // Build the same value twice from one recipe so the two instances are independent.
    let build = || ParsedHealthResult {
        status: String::from("ok"),
        uptime_ms: Some(1000),
        memory: Some(500000),
        pool_completed: Some(50),
        pool_queued: Some(2),
        success_rate: Some(1.0),
    };

    let left = build();
    let right = build();

    assert_eq!(left, right);
}
2712
/// Formatting a nested JSON object must keep the inner values visible in the output.
#[test]
fn test_format_return_value_nested_object() {
    let nested = serde_json::json!({
        "user": { "name": "John", "age": 30 }
    });

    let formatted = PoolManager::format_return_value(Some(nested));

    for needle in ["John", "30"] {
        assert!(formatted.contains(needle));
    }
}
2722
/// Empty JSON objects and arrays must format to `{}` and `[]` respectively.
#[test]
fn test_format_return_value_empty_collections() {
    let empty_object = PoolManager::format_return_value(Some(serde_json::json!({})));
    assert_eq!(empty_object, "{}");

    let empty_array = PoolManager::format_return_value(Some(serde_json::json!([])));
    assert_eq!(empty_array, "[]");
}
2730
/// Zero-valued fields are real values: each must be parsed as `Some(0)` (or `Some(0.0)`)
/// rather than being treated as missing.
#[test]
fn test_parse_health_result_zero_values() {
    let payload = serde_json::json!({
        "status": "starting",
        "uptime": 0,
        "memory": { "heapUsed": 0 },
        "pool": { "completed": 0, "queued": 0 },
        "execution": { "successRate": 0.0 }
    });

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "starting");
    assert_eq!(parsed.uptime_ms, Some(0));
    assert_eq!(parsed.memory, Some(0));
    assert_eq!(parsed.pool_completed, Some(0));
    assert_eq!(parsed.pool_queued, Some(0));
    assert_eq!(parsed.success_rate, Some(0.0));
}
2748
/// Checks `calculate_heap_size` against a table of known input/output pairs.
#[test]
fn test_calculate_heap_size_precise_calculations() {
    // (input, expected heap size)
    let cases = vec![
        (0, 512),
        (1, 512),
        (10, 544),
        (20, 576),
        (100, 832),
        (200, 1152),
    ];

    for (input, expected) in cases {
        assert_eq!(PoolManager::calculate_heap_size(input), expected);
    }
}
2758
2759 #[tokio::test]
2760 async fn test_pool_manager_health_check_flag_initial() {
2761 let manager = PoolManager::new();
2762 assert!(!manager.health_check_needed.load(Ordering::Relaxed));
2763 }
2764
2765 #[tokio::test]
2766 async fn test_pool_manager_consecutive_failures_initial() {
2767 let manager = PoolManager::new();
2768 assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 0);
2769 }
2770
2771 #[tokio::test]
2772 async fn test_recovery_allowance_bounds() {
2773 let manager = PoolManager::new();
2774 manager.recovery_allowance.store(0, Ordering::Relaxed);
2775 assert_eq!(manager.recovery_allowance_percent(), 0);
2776 manager.recovery_allowance.store(50, Ordering::Relaxed);
2777 assert_eq!(manager.recovery_allowance_percent(), 50);
2778 manager.recovery_allowance.store(100, Ordering::Relaxed);
2779 assert_eq!(manager.recovery_allowance_percent(), 100);
2780 }
2781
2782 #[tokio::test]
2783 async fn test_is_initialized_changes_with_state() {
2784 let manager = PoolManager::with_socket_path("/tmp/init-test-123.sock".to_string());
2785 assert!(!manager.is_initialized().await);
2786 manager.initialized.store(true, Ordering::Release);
2787 assert!(manager.is_initialized().await);
2788 manager.initialized.store(false, Ordering::Release);
2789 assert!(!manager.is_initialized().await);
2790 }
2791
/// A `ScriptTimeout` must not be classified as a dead-server error.
#[test]
fn test_is_dead_server_error_with_script_timeout() {
    let timeout = PluginError::ScriptTimeout(30);
    let is_dead = PoolManager::is_dead_server_error(&timeout);
    assert!(!is_dead);
}
2802
/// A generic `PluginError::PluginError` must not be classified as a dead-server error.
#[test]
fn test_is_dead_server_error_with_plugin_error() {
    let generic = PluginError::PluginError(String::from("some plugin error"));
    let is_dead = PoolManager::is_dead_server_error(&generic);
    assert!(!is_dead);
}
2808
/// Contrasts two timeout-flavoured errors: a `PluginExecutionError` carrying
/// "connection timed out" is expected NOT to count as a dead server, while a
/// `SocketError` carrying "connect timed out" is expected to count.
#[test]
fn test_is_dead_server_error_with_connection_timeout_in_plugin_error() {
    let execution_err = PluginError::PluginExecutionError(String::from("connection timed out"));
    let execution_is_dead = PoolManager::is_dead_server_error(&execution_err);
    assert!(!execution_is_dead);

    let socket_err = PluginError::SocketError(String::from("connect timed out"));
    let socket_is_dead = PoolManager::is_dead_server_error(&socket_err);
    assert!(socket_is_dead);
}
2825
/// A successful pool response carrying one log per level name must be parsed into log
/// entries with the matching `LogLevel`, in the same order.
#[test]
fn test_parse_pool_response_success_with_logs_various_levels() {
    use super::super::protocol::{PoolLogEntry, PoolResponse};

    // Each wire entry is `{ level: <name>, message: "<name> level" }`.
    let wire_logs: Vec<PoolLogEntry> = ["log", "debug", "info", "warn", "error", "result"]
        .iter()
        .map(|name| PoolLogEntry {
            level: name.to_string(),
            message: format!("{name} level"),
        })
        .collect();

    let response = PoolResponse {
        task_id: String::from("test-levels"),
        success: true,
        result: Some(serde_json::json!("ok")),
        error: None,
        logs: Some(wire_logs),
    };

    let parsed = PoolManager::parse_pool_response(response).unwrap();

    assert_eq!(parsed.logs.len(), 6);
    let expected_levels = vec![
        LogLevel::Log,
        LogLevel::Debug,
        LogLevel::Info,
        LogLevel::Warn,
        LogLevel::Error,
        LogLevel::Result,
    ];
    for (entry, expected) in parsed.logs.iter().zip(expected_levels) {
        assert_eq!(entry.level, expected);
    }
}
2872
/// A failed response with no `error` object must become a `HandlerError` with the
/// defaults the test pins: message "Unknown error", status 500, no code, no details.
#[test]
fn test_parse_error_response_defaults() {
    use super::super::protocol::PoolResponse;

    let failed = PoolResponse {
        task_id: String::from("no-error"),
        success: false,
        result: None,
        error: None,
        logs: None,
    };

    let err = PoolManager::parse_error_response(failed);

    if let PluginError::HandlerError(payload) = err {
        assert_eq!(payload.message, "Unknown error");
        assert_eq!(payload.status, 500);
        assert!(payload.code.is_none());
        assert!(payload.details.is_none());
    } else {
        panic!("Expected HandlerError");
    }
}
2897
/// Formatting a floating-point JSON number must keep its decimal digits.
#[test]
fn test_format_return_value_float() {
    let formatted = PoolManager::format_return_value(Some(serde_json::json!(3.14159)));
    assert!(formatted.contains("3.14159"));
}
2904
/// A ten-element JSON array must format as compact JSON with no whitespace.
#[test]
fn test_format_return_value_large_array() {
    let numbers = serde_json::json!([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
    let formatted = PoolManager::format_return_value(Some(numbers));
    assert_eq!(formatted, "[1,2,3,4,5,6,7,8,9,10]");
}
2911
/// A JSON string containing a newline and a tab must come back verbatim, unquoted and
/// unescaped.
#[test]
fn test_format_return_value_string_with_special_chars() {
    let formatted = PoolManager::format_return_value(Some(serde_json::json!("hello\nworld\ttab")));
    assert_eq!(formatted, "hello\nworld\ttab");
}
2917
/// A JSON string with non-ASCII characters (including an emoji) must come back verbatim.
#[test]
fn test_format_return_value_unicode() {
    let formatted = PoolManager::format_return_value(Some(serde_json::json!("こんにちは世界 🌍")));
    assert_eq!(formatted, "こんにちは世界 🌍");
}
2923
/// Large `u64` counters and a success rate close to 1.0 must be parsed without loss.
#[test]
fn test_parse_health_result_large_values() {
    let payload = serde_json::json!({
        "status": "healthy",
        "uptime": 999999999999_u64,
        "memory": { "heapUsed": 9999999999_u64 },
        "pool": { "completed": 999999999_u64, "queued": 999999_u64 },
        "execution": { "successRate": 0.999999 }
    });

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "healthy");
    assert_eq!(parsed.uptime_ms, Some(999999999999));
    assert_eq!(parsed.memory, Some(9999999999));
    assert_eq!(parsed.pool_completed, Some(999999999));
    assert_eq!(parsed.pool_queued, Some(999999));

    // Floats are compared with a tolerance rather than for exact equality.
    let rate = parsed.success_rate.unwrap();
    assert!((rate - 0.999999).abs() < 0.0000001);
}
2942
/// Negative numbers in unsigned fields must be dropped: the test expects `uptime_ms` and
/// `memory` to come back as `None` while `status` is still parsed.
#[test]
fn test_parse_health_result_negative_values_treated_as_none() {
    let payload = serde_json::json!({
        "status": "error",
        "uptime": -1,
        "memory": { "heapUsed": -100 }
    });

    let parsed = PoolManager::parse_health_result(&payload);

    assert_eq!(parsed.status, "error");
    assert!(parsed.uptime_ms.is_none());
    assert!(parsed.memory.is_none());
}
2957
/// The derived `Debug` output of `ParsedHealthResult` must include its field values.
#[test]
fn test_parsed_health_result_debug() {
    let sample = ParsedHealthResult {
        status: String::from("test"),
        uptime_ms: Some(100),
        memory: Some(200),
        pool_completed: Some(50),
        pool_queued: Some(5),
        success_rate: Some(0.95),
    };

    let rendered = format!("{sample:?}");

    for needle in ["test", "100", "200"] {
        assert!(rendered.contains(needle));
    }
}
2974
/// Checks `calculate_heap_size` just below and at two boundaries: 9 to 10, where the
/// result first rises above 512, and 2399 to 2400, where it reaches 8192.
#[test]
fn test_calculate_heap_size_boundary_values() {
    // (input, expected heap size), kept in the same order as the original assertions.
    let cases = vec![(9, 512), (10, 544), (2400, 8192), (2399, 8160)];

    for (input, expected) in cases {
        assert_eq!(PoolManager::calculate_heap_size(input), expected);
    }
}
2989
2990 #[tokio::test]
2991 async fn test_pool_manager_socket_path_format() {
2992 let manager = PoolManager::new();
2993 assert!(manager.socket_path.starts_with("/tmp/relayer-plugin-pool-"));
2995 assert!(manager.socket_path.ends_with(".sock"));
2996 let uuid_part = manager
2998 .socket_path
2999 .strip_prefix("/tmp/relayer-plugin-pool-")
3000 .unwrap()
3001 .strip_suffix(".sock")
3002 .unwrap();
3003 assert_eq!(uuid_part.len(), 36);
3004 }
3005
3006 #[tokio::test]
3007 async fn test_health_check_socket_missing() {
3008 let manager =
3009 PoolManager::with_socket_path("/tmp/nonexistent-socket-12345.sock".to_string());
3010 manager.initialized.store(true, Ordering::Release);
3012
3013 let health = manager.health_check().await.unwrap();
3014 assert!(!health.healthy);
3015 assert_eq!(health.status, "socket_missing");
3016 }
3017
/// Dead-server phrases embedded in a longer `PluginExecutionError` message (connection
/// refused, broken pipe, no such file) must still be detected.
#[test]
fn test_is_dead_server_error_embedded_patterns() {
    let messages = [
        "Error: ECONNREFUSED connection refused at 127.0.0.1:3000",
        "SocketError: broken pipe while writing to /tmp/socket",
        "IO Error: No such file or directory (os error 2)",
    ];

    for message in messages.iter() {
        let err = PluginError::PluginExecutionError(message.to_string());
        assert!(PoolManager::is_dead_server_error(&err));
    }
}
3036
/// "Handler timed out" messages, in any letter case, must NOT be classified as
/// dead-server errors.
#[test]
fn test_is_dead_server_error_mixed_case_timeout_patterns() {
    let handler_timeouts = [
        "HANDLER TIMED OUT",
        "Handler Timed Out after 30s",
        "the handler timed out waiting for response",
    ];

    for msg in handler_timeouts {
        let err = PluginError::PluginExecutionError(msg.to_string());
        let is_dead = PoolManager::is_dead_server_error(&err);
        assert!(!is_dead, "Expected '{msg}' to NOT be dead server error");
    }
}
3054
3055 #[tokio::test]
3056 async fn test_ensure_started_idempotent() {
3057 let manager = PoolManager::with_socket_path("/tmp/idempotent-test-999.sock".to_string());
3058
3059 assert!(!manager.is_initialized().await);
3061
3062 manager.initialized.store(true, Ordering::Release);
3064
3065 let result = manager.ensure_started().await;
3067 assert!(result.is_ok());
3068 assert!(manager.is_initialized().await);
3069 }
3070
/// A `QueuedRequest` built with a header map must hand back the same header keys.
#[test]
fn test_queued_request_with_headers() {
    let (response_tx, _response_rx) = oneshot::channel();

    let header_map: HashMap<String, Vec<String>> = vec![
        ("Authorization", "Bearer token"),
        ("Content-Type", "application/json"),
    ]
    .into_iter()
    .map(|(name, value)| (name.to_string(), vec![value.to_string()]))
    .collect();

    let queued = QueuedRequest {
        plugin_id: String::from("headers-test"),
        compiled_code: None,
        plugin_path: Some(String::from("/path/to/plugin.ts")),
        params: serde_json::json!({}),
        headers: Some(header_map),
        socket_path: String::from("/tmp/test.sock"),
        http_request_id: None,
        timeout_secs: None,
        route: None,
        config: None,
        method: None,
        query: None,
        response_tx,
    };

    assert!(queued.headers.is_some());
    let stored = queued.headers.unwrap();
    for key in ["Authorization", "Content-Type"] {
        assert!(stored.contains_key(key));
    }
}
3106
/// The `Display` output of each listed `PluginError` variant must contain the expected
/// fragment.
#[test]
fn test_plugin_error_display_formats() {
    // (error, fragment that must appear in its Display output)
    let cases = vec![
        (
            PluginError::SocketError("test socket error".to_string()),
            "Socket error",
        ),
        (
            PluginError::PluginExecutionError("test execution error".to_string()),
            "test execution error",
        ),
        (PluginError::ScriptTimeout(60), "60"),
        (
            PluginError::PluginError("test plugin error".to_string()),
            "test plugin error",
        ),
    ];

    for (err, fragment) in cases {
        assert!(err.to_string().contains(fragment));
    }
}
3122
/// Converting a `PoolLogEntry` into a `LogEntry` must map a known level name to its
/// `LogLevel`, and the test expects an unrecognised name to map to `LogLevel::Log`.
#[test]
fn test_pool_log_entry_to_log_entry_conversion() {
    use super::super::protocol::PoolLogEntry;

    // Known level.
    let known: LogEntry = PoolLogEntry {
        level: String::from("info"),
        message: String::from("test message"),
    }
    .into();
    assert_eq!(known.level, LogLevel::Info);
    assert_eq!(known.message, "test message");

    // Unrecognised level name.
    let unknown: LogEntry = PoolLogEntry {
        level: String::from("unknown_level"),
        message: String::from("unknown level message"),
    }
    .into();
    assert_eq!(unknown.level, LogLevel::Log);
}
3146
3147 #[tokio::test]
3148 async fn test_circuit_breaker_records_success() {
3149 let manager = PoolManager::new();
3150
3151 manager.circuit_breaker.record_success(100);
3153 manager.circuit_breaker.record_success(150);
3154 manager.circuit_breaker.record_success(200);
3155
3156 let avg = manager.avg_response_time_ms();
3158 assert!(avg > 0);
3159 }
3160
3161 #[tokio::test]
3162 async fn test_circuit_breaker_state_transitions() {
3163 let manager = PoolManager::new();
3164
3165 assert_eq!(manager.circuit_state(), CircuitState::Closed);
3167
3168 for _ in 0..20 {
3170 manager.circuit_breaker.record_failure();
3171 }
3172
3173 let state = manager.circuit_state();
3175 assert!(matches!(
3176 state,
3177 CircuitState::Closed | CircuitState::HalfOpen | CircuitState::Open
3178 ));
3179 }
3180
3181 #[tokio::test]
3182 async fn test_recovery_mode_activation() {
3183 let manager = PoolManager::new();
3184
3185 manager.recovery_allowance.store(10, Ordering::Relaxed);
3187 manager.recovery_mode.store(true, Ordering::Relaxed);
3188
3189 assert!(manager.is_recovering());
3190 assert_eq!(manager.recovery_allowance_percent(), 10);
3191
3192 manager.recovery_allowance.store(50, Ordering::Relaxed);
3194 assert_eq!(manager.recovery_allowance_percent(), 50);
3195
3196 manager.recovery_mode.store(false, Ordering::Relaxed);
3198 assert!(!manager.is_recovering());
3199 }
3200
/// A successful response whose `logs` is an empty vector must parse to an empty log list
/// and keep the return value.
#[test]
fn test_parse_pool_response_with_empty_logs() {
    use super::super::protocol::PoolResponse;

    let response = PoolResponse {
        task_id: String::from("empty-logs"),
        success: true,
        result: Some(serde_json::json!("done")),
        error: None,
        // Present but empty, as opposed to `None`.
        logs: Some(Vec::new()),
    };

    let parsed = PoolManager::parse_pool_response(response).unwrap();

    assert_eq!(parsed.return_value, "done");
    assert!(parsed.logs.is_empty());
}
3217
/// A `PluginHandlerPayload` whose `details` is a nested JSON document must expose its
/// top-level keys.
#[test]
fn test_handler_payload_with_complex_details() {
    let nested_details = serde_json::json!({
        "errors": [
            {"field": "email", "code": "invalid", "message": "Invalid email format"},
            {"field": "password", "code": "weak", "message": "Password too weak"}
        ],
        "metadata": {
            "requestId": "req-123",
            "timestamp": "2024-01-01T00:00:00Z"
        }
    });

    let payload = PluginHandlerPayload {
        message: String::from("Complex error"),
        status: 400,
        code: Some(String::from("VALIDATION_ERROR")),
        details: Some(nested_details),
        logs: None,
        traces: None,
    };

    assert_eq!(payload.status, 400);
    let stored = payload.details.unwrap();
    for key in ["errors", "metadata"] {
        assert!(stored.get(key).is_some());
    }
}
3243
/// Builds a fully populated, healthy `HealthStatus` and spot-checks a few fields.
#[test]
fn test_health_status_construction_healthy() {
    use super::super::health::HealthStatus;

    let healthy = HealthStatus {
        healthy: true,
        status: String::from("ok"),
        uptime_ms: Some(1000000),
        memory: Some(500000000),
        pool_completed: Some(1000),
        pool_queued: Some(5),
        success_rate: Some(0.99),
        circuit_state: Some(String::from("closed")),
        avg_response_time_ms: Some(50),
        recovering: Some(false),
        recovery_percent: Some(100),
        shared_socket_available_slots: Some(100),
        shared_socket_active_connections: Some(10),
        shared_socket_registered_executions: Some(5),
        connection_pool_available_slots: Some(50),
        connection_pool_active_connections: Some(5),
    };

    assert!(healthy.healthy);
    assert_eq!(healthy.status, "ok");
    assert_eq!(healthy.uptime_ms, Some(1000000));
    assert_eq!(healthy.circuit_state.as_deref(), Some("closed"));
}
3272
/// Builds an unhealthy `HealthStatus` with most metrics unset and spot-checks a few
/// fields.
#[test]
fn test_health_status_construction_unhealthy() {
    use super::super::health::HealthStatus;

    let unhealthy = HealthStatus {
        healthy: false,
        status: String::from("connection_failed"),
        uptime_ms: None,
        memory: None,
        pool_completed: None,
        pool_queued: None,
        success_rate: None,
        circuit_state: Some(String::from("open")),
        avg_response_time_ms: Some(0),
        recovering: Some(true),
        recovery_percent: Some(10),
        shared_socket_available_slots: None,
        shared_socket_active_connections: None,
        shared_socket_registered_executions: None,
        connection_pool_available_slots: None,
        connection_pool_active_connections: None,
    };

    assert!(!unhealthy.healthy);
    assert_eq!(unhealthy.status, "connection_failed");
    assert_eq!(unhealthy.uptime_ms, None);
}
3300
/// The `Debug` output of `HealthStatus` must show the `healthy` flag and the status
/// string.
#[test]
fn test_health_status_debug_format() {
    use super::super::health::HealthStatus;

    let sample = HealthStatus {
        healthy: true,
        status: String::from("test"),
        uptime_ms: Some(100),
        memory: None,
        pool_completed: None,
        pool_queued: None,
        success_rate: None,
        circuit_state: None,
        avg_response_time_ms: None,
        recovering: None,
        recovery_percent: None,
        shared_socket_available_slots: None,
        shared_socket_active_connections: None,
        shared_socket_registered_executions: None,
        connection_pool_available_slots: None,
        connection_pool_active_connections: None,
    };

    let rendered = format!("{sample:?}");

    for needle in ["healthy: true", "test"] {
        assert!(rendered.contains(needle));
    }
}
3328
/// Cloning a `HealthStatus` must yield a value whose checked fields equal the source's.
#[test]
fn test_health_status_clone() {
    use super::super::health::HealthStatus;

    let source = HealthStatus {
        healthy: true,
        status: String::from("original"),
        uptime_ms: Some(500),
        memory: Some(100),
        pool_completed: Some(10),
        pool_queued: Some(1),
        success_rate: Some(0.95),
        circuit_state: Some(String::from("closed")),
        avg_response_time_ms: Some(25),
        recovering: Some(false),
        recovery_percent: Some(100),
        shared_socket_available_slots: Some(50),
        shared_socket_active_connections: Some(2),
        shared_socket_registered_executions: Some(1),
        connection_pool_available_slots: Some(25),
        connection_pool_active_connections: Some(1),
    };

    let copy = source.clone();

    assert_eq!(copy.healthy, source.healthy);
    assert_eq!(copy.status, source.status);
    assert_eq!(copy.uptime_ms, source.uptime_ms);
}
3357
/// The `Debug` output of `ExecuteRequest` must include the task id and plugin id.
#[test]
fn test_execute_request_debug() {
    use super::super::protocol::ExecuteRequest;

    let sample = ExecuteRequest {
        task_id: String::from("debug-test"),
        plugin_id: String::from("test-plugin"),
        compiled_code: None,
        plugin_path: Some(String::from("/path/to/plugin.ts")),
        params: serde_json::json!({"test": true}),
        headers: None,
        socket_path: String::from("/tmp/test.sock"),
        http_request_id: None,
        timeout: None,
        route: None,
        config: None,
        method: None,
        query: None,
    };

    let rendered = format!("{sample:?}");

    for needle in ["debug-test", "test-plugin"] {
        assert!(rendered.contains(needle));
    }
}
3382
/// The `Debug` output of `PoolError` must include its message and code.
#[test]
fn test_pool_error_debug() {
    use super::super::protocol::PoolError;

    let sample = PoolError {
        message: String::from("Test error"),
        code: Some(String::from("TEST_ERR")),
        status: Some(400),
        details: Some(serde_json::json!({"info": "test"})),
    };

    let rendered = format!("{sample:?}");

    for needle in ["Test error", "TEST_ERR"] {
        assert!(rendered.contains(needle));
    }
}
3398
/// The `Debug` output of `PoolResponse` must include the task id and the success flag.
#[test]
fn test_pool_response_debug() {
    use super::super::protocol::PoolResponse;

    let sample = PoolResponse {
        task_id: String::from("resp-123"),
        success: true,
        result: Some(serde_json::json!("result")),
        error: None,
        logs: None,
    };

    let rendered = format!("{sample:?}");

    for needle in ["resp-123", "true"] {
        assert!(rendered.contains(needle));
    }
}
3415
/// The `Debug` output of `PoolLogEntry` must include its level and message.
#[test]
fn test_pool_log_entry_debug() {
    use super::super::protocol::PoolLogEntry;

    let sample = PoolLogEntry {
        level: String::from("info"),
        message: String::from("Test message"),
    };

    let rendered = format!("{sample:?}");

    for needle in ["info", "Test message"] {
        assert!(rendered.contains(needle));
    }
}
3429
/// A `CircuitBreaker` built through `Default` must start closed.
#[test]
fn test_circuit_breaker_default_trait() {
    use super::super::health::CircuitBreaker;

    let initial_state = CircuitBreaker::default().state();
    assert_eq!(initial_state, CircuitState::Closed);
}
3437
/// `set_state` must be observable through `state` for each of HalfOpen, Open, and Closed.
#[test]
fn test_circuit_breaker_set_state_all_variants() {
    use super::super::health::CircuitBreaker;

    let breaker = CircuitBreaker::new();

    breaker.set_state(CircuitState::HalfOpen);
    let observed = breaker.state();
    assert_eq!(observed, CircuitState::HalfOpen);

    breaker.set_state(CircuitState::Open);
    let observed = breaker.state();
    assert_eq!(observed, CircuitState::Open);

    breaker.set_state(CircuitState::Closed);
    let observed = breaker.state();
    assert_eq!(observed, CircuitState::Closed);
}
3454
/// Recording 100 failures and no successes must leave the breaker open.
#[test]
fn test_circuit_breaker_failure_rate_triggers_open() {
    use super::super::health::CircuitBreaker;

    let breaker = CircuitBreaker::new();

    (0..100).for_each(|_| {
        breaker.record_failure();
    });

    let final_state = breaker.state();
    assert_eq!(final_state, CircuitState::Open);
}
3468
/// With 90 successes followed by 10 failures, the breaker must stay closed.
#[test]
fn test_circuit_breaker_low_failure_rate_stays_closed() {
    use super::super::health::CircuitBreaker;

    let breaker = CircuitBreaker::new();

    // Successes are recorded first, then the failures, as in the original sequence.
    (0..90).for_each(|_| {
        breaker.record_success(50);
    });
    (0..10).for_each(|_| {
        breaker.record_failure();
    });

    let final_state = breaker.state();
    assert_eq!(final_state, CircuitState::Closed);
}
3486
/// With every sample at 100, the reported average must be positive after the first
/// sample and must not exceed 100 after three more.
#[test]
fn test_circuit_breaker_ema_response_time() {
    use super::super::health::CircuitBreaker;

    let breaker = CircuitBreaker::new();

    breaker.record_success(100);
    let after_first = breaker.avg_response_time();

    for _ in 0..3 {
        breaker.record_success(100);
    }
    let after_fourth = breaker.avg_response_time();

    assert!(after_first > 0);
    assert!(after_fourth > 0);
    assert!(after_fourth <= 100);
}
3507
/// `force_close` on an open breaker must bring it back to the closed state.
#[test]
fn test_circuit_breaker_force_close_resets_counters() {
    use super::super::health::CircuitBreaker;

    let breaker = CircuitBreaker::new();
    breaker.set_state(CircuitState::Open);

    breaker.force_close();

    // Only the resulting state is observable here; counters are not inspected.
    let final_state = breaker.state();
    assert_eq!(final_state, CircuitState::Closed);
}
3519
/// Checks the `Debug` rendering of each `ProcessStatus` variant used here
/// against its bare variant name.
#[test]
fn test_process_status_debug() {
    use super::super::health::ProcessStatus;

    let expectations = [
        (ProcessStatus::Running, "Running"),
        (ProcessStatus::Exited, "Exited"),
        (ProcessStatus::Unknown, "Unknown"),
        (ProcessStatus::NoProcess, "NoProcess"),
    ];

    for (status, rendered) in expectations {
        assert_eq!(format!("{:?}", status), rendered);
    }
}
3529
/// Copies a `ProcessStatus` by plain assignment and checks that the original
/// is still usable and equal to the copy.
#[test]
fn test_process_status_clone() {
    use super::super::health::ProcessStatus;

    let original = ProcessStatus::Running;
    // Plain assignment: the original must remain usable afterwards.
    let duplicate = original;

    assert_eq!(original, duplicate);
}
3538
/// Feeds a series of error strings to `DeadServerIndicator::from_error_str`
/// and checks each resolves to the paired variant.
///
/// Both "connection timed out" and "connect timed out" are expected to map
/// to `ConnectionTimedOut`.
#[test]
fn test_dead_server_indicator_all_variants() {
    use super::super::health::DeadServerIndicator;

    let cases = [
        ("eof while parsing", DeadServerIndicator::EofWhileParsing),
        ("broken pipe", DeadServerIndicator::BrokenPipe),
        ("connection refused", DeadServerIndicator::ConnectionRefused),
        ("connection reset", DeadServerIndicator::ConnectionReset),
        ("not connected", DeadServerIndicator::NotConnected),
        ("failed to connect", DeadServerIndicator::FailedToConnect),
        ("socket file missing", DeadServerIndicator::SocketFileMissing),
        ("no such file", DeadServerIndicator::NoSuchFile),
        ("connection timed out", DeadServerIndicator::ConnectionTimedOut),
        ("connect timed out", DeadServerIndicator::ConnectionTimedOut),
    ];

    for (message, indicator) in cases {
        let detected = DeadServerIndicator::from_error_str(message);
        assert_eq!(
            detected,
            Some(indicator),
            "Pattern '{}' should match",
            message
        );
    }
}
3572
/// Checks that `DeadServerIndicator::BrokenPipe` renders as "BrokenPipe"
/// through `Debug`.
#[test]
fn test_dead_server_indicator_debug_format() {
    use super::super::health::DeadServerIndicator;

    let rendered = format!("{:?}", DeadServerIndicator::BrokenPipe);

    assert_eq!(rendered, "BrokenPipe");
}
3581
/// Copies a `DeadServerIndicator` by assignment and checks the original is
/// still usable and compares equal to the copy.
#[test]
fn test_dead_server_indicator_clone_copy() {
    use super::super::health::DeadServerIndicator;

    let original = DeadServerIndicator::ConnectionRefused;
    // Plain assignment: the original must remain usable afterwards.
    let duplicate = original;

    assert_eq!(original, duplicate);
}
3590
/// Records nine `false` results into a buffer created with `new(100)` and
/// expects `failure_rate()` to report 0.0.
///
/// NOTE(review): presumably fewer than ten samples is treated as
/// insufficient data; confirm against `ResultRingBuffer`.
#[test]
fn test_result_ring_buffer_not_enough_data() {
    use super::super::health::ResultRingBuffer;

    let ring = ResultRingBuffer::new(100);

    (0..9).for_each(|_| {
        ring.record(false);
    });

    assert_eq!(ring.failure_rate(), 0.0);
}
3605
/// Records ten `false` results into a buffer created with `new(100)` and
/// expects `failure_rate()` to report 1.0.
#[test]
fn test_result_ring_buffer_exactly_10_samples() {
    use super::super::health::ResultRingBuffer;

    let ring = ResultRingBuffer::new(100);

    (0..10).for_each(|_| {
        ring.record(false);
    });

    assert_eq!(ring.failure_rate(), 1.0);
}
3620
/// Fills a buffer created with `new(10)` with ten `true` results (expected
/// rate 0.0), then writes ten `false` results and expects the rate to become
/// 1.0.
#[test]
fn test_result_ring_buffer_wraps_correctly() {
    use super::super::health::ResultRingBuffer;

    let ring = ResultRingBuffer::new(10);

    // Each phase writes ten identical results, then checks the reported rate.
    let phases = [(true, 0.0), (false, 1.0)];
    for (outcome, expected_rate) in phases {
        for _ in 0..10 {
            ring.record(outcome);
        }
        assert_eq!(ring.failure_rate(), expected_rate);
    }
}
3639
/// Checks that each `CircuitState` variant equals itself and differs from
/// the other two.
#[test]
fn test_circuit_state_equality_all_pairs() {
    let every_state = [
        CircuitState::Closed,
        CircuitState::HalfOpen,
        CircuitState::Open,
    ];
    for state in every_state {
        assert_eq!(state, state);
    }

    let distinct_pairs = [
        (CircuitState::Closed, CircuitState::HalfOpen),
        (CircuitState::Closed, CircuitState::Open),
        (CircuitState::HalfOpen, CircuitState::Open),
    ];
    for (lhs, rhs) in distinct_pairs {
        assert_ne!(lhs, rhs);
    }
}
3650
/// Copies a `CircuitState` by assignment and checks the original is still
/// usable and equal to the copy.
#[test]
fn test_circuit_state_clone_copy() {
    let original = CircuitState::HalfOpen;
    // Plain assignment: the original must remain usable afterwards.
    let duplicate = original;

    assert_eq!(original, duplicate);
}
3657
/// A successful response whose result is JSON `null` is expected to produce
/// the string "null" as `return_value`.
#[test]
fn test_parse_pool_response_with_null_values() {
    use super::super::protocol::PoolResponse;

    let null_response = PoolResponse {
        task_id: "null-test".to_string(),
        success: true,
        result: Some(serde_json::Value::Null),
        error: None,
        logs: None,
    };

    let parsed = PoolManager::parse_pool_response(null_response);
    let script_result = parsed.unwrap();

    assert_eq!(script_result.return_value, "null");
}
3673
/// A successful response with a three-level nested object is expected to
/// yield a `return_value` that mentions every key and the leaf value.
#[test]
fn test_parse_pool_response_with_nested_result() {
    use super::super::protocol::PoolResponse;

    let nested_payload = serde_json::json!({
        "level1": {
            "level2": {
                "level3": "deep value"
            }
        }
    });
    let response = PoolResponse {
        task_id: "nested-test".to_string(),
        success: true,
        result: Some(nested_payload),
        error: None,
        logs: None,
    };

    let parsed = PoolManager::parse_pool_response(response).unwrap();

    for fragment in ["level1", "level2", "level3", "deep value"] {
        assert!(parsed.return_value.contains(fragment));
    }
}
3698
/// A failed response carrying a message, code and structured details is
/// expected to surface as `PluginError::HandlerError` with the message, code
/// and `details.field` intact.
#[test]
fn test_parse_pool_response_error_with_details() {
    use super::super::protocol::{PoolError, PoolResponse};

    let pool_error = PoolError {
        message: "Error with details".to_string(),
        code: Some("DETAILED_ERROR".to_string()),
        status: Some(422),
        details: Some(serde_json::json!({
            "field": "email",
            "expected": "string",
            "received": "number"
        })),
    };
    let response = PoolResponse {
        task_id: "error-details".to_string(),
        success: false,
        result: None,
        error: Some(pool_error),
        logs: None,
    };

    // Any other error variant is a test failure.
    let payload = match PoolManager::parse_pool_response(response).unwrap_err() {
        PluginError::HandlerError(payload) => payload,
        _ => panic!("Expected HandlerError"),
    };

    assert_eq!(payload.message, "Error with details");
    assert_eq!(payload.code, Some("DETAILED_ERROR".to_string()));
    assert!(payload.details.is_some());
    let details = payload.details.unwrap();
    assert_eq!(details.get("field").unwrap(), "email");
}
3732
/// Parses a health document with `uptime`, `memory`, `pool` and `execution`
/// sections populated and checks the extracted fields.
///
/// The asserted mapping is: `uptime` -> `uptime_ms`, `memory.heapUsed` ->
/// `memory`, `pool.completed`/`pool.queued` -> `pool_completed`/`pool_queued`
/// and `execution.successRate` -> `success_rate`.
#[test]
fn test_parse_health_result_with_all_optional_fields() {
    let health_doc = serde_json::json!({
        "status": "healthy",
        "uptime": 999999,
        "memory": {
            "heapUsed": 123456789,
            "heapTotal": 987654321,
            "external": 111111,
            "arrayBuffers": 222222
        },
        "pool": {
            "completed": 50000,
            "queued": 100,
            "active": 50,
            "waiting": 25
        },
        "execution": {
            "successRate": 0.9999,
            "avgDuration": 45.5,
            "totalExecutions": 100000
        }
    });

    let parsed = PoolManager::parse_health_result(&health_doc);

    assert_eq!(parsed.status, "healthy");
    assert_eq!(parsed.uptime_ms, Some(999999));
    assert_eq!(parsed.memory, Some(123456789));
    assert_eq!(parsed.pool_completed, Some(50000));
    assert_eq!(parsed.pool_queued, Some(100));

    // Floating-point field: compare with a tolerance rather than exactly.
    let rate = parsed.success_rate.unwrap();
    assert!((rate - 0.9999).abs() < 0.0001);
}
3765
3766 #[tokio::test]
3767 async fn test_pool_manager_max_queue_size() {
3768 let manager = PoolManager::new();
3769 assert!(manager.max_queue_size > 0);
3771 }
3772
3773 #[tokio::test]
3774 async fn test_pool_manager_last_restart_time_initial() {
3775 let manager = PoolManager::new();
3776 assert_eq!(manager.last_restart_time_ms.load(Ordering::Relaxed), 0);
3777 }
3778
3779 #[tokio::test]
3780 async fn test_pool_manager_connection_pool_exists() {
3781 let manager = PoolManager::new();
3782 let available = manager.connection_pool.semaphore.available_permits();
3784 assert!(available > 0);
3785 }
3786
/// Socket errors whose indicator text is padded with whitespace or embedded
/// in a longer sentence are still expected to count as dead-server errors.
#[test]
fn test_is_dead_server_error_with_whitespace() {
    let messages = [" connection refused ", "error: broken pipe occurred"];

    for message in messages {
        let err = PluginError::SocketError(message.to_string());
        assert!(PoolManager::is_dead_server_error(&err));
    }
}
3796
/// A multi-line socket error with "Connection refused" on its middle line is
/// expected to count as a dead-server error.
#[test]
fn test_is_dead_server_error_multiline() {
    let message = "Error occurred\nConnection refused\nPlease retry".to_string();
    let socket_error = PluginError::SocketError(message);

    assert!(PoolManager::is_dead_server_error(&socket_error));
}
3805
/// A `PluginExecutionError` whose message is a JSON blob containing
/// "connection refused" is expected to count as a dead-server error.
#[test]
fn test_is_dead_server_error_json_in_message() {
    let message = r#"{"error": "connection refused", "code": 61}"#.to_string();
    let execution_error = PluginError::PluginExecutionError(message);

    assert!(PoolManager::is_dead_server_error(&execution_error));
}
3814
/// Formats two extreme numeric JSON values: `f64::MAX` must produce a
/// non-empty string and `i64::MIN` must produce a string containing a minus
/// sign.
#[test]
fn test_format_return_value_special_json() {
    let largest_float = Some(serde_json::json!(f64::MAX));
    let float_text = PoolManager::format_return_value(largest_float);
    assert!(!float_text.is_empty());

    let smallest_int = Some(serde_json::json!(i64::MIN));
    let int_text = PoolManager::format_return_value(smallest_int);
    assert!(int_text.contains("-"));
}
3826
/// A JSON string containing a newline, a tab and a double quote is expected
/// to format into text that still contains both "line1" and "line2".
#[test]
fn test_format_return_value_with_escaped_chars() {
    let tricky_string = serde_json::json!("line1\nline2\ttab\"quote");
    let formatted = PoolManager::format_return_value(Some(tricky_string));

    for fragment in ["line1", "line2"] {
        assert!(formatted.contains(fragment));
    }
}
3834
/// An array of two objects is expected to format into text that contains
/// both objects' `name` values.
#[test]
fn test_format_return_value_array_of_objects() {
    let records = serde_json::json!([
        {"id": 1, "name": "first"},
        {"id": 2, "name": "second"}
    ]);
    let formatted = PoolManager::format_return_value(Some(records));

    for name in ["first", "second"] {
        assert!(formatted.contains(name));
    }
}
3845
/// Converts `PoolLogEntry` values into `LogEntry` for each known level string
/// and checks the resulting `LogLevel`.
///
/// The last three rows expect an unrecognised name, an upper-case "LOG" and
/// the empty string to all convert to `LogLevel::Log`.
#[test]
fn test_all_log_levels_conversion() {
    use super::super::protocol::PoolLogEntry;

    let cases = [
        ("log", LogLevel::Log),
        ("debug", LogLevel::Debug),
        ("info", LogLevel::Info),
        ("warn", LogLevel::Warn),
        ("error", LogLevel::Error),
        ("result", LogLevel::Result),
        ("unknown_level", LogLevel::Log),
        ("LOG", LogLevel::Log),
        ("", LogLevel::Log),
    ];

    for (raw_level, wanted) in cases {
        let pool_entry = PoolLogEntry {
            level: raw_level.to_string(),
            message: "test".to_string(),
        };
        let converted: LogEntry = pool_entry.into();

        assert_eq!(
            converted.level, wanted,
            "Level '{}' should convert to {:?}",
            raw_level, wanted
        );
    }
}
3874
3875 #[tokio::test]
3876 async fn test_pool_manager_health_check_flag_manipulation() {
3877 let manager = PoolManager::new();
3878
3879 manager.health_check_needed.store(true, Ordering::Relaxed);
3880 assert!(manager.health_check_needed.load(Ordering::Relaxed));
3881
3882 manager.health_check_needed.store(false, Ordering::Relaxed);
3883 assert!(!manager.health_check_needed.load(Ordering::Relaxed));
3884 }
3885
3886 #[tokio::test]
3887 async fn test_pool_manager_consecutive_failures_manipulation() {
3888 let manager = PoolManager::new();
3889
3890 manager.consecutive_failures.fetch_add(1, Ordering::Relaxed);
3891 assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 1);
3892
3893 manager.consecutive_failures.fetch_add(5, Ordering::Relaxed);
3894 assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 6);
3895
3896 manager.consecutive_failures.store(0, Ordering::Relaxed);
3897 assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 0);
3898 }
3899
/// Builds a `ParsedHealthResult` with every optional metric absent and checks
/// that the status and each of the five optional fields read back as
/// constructed.
#[test]
fn test_parsed_health_result_with_all_none() {
    let result = ParsedHealthResult {
        status: "minimal".to_string(),
        uptime_ms: None,
        memory: None,
        pool_completed: None,
        pool_queued: None,
        success_rate: None,
    };

    assert_eq!(result.status, "minimal");

    // Cover every optional field so the test lives up to its "all none" name.
    assert!(result.uptime_ms.is_none());
    assert!(result.memory.is_none());
    assert!(result.pool_completed.is_none());
    assert!(result.pool_queued.is_none());
    assert!(result.success_rate.is_none());
}
3915
/// Builds a `ParsedHealthResult` with every optional metric populated at its
/// extreme (`u64::MAX` for the counters, 1.0 for the rate) and checks that
/// each field reads back as constructed.
#[test]
fn test_parsed_health_result_with_all_some() {
    let result = ParsedHealthResult {
        status: "complete".to_string(),
        uptime_ms: Some(u64::MAX),
        memory: Some(u64::MAX),
        pool_completed: Some(u64::MAX),
        pool_queued: Some(u64::MAX),
        success_rate: Some(1.0),
    };

    assert_eq!(result.status, "complete");

    // Cover every optional field so the test lives up to its "all some" name.
    assert_eq!(result.uptime_ms, Some(u64::MAX));
    assert_eq!(result.memory, Some(u64::MAX));
    assert_eq!(result.pool_completed, Some(u64::MAX));
    assert_eq!(result.pool_queued, Some(u64::MAX));
    assert_eq!(result.success_rate, Some(1.0));
}
3931
/// Table-driven check of `PoolManager::calculate_heap_size` across a wide
/// range of concurrency values.
///
/// NOTE(review): the expected values are consistent with 512 plus 32 per full
/// ten units of concurrency, capped at 8192; confirm against the
/// implementation before relying on that reading.
#[test]
fn test_calculate_heap_size_extensive_values() {
    // (concurrency, expected heap size)
    let expectations = [
        (0, 512),
        (1, 512),
        (5, 512),
        (9, 512),
        (10, 544),
        (11, 544),
        (19, 544),
        (20, 576),
        (50, 672),
        (100, 832),
        (150, 992),
        (200, 1152),
        (250, 1312),
        (300, 1472),
        (400, 1792),
        (500, 2112),
        (1000, 3712),
        (2000, 6912),
        (2400, 8192),
        (3000, 8192),
        (5000, 8192),
        (10000, 8192),
    ];

    for &(load, wanted_heap) in expectations.iter() {
        let actual_heap = PoolManager::calculate_heap_size(load);
        assert_eq!(
            actual_heap, wanted_heap,
            "Concurrency {} should give heap {}",
            load, wanted_heap
        );
    }
}
3968
3969 #[tokio::test]
3970 async fn test_pool_manager_drop_cleans_socket() {
3971 let socket_path = format!("/tmp/test-drop-{}.sock", uuid::Uuid::new_v4());
3972
3973 std::fs::write(&socket_path, "test").unwrap();
3975 assert!(std::path::Path::new(&socket_path).exists());
3976
3977 {
3979 let _manager = PoolManager::with_socket_path(socket_path.clone());
3980 }
3982 assert!(!std::path::Path::new(&socket_path).exists());
3986 }
3987
/// Builds a `ScriptResult` with two trace entries and checks the trace length
/// and that the first entry has an "action" key.
#[test]
fn test_script_result_with_traces() {
    let recorded_traces = vec![
        serde_json::json!({"action": "GET", "url": "/api/test"}),
        serde_json::json!({"action": "POST", "url": "/api/submit"}),
    ];
    let outcome = ScriptResult {
        logs: vec![],
        error: String::new(),
        return_value: "with traces".to_string(),
        trace: recorded_traces,
    };

    assert_eq!(outcome.trace.len(), 2);

    let first_trace = &outcome.trace[0];
    assert!(first_trace.get("action").is_some());
}
4003
/// Builds a `ScriptResult` with an error string and one error-level log entry
/// and checks that both are stored as constructed.
#[test]
fn test_script_result_with_error() {
    let error_log = LogEntry {
        level: LogLevel::Error,
        message: "Something went wrong".to_string(),
    };
    let outcome = ScriptResult {
        logs: vec![error_log],
        error: "RuntimeError: undefined is not a function".to_string(),
        return_value: String::new(),
        trace: vec![],
    };

    assert!(!outcome.error.is_empty());
    assert!(outcome.error.contains("RuntimeError"));
    assert_eq!(outcome.logs.len(), 1);
}
4020
/// Builds a `PluginHandlerPayload` with two trace entries and checks that the
/// traces are present and number exactly two.
#[test]
fn test_plugin_handler_payload_with_traces() {
    let recorded_traces = vec![
        serde_json::json!({"method": "GET", "path": "/health"}),
        serde_json::json!({"method": "POST", "path": "/execute"}),
    ];
    let payload = PluginHandlerPayload {
        message: "Error with traces".to_string(),
        status: 500,
        code: None,
        details: None,
        logs: None,
        traces: Some(recorded_traces),
    };

    assert!(payload.traces.is_some());

    let trace_count = payload.traces.as_ref().unwrap().len();
    assert_eq!(trace_count, 2);
}
4038
/// Builds a `QueuedRequest` with every optional field populated and checks
/// that each field holds the value it was constructed with.
#[test]
fn test_queued_request_all_optional_fields() {
    // The receiver is kept alive for the duration of the test but never used.
    let (tx, _rx) = oneshot::channel();

    let mut headers = HashMap::new();
    headers.insert(
        "X-Custom".to_string(),
        vec!["value1".to_string(), "value2".to_string()],
    );

    let request = QueuedRequest {
        plugin_id: "full-request".to_string(),
        compiled_code: Some("compiled code here".to_string()),
        plugin_path: Some("/path/to/plugin.ts".to_string()),
        params: serde_json::json!({"key": "value", "number": 42}),
        headers: Some(headers),
        socket_path: "/tmp/full.sock".to_string(),
        http_request_id: Some("http-123".to_string()),
        timeout_secs: Some(60),
        route: Some("/api/v1/execute".to_string()),
        config: Some(serde_json::json!({"setting": true})),
        method: Some("PUT".to_string()),
        query: Some(serde_json::json!({"page": 1, "limit": 10})),
        response_tx: tx,
    };

    // Required fields.
    assert_eq!(request.plugin_id, "full-request");
    assert_eq!(request.socket_path, "/tmp/full.sock");
    assert_eq!(
        request.params,
        serde_json::json!({"key": "value", "number": 42})
    );

    // Optional fields: check the stored values, not just presence, so the
    // test covers every field its name promises.
    assert_eq!(request.compiled_code.as_deref(), Some("compiled code here"));
    assert_eq!(request.plugin_path.as_deref(), Some("/path/to/plugin.ts"));
    let stored_headers = request.headers.as_ref().expect("headers were set above");
    assert_eq!(stored_headers.get("X-Custom").map(Vec::len), Some(2));
    assert_eq!(request.http_request_id.as_deref(), Some("http-123"));
    assert_eq!(request.timeout_secs, Some(60));
    assert_eq!(request.route.as_deref(), Some("/api/v1/execute"));
    assert_eq!(request.config, Some(serde_json::json!({"setting": true})));
    assert_eq!(request.method.as_deref(), Some("PUT"));
    assert_eq!(
        request.query,
        Some(serde_json::json!({"page": 1, "limit": 10}))
    );
}
4072}