openzeppelin_relayer/services/plugins/
pool_executor.rs

1//! Pool-based Plugin Executor
2//!
3//! This module provides execution of pre-compiled JavaScript plugins via
4//! a persistent Piscina worker pool, replacing the per-request ts-node approach.
5//!
6//! Communication with the Node.js pool server happens via Unix socket using
7//! a JSON-line protocol.
8
9use std::collections::HashMap;
10use std::process::Stdio;
11use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::io::{AsyncBufReadExt, BufReader};
15use tokio::process::{Child, Command};
16use tokio::sync::oneshot;
17use uuid::Uuid;
18
19use super::config::get_config;
20use super::connection::{ConnectionPool, PoolConnection};
21use super::health::{
22    CircuitBreaker, CircuitState, DeadServerIndicator, HealthStatus, ProcessStatus,
23};
24use super::protocol::{PoolError, PoolRequest, PoolResponse};
25use super::shared_socket::get_shared_socket_service;
26use super::{LogEntry, PluginError, PluginHandlerPayload, ScriptResult};
27
28/// Request queue entry for throttling
29struct QueuedRequest {
30    plugin_id: String,
31    compiled_code: Option<String>,
32    plugin_path: Option<String>,
33    params: serde_json::Value,
34    headers: Option<HashMap<String, Vec<String>>>,
35    socket_path: String,
36    http_request_id: Option<String>,
37    timeout_secs: Option<u64>,
38    route: Option<String>,
39    config: Option<serde_json::Value>,
40    method: Option<String>,
41    query: Option<serde_json::Value>,
42    response_tx: oneshot::Sender<Result<ScriptResult, PluginError>>,
43}
44
/// Fields pulled out of the pool server's health-check JSON.
///
/// A named struct is used instead of a tuple so that the parser's return type stays
/// readable (and Clippy's `type_complexity` lint is satisfied). When populated by
/// `PoolManager::parse_health_result`, each optional field is `None` if the corresponding
/// JSON value is missing or has an unexpected type.
#[derive(Debug, Default, PartialEq)]
pub struct ParsedHealthResult {
    /// The `status` string (the parser substitutes `"unknown"` when it is missing).
    pub status: String,
    /// The `uptime` value.
    pub uptime_ms: Option<u64>,
    /// The `memory.heapUsed` value.
    pub memory: Option<u64>,
    /// The `pool.completed` value.
    pub pool_completed: Option<u64>,
    /// The `pool.queued` value.
    pub pool_queued: Option<u64>,
    /// The `execution.successRate` value.
    pub success_rate: Option<f64>,
}
58
59/// Manages the pool server process and connections
60pub struct PoolManager {
61    socket_path: String,
62    process: tokio::sync::Mutex<Option<Child>>,
63    initialized: Arc<AtomicBool>,
64    /// Lock to prevent concurrent restarts (thundering herd)
65    restart_lock: tokio::sync::Mutex<()>,
66    /// Connection pool for reusing connections
67    connection_pool: Arc<ConnectionPool>,
68    /// Request queue for throttling/backpressure (multi-consumer channel)
69    request_tx: async_channel::Sender<QueuedRequest>,
70    /// Actual configured queue size (for error messages)
71    max_queue_size: usize,
72    /// Flag indicating if health check is needed (set by background task)
73    health_check_needed: Arc<AtomicBool>,
74    /// Consecutive failure count for health checks
75    consecutive_failures: Arc<AtomicU32>,
76    /// Circuit breaker for automatic degradation under GC pressure
77    circuit_breaker: Arc<CircuitBreaker>,
78    /// Last successful restart time (for backoff calculation)
79    last_restart_time_ms: Arc<AtomicU64>,
80    /// Is currently in recovery mode (gradual ramp-up)
81    recovery_mode: Arc<AtomicBool>,
82    /// Requests allowed during recovery (gradual increase)
83    recovery_allowance: Arc<AtomicU32>,
84    /// Shutdown signal for background tasks (queue workers, health check, etc.)
85    shutdown_signal: Arc<tokio::sync::Notify>,
86}
87
88impl Default for PoolManager {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl PoolManager {
95    /// Base heap size in MB for the pool server process.
96    /// This provides the minimum memory needed for the Node.js runtime and core pool infrastructure.
97    const BASE_HEAP_MB: usize = 512;
98
99    /// Concurrency divisor for heap calculation.
100    /// Heap is incremented for every N concurrent requests to scale with load.
101    const CONCURRENCY_DIVISOR: usize = 10;
102
103    /// Heap increment in MB per CONCURRENCY_DIVISOR concurrent requests.
104    /// Formula: BASE_HEAP_MB + ((max_concurrency / CONCURRENCY_DIVISOR) * HEAP_INCREMENT_PER_DIVISOR_MB)
105    /// This accounts for additional memory needed per concurrent plugin execution context.
106    const HEAP_INCREMENT_PER_DIVISOR_MB: usize = 32;
107
108    /// Maximum heap size in MB (hard cap) for the pool server process.
109    /// Prevents excessive memory allocation that could cause system instability.
110    /// Set to 8GB (8192 MB) as a reasonable upper bound for Node.js processes.
111    const MAX_HEAP_MB: usize = 8192;
112
113    /// Calculate heap size based on concurrency level.
114    ///
115    /// Formula: BASE_HEAP_MB + ((max_concurrency / CONCURRENCY_DIVISOR) * HEAP_INCREMENT_PER_DIVISOR_MB)
116    /// Result is capped at MAX_HEAP_MB.
117    ///
118    /// This scales memory allocation with expected load while maintaining a reasonable minimum.
119    pub fn calculate_heap_size(max_concurrency: usize) -> usize {
120        let calculated = Self::BASE_HEAP_MB
121            + ((max_concurrency / Self::CONCURRENCY_DIVISOR) * Self::HEAP_INCREMENT_PER_DIVISOR_MB);
122        calculated.min(Self::MAX_HEAP_MB)
123    }
124
125    /// Format a result value from the pool response into a string.
126    ///
127    /// If the value is already a string, returns it directly.
128    /// Otherwise, serializes it to JSON.
129    pub fn format_return_value(value: Option<serde_json::Value>) -> String {
130        value
131            .map(|v| {
132                if v.is_string() {
133                    v.as_str().unwrap_or("").to_string()
134                } else {
135                    serde_json::to_string(&v).unwrap_or_default()
136                }
137            })
138            .unwrap_or_default()
139    }
140
141    /// Parse a successful pool response into a ScriptResult.
142    ///
143    /// Converts logs from PoolLogEntry to LogEntry and extracts the return value.
144    pub fn parse_success_response(response: PoolResponse) -> ScriptResult {
145        let logs: Vec<LogEntry> = response
146            .logs
147            .map(|logs| logs.into_iter().map(|l| l.into()).collect())
148            .unwrap_or_default();
149
150        ScriptResult {
151            logs,
152            error: String::new(),
153            return_value: Self::format_return_value(response.result),
154            trace: Vec::new(),
155        }
156    }
157
158    /// Parse a failed pool response into a PluginError.
159    ///
160    /// Extracts error details and converts logs for inclusion in the error payload.
161    pub fn parse_error_response(response: PoolResponse) -> PluginError {
162        let logs: Vec<LogEntry> = response
163            .logs
164            .map(|logs| logs.into_iter().map(|l| l.into()).collect())
165            .unwrap_or_default();
166
167        let error = response.error.unwrap_or(PoolError {
168            message: "Unknown error".to_string(),
169            code: None,
170            status: None,
171            details: None,
172        });
173
174        PluginError::HandlerError(Box::new(PluginHandlerPayload {
175            message: error.message,
176            status: error.status.unwrap_or(500),
177            code: error.code,
178            details: error.details,
179            logs: Some(logs),
180            traces: None,
181        }))
182    }
183
184    /// Parse a pool response into either a success result or an error.
185    ///
186    /// This is the main entry point for response parsing, dispatching to
187    /// either parse_success_response or parse_error_response based on the success flag.
188    pub fn parse_pool_response(response: PoolResponse) -> Result<ScriptResult, PluginError> {
189        if response.success {
190            Ok(Self::parse_success_response(response))
191        } else {
192            Err(Self::parse_error_response(response))
193        }
194    }
195
196    /// Parse health check result JSON into individual fields.
197    ///
198    /// Extracts status, uptime, memory usage, pool stats, and success rate
199    /// from the nested JSON structure returned by the pool server.
200    pub fn parse_health_result(result: &serde_json::Value) -> ParsedHealthResult {
201        ParsedHealthResult {
202            status: result
203                .get("status")
204                .and_then(|v| v.as_str())
205                .unwrap_or("unknown")
206                .to_string(),
207            uptime_ms: result.get("uptime").and_then(|v| v.as_u64()),
208            memory: result
209                .get("memory")
210                .and_then(|v| v.get("heapUsed"))
211                .and_then(|v| v.as_u64()),
212            pool_completed: result
213                .get("pool")
214                .and_then(|v| v.get("completed"))
215                .and_then(|v| v.as_u64()),
216            pool_queued: result
217                .get("pool")
218                .and_then(|v| v.get("queued"))
219                .and_then(|v| v.as_u64()),
220            success_rate: result
221                .get("execution")
222                .and_then(|v| v.get("successRate"))
223                .and_then(|v| v.as_f64()),
224        }
225    }
226
227    /// Create a new PoolManager with default socket path
228    pub fn new() -> Self {
229        Self::init(format!("/tmp/relayer-plugin-pool-{}.sock", Uuid::new_v4()))
230    }
231
232    /// Create a new PoolManager with custom socket path
233    pub fn with_socket_path(socket_path: String) -> Self {
234        Self::init(socket_path)
235    }
236
237    /// Common initialization logic
238    fn init(socket_path: String) -> Self {
239        let config = get_config();
240        let max_connections = config.pool_max_connections;
241        let max_queue_size = config.pool_max_queue_size;
242
243        let (tx, rx) = async_channel::bounded(max_queue_size);
244
245        let connection_pool = Arc::new(ConnectionPool::new(socket_path.clone(), max_connections));
246        let connection_pool_clone = connection_pool.clone();
247
248        let shutdown_signal = Arc::new(tokio::sync::Notify::new());
249
250        Self::spawn_queue_workers(
251            rx,
252            connection_pool_clone,
253            config.pool_workers,
254            shutdown_signal.clone(),
255        );
256
257        let health_check_needed = Arc::new(AtomicBool::new(false));
258        let consecutive_failures = Arc::new(AtomicU32::new(0));
259        let circuit_breaker = Arc::new(CircuitBreaker::new());
260        let last_restart_time_ms = Arc::new(AtomicU64::new(0));
261        let recovery_mode = Arc::new(AtomicBool::new(false));
262        let recovery_allowance = Arc::new(AtomicU32::new(0));
263
264        Self::spawn_health_check_task(
265            health_check_needed.clone(),
266            config.health_check_interval_secs,
267            shutdown_signal.clone(),
268        );
269
270        Self::spawn_recovery_task(
271            recovery_mode.clone(),
272            recovery_allowance.clone(),
273            shutdown_signal.clone(),
274        );
275
276        Self {
277            connection_pool,
278            socket_path,
279            process: tokio::sync::Mutex::new(None),
280            initialized: Arc::new(AtomicBool::new(false)),
281            restart_lock: tokio::sync::Mutex::new(()),
282            request_tx: tx,
283            max_queue_size,
284            health_check_needed,
285            consecutive_failures,
286            circuit_breaker,
287            last_restart_time_ms,
288            recovery_mode,
289            recovery_allowance,
290            shutdown_signal,
291        }
292    }
293
294    /// Spawn background task to gradually increase recovery allowance
295    fn spawn_recovery_task(
296        recovery_mode: Arc<AtomicBool>,
297        recovery_allowance: Arc<AtomicU32>,
298        shutdown_signal: Arc<tokio::sync::Notify>,
299    ) {
300        tokio::spawn(async move {
301            let mut interval = tokio::time::interval(Duration::from_millis(500));
302            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
303
304            loop {
305                tokio::select! {
306                    biased;
307
308                    _ = shutdown_signal.notified() => {
309                        tracing::debug!("Recovery task received shutdown signal");
310                        break;
311                    }
312
313                    _ = interval.tick() => {
314                        if recovery_mode.load(Ordering::Relaxed) {
315                            let current = recovery_allowance.load(Ordering::Relaxed);
316                            if current < 100 {
317                                let new_allowance = (current + 10).min(100);
318                                recovery_allowance.store(new_allowance, Ordering::Relaxed);
319                                tracing::debug!(
320                                    allowance = new_allowance,
321                                    "Recovery mode: increasing request allowance"
322                                );
323                            } else {
324                                recovery_mode.store(false, Ordering::Relaxed);
325                                tracing::info!("Recovery mode complete - full capacity restored");
326                            }
327                        }
328                    }
329                }
330            }
331        });
332    }
333
334    /// Spawn background task to set health check flag periodically
335    fn spawn_health_check_task(
336        health_check_needed: Arc<AtomicBool>,
337        interval_secs: u64,
338        shutdown_signal: Arc<tokio::sync::Notify>,
339    ) {
340        tokio::spawn(async move {
341            let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
342            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
343
344            loop {
345                tokio::select! {
346                    biased;
347
348                    _ = shutdown_signal.notified() => {
349                        tracing::debug!("Health check task received shutdown signal");
350                        break;
351                    }
352
353                    _ = interval.tick() => {
354                        health_check_needed.store(true, Ordering::Relaxed);
355                    }
356                }
357            }
358        });
359    }
360
361    /// Spawn multiple worker tasks to process queued requests concurrently
362    fn spawn_queue_workers(
363        rx: async_channel::Receiver<QueuedRequest>,
364        connection_pool: Arc<ConnectionPool>,
365        configured_workers: usize,
366        shutdown_signal: Arc<tokio::sync::Notify>,
367    ) {
368        let num_workers = if configured_workers > 0 {
369            configured_workers
370        } else {
371            std::thread::available_parallelism()
372                .map(|n| n.get().clamp(4, 32))
373                .unwrap_or(8)
374        };
375
376        tracing::info!(num_workers = num_workers, "Starting request queue workers");
377
378        for worker_id in 0..num_workers {
379            let rx_clone = rx.clone();
380            let pool_clone = connection_pool.clone();
381            let shutdown = shutdown_signal.clone();
382
383            tokio::spawn(async move {
384                loop {
385                    tokio::select! {
386                        biased;
387
388                        _ = shutdown.notified() => {
389                            tracing::debug!(worker_id = worker_id, "Request queue worker received shutdown signal");
390                            break;
391                        }
392
393                        request_result = rx_clone.recv() => {
394                            let request = match request_result {
395                                Ok(r) => r,
396                                Err(_) => break,
397                            };
398
399                            let start = std::time::Instant::now();
400                            let plugin_id = request.plugin_id.clone();
401
402                            let result = Self::execute_plugin_internal(
403                                &pool_clone,
404                                request.plugin_id,
405                                request.compiled_code,
406                                request.plugin_path,
407                                request.params,
408                                request.headers,
409                                request.socket_path,
410                                request.http_request_id,
411                                request.timeout_secs,
412                                request.route,
413                                request.config,
414                                request.method,
415                                request.query,
416                            )
417                            .await;
418
419                            let elapsed = start.elapsed();
420                            if let Err(ref e) = result {
421                                let error_str = format!("{e:?}");
422                                if error_str.contains("shutdown") || error_str.contains("Shutdown") {
423                                    tracing::debug!(
424                                        worker_id = worker_id,
425                                        plugin_id = %plugin_id,
426                                        "Plugin execution cancelled during shutdown"
427                                    );
428                                } else {
429                                    tracing::warn!(
430                                        worker_id = worker_id,
431                                        plugin_id = %plugin_id,
432                                        elapsed_ms = elapsed.as_millis() as u64,
433                                        error = ?e,
434                                        "Plugin execution failed"
435                                    );
436                                }
437                            } else if elapsed.as_secs() > 1 {
438                                tracing::debug!(
439                                    worker_id = worker_id,
440                                    plugin_id = %plugin_id,
441                                    elapsed_ms = elapsed.as_millis() as u64,
442                                    "Slow plugin execution"
443                                );
444                            }
445
446                            let _ = request.response_tx.send(result);
447                        }
448                    }
449                }
450
451                tracing::debug!(worker_id = worker_id, "Request queue worker exited");
452            });
453        }
454    }
455
456    /// Spawn a rate-limited stderr reader to prevent log flooding
457    fn spawn_rate_limited_stderr_reader(stderr: tokio::process::ChildStderr) {
458        tokio::spawn(async move {
459            let reader = BufReader::new(stderr);
460            let mut lines = reader.lines();
461
462            let mut last_log_time = std::time::Instant::now();
463            let mut suppressed_count = 0u64;
464            let min_interval = Duration::from_millis(100);
465
466            while let Ok(Some(line)) = lines.next_line().await {
467                let now = std::time::Instant::now();
468                let elapsed = now.duration_since(last_log_time);
469
470                if elapsed >= min_interval {
471                    if suppressed_count > 0 {
472                        tracing::warn!(
473                            target: "pool_server",
474                            suppressed = suppressed_count,
475                            "... ({} lines suppressed due to rate limiting)",
476                            suppressed_count
477                        );
478                        suppressed_count = 0;
479                    }
480                    tracing::error!(target: "pool_server", "{}", line);
481                    last_log_time = now;
482                } else {
483                    suppressed_count += 1;
484                    if suppressed_count % 100 == 0 {
485                        tracing::warn!(
486                            target: "pool_server",
487                            suppressed = suppressed_count,
488                            "Pool server producing excessive stderr output"
489                        );
490                    }
491                }
492            }
493
494            if suppressed_count > 0 {
495                tracing::warn!(
496                    target: "pool_server",
497                    suppressed = suppressed_count,
498                    "Pool server stderr closed ({} final lines suppressed)",
499                    suppressed_count
500                );
501            }
502        });
503    }
504
505    /// Execute plugin with optional pre-acquired permit (unified fast/slow path)
506    #[allow(clippy::too_many_arguments)]
507    async fn execute_with_permit(
508        connection_pool: &Arc<ConnectionPool>,
509        permit: Option<tokio::sync::OwnedSemaphorePermit>,
510        plugin_id: String,
511        compiled_code: Option<String>,
512        plugin_path: Option<String>,
513        params: serde_json::Value,
514        headers: Option<HashMap<String, Vec<String>>>,
515        socket_path: String,
516        http_request_id: Option<String>,
517        timeout_secs: Option<u64>,
518        route: Option<String>,
519        config: Option<serde_json::Value>,
520        method: Option<String>,
521        query: Option<serde_json::Value>,
522    ) -> Result<ScriptResult, PluginError> {
523        let mut conn = connection_pool.acquire_with_permit(permit).await?;
524
525        let request = PoolRequest::Execute(Box::new(super::protocol::ExecuteRequest {
526            task_id: Uuid::new_v4().to_string(),
527            plugin_id: plugin_id.clone(),
528            compiled_code,
529            plugin_path,
530            params,
531            headers,
532            socket_path,
533            http_request_id,
534            timeout: timeout_secs.map(|s| s * 1000),
535            route,
536            config,
537            method,
538            query,
539        }));
540
541        let timeout = timeout_secs.unwrap_or(get_config().pool_request_timeout_secs);
542        let response = conn.send_request_with_timeout(&request, timeout).await?;
543
544        // Use extracted parsing function for cleaner code and testability
545        Self::parse_pool_response(response)
546    }
547
548    /// Internal execution method (wrapper for execute_with_permit)
549    #[allow(clippy::too_many_arguments)]
550    async fn execute_plugin_internal(
551        connection_pool: &Arc<ConnectionPool>,
552        plugin_id: String,
553        compiled_code: Option<String>,
554        plugin_path: Option<String>,
555        params: serde_json::Value,
556        headers: Option<HashMap<String, Vec<String>>>,
557        socket_path: String,
558        http_request_id: Option<String>,
559        timeout_secs: Option<u64>,
560        route: Option<String>,
561        config: Option<serde_json::Value>,
562        method: Option<String>,
563        query: Option<serde_json::Value>,
564    ) -> Result<ScriptResult, PluginError> {
565        Self::execute_with_permit(
566            connection_pool,
567            None,
568            plugin_id,
569            compiled_code,
570            plugin_path,
571            params,
572            headers,
573            socket_path,
574            http_request_id,
575            timeout_secs,
576            route,
577            config,
578            method,
579            query,
580        )
581        .await
582    }
583
584    /// Check if the pool manager has been initialized.
585    ///
586    /// This is useful for health checks to determine if the plugin pool
587    /// is expected to be running.
588    pub async fn is_initialized(&self) -> bool {
589        self.initialized.load(Ordering::Acquire)
590    }
591
592    /// Start the pool server if not already running
593    pub async fn ensure_started(&self) -> Result<(), PluginError> {
594        if self.initialized.load(Ordering::Acquire) {
595            return Ok(());
596        }
597
598        let _startup_guard = self.restart_lock.lock().await;
599
600        if self.initialized.load(Ordering::Acquire) {
601            return Ok(());
602        }
603
604        self.start_pool_server().await?;
605        self.initialized.store(true, Ordering::Release);
606        Ok(())
607    }
608
609    /// Ensure pool is started and healthy, with auto-recovery on failure
610    async fn ensure_started_and_healthy(&self) -> Result<(), PluginError> {
611        self.ensure_started().await?;
612
613        if !self.health_check_needed.load(Ordering::Relaxed) {
614            return Ok(());
615        }
616
617        if self
618            .health_check_needed
619            .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed)
620            .is_err()
621        {
622            return Ok(());
623        }
624
625        self.check_and_restart_if_needed().await
626    }
627
628    /// Check process status and restart if needed
629    async fn check_and_restart_if_needed(&self) -> Result<(), PluginError> {
630        // Check process status without holding restart lock
631        let process_status = {
632            let mut process_guard = self.process.lock().await;
633            if let Some(child) = process_guard.as_mut() {
634                match child.try_wait() {
635                    Ok(Some(exit_status)) => {
636                        tracing::warn!(
637                            exit_status = ?exit_status,
638                            "Pool server process has exited"
639                        );
640                        *process_guard = None;
641                        ProcessStatus::Exited
642                    }
643                    Ok(None) => ProcessStatus::Running,
644                    Err(e) => {
645                        tracing::warn!(
646                            error = %e,
647                            "Failed to check pool server process status, assuming dead"
648                        );
649                        *process_guard = None;
650                        ProcessStatus::Unknown
651                    }
652                }
653            } else {
654                ProcessStatus::NoProcess
655            }
656        };
657
658        // Determine if restart is needed
659        let needs_restart = match process_status {
660            ProcessStatus::Running => {
661                let socket_exists = std::path::Path::new(&self.socket_path).exists();
662                if !socket_exists {
663                    tracing::warn!(
664                        socket_path = %self.socket_path,
665                        "Pool server socket file missing, needs restart"
666                    );
667                    true
668                } else {
669                    false
670                }
671            }
672            ProcessStatus::Exited | ProcessStatus::Unknown | ProcessStatus::NoProcess => {
673                tracing::warn!("Pool server not running, needs restart");
674                true
675            }
676        };
677
678        // Only acquire restart lock if restart is actually needed
679        if needs_restart {
680            let _restart_guard = self.restart_lock.lock().await;
681            self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
682            self.restart_internal().await?;
683            self.consecutive_failures.store(0, Ordering::Relaxed);
684        }
685
686        Ok(())
687    }
688
689    /// Clean up socket file with retry logic
690    async fn cleanup_socket_file(socket_path: &str) {
691        let max_cleanup_attempts = 5;
692        let mut attempts = 0;
693
694        while attempts < max_cleanup_attempts {
695            match std::fs::remove_file(socket_path) {
696                Ok(_) => break,
697                Err(e) if e.kind() == std::io::ErrorKind::NotFound => break,
698                Err(e) => {
699                    attempts += 1;
700                    if attempts >= max_cleanup_attempts {
701                        tracing::warn!(
702                            socket_path = %socket_path,
703                            error = %e,
704                            "Failed to remove socket file after {} attempts, proceeding anyway",
705                            max_cleanup_attempts
706                        );
707                        break;
708                    }
709                    let delay_ms = 10 * (1 << attempts.min(3));
710                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
711                }
712            }
713        }
714
715        tokio::time::sleep(Duration::from_millis(50)).await;
716    }
717
718    /// Spawn the pool server process with proper configuration
719    async fn spawn_pool_server_process(
720        socket_path: &str,
721        context: &str,
722    ) -> Result<Child, PluginError> {
723        let pool_server_path = std::env::current_dir()
724            .map(|cwd| cwd.join("plugins/lib/pool-server.ts").display().to_string())
725            .unwrap_or_else(|_| "plugins/lib/pool-server.ts".to_string());
726
727        let config = get_config();
728
729        // Use extracted function for heap calculation
730        let pool_server_heap_mb = Self::calculate_heap_size(config.max_concurrency);
731
732        // Log warning if heap was capped (for observability)
733        let uncapped_heap = Self::BASE_HEAP_MB
734            + ((config.max_concurrency / Self::CONCURRENCY_DIVISOR)
735                * Self::HEAP_INCREMENT_PER_DIVISOR_MB);
736        if uncapped_heap > Self::MAX_HEAP_MB {
737            tracing::warn!(
738                calculated_heap_mb = uncapped_heap,
739                capped_heap_mb = pool_server_heap_mb,
740                max_concurrency = config.max_concurrency,
741                "Pool server heap calculation exceeded 8GB cap"
742            );
743        }
744
745        tracing::info!(
746            socket_path = %socket_path,
747            heap_mb = pool_server_heap_mb,
748            max_concurrency = config.max_concurrency,
749            context = context,
750            "Spawning plugin pool server"
751        );
752
753        let node_options = format!("--max-old-space-size={pool_server_heap_mb} --expose-gc");
754
755        let mut child = Command::new("ts-node")
756            .arg("--transpile-only")
757            .arg(&pool_server_path)
758            .arg(socket_path)
759            .env("NODE_OPTIONS", node_options)
760            .env("PLUGIN_MAX_CONCURRENCY", config.max_concurrency.to_string())
761            .env(
762                "PLUGIN_POOL_MIN_THREADS",
763                config.nodejs_pool_min_threads.to_string(),
764            )
765            .env(
766                "PLUGIN_POOL_MAX_THREADS",
767                config.nodejs_pool_max_threads.to_string(),
768            )
769            .env(
770                "PLUGIN_POOL_CONCURRENT_TASKS",
771                config.nodejs_pool_concurrent_tasks.to_string(),
772            )
773            .env(
774                "PLUGIN_POOL_IDLE_TIMEOUT",
775                config.nodejs_pool_idle_timeout_ms.to_string(),
776            )
777            .env(
778                "PLUGIN_WORKER_HEAP_MB",
779                config.nodejs_worker_heap_mb.to_string(),
780            )
781            .env(
782                "PLUGIN_POOL_SOCKET_BACKLOG",
783                config.pool_socket_backlog.to_string(),
784            )
785            .stdin(Stdio::null())
786            .stdout(Stdio::piped())
787            .stderr(Stdio::piped())
788            .spawn()
789            .map_err(|e| {
790                PluginError::PluginExecutionError(format!("Failed to {context} pool server: {e}"))
791            })?;
792
793        if let Some(stderr) = child.stderr.take() {
794            Self::spawn_rate_limited_stderr_reader(stderr);
795        }
796
797        if let Some(stdout) = child.stdout.take() {
798            let reader = BufReader::new(stdout);
799            let mut lines = reader.lines();
800
801            let timeout_result = tokio::time::timeout(Duration::from_secs(10), async {
802                while let Ok(Some(line)) = lines.next_line().await {
803                    if line.contains("POOL_SERVER_READY") {
804                        return Ok(());
805                    }
806                }
807                Err(PluginError::PluginExecutionError(
808                    "Pool server did not send ready signal".to_string(),
809                ))
810            })
811            .await;
812
813            match timeout_result {
814                Ok(Ok(())) => {
815                    tracing::info!(context = context, "Plugin pool server ready");
816                }
817                Ok(Err(e)) => return Err(e),
818                Err(_) => {
819                    return Err(PluginError::PluginExecutionError(format!(
820                        "Timeout waiting for pool server to {context}"
821                    )))
822                }
823            }
824        }
825
826        Ok(child)
827    }
828
829    async fn start_pool_server(&self) -> Result<(), PluginError> {
830        let mut process_guard = self.process.lock().await;
831
832        if process_guard.is_some() {
833            return Ok(());
834        }
835
836        Self::cleanup_socket_file(&self.socket_path).await;
837
838        let child = Self::spawn_pool_server_process(&self.socket_path, "start").await?;
839
840        *process_guard = Some(child);
841        Ok(())
842    }
843
844    /// Execute a plugin via the pool
845    #[allow(clippy::too_many_arguments)]
846    pub async fn execute_plugin(
847        &self,
848        plugin_id: String,
849        compiled_code: Option<String>,
850        plugin_path: Option<String>,
851        params: serde_json::Value,
852        headers: Option<HashMap<String, Vec<String>>>,
853        socket_path: String,
854        http_request_id: Option<String>,
855        timeout_secs: Option<u64>,
856        route: Option<String>,
857        config: Option<serde_json::Value>,
858        method: Option<String>,
859        query: Option<serde_json::Value>,
860    ) -> Result<ScriptResult, PluginError> {
861        let rid = http_request_id.as_deref().unwrap_or("unknown");
862        let effective_timeout =
863            timeout_secs.unwrap_or_else(|| get_config().pool_request_timeout_secs);
864        tracing::debug!(
865            plugin_id = %plugin_id,
866            http_request_id = %rid,
867            timeout_secs = effective_timeout,
868            "Pool execute request received"
869        );
870        let recovery_allowance = if self.recovery_mode.load(Ordering::Relaxed) {
871            Some(self.recovery_allowance.load(Ordering::Relaxed))
872        } else {
873            None
874        };
875
876        if !self
877            .circuit_breaker
878            .should_allow_request(recovery_allowance)
879        {
880            let state = self.circuit_breaker.state();
881            tracing::warn!(
882                plugin_id = %plugin_id,
883                circuit_state = ?state,
884                recovery_allowance = ?recovery_allowance,
885                "Request rejected by circuit breaker"
886            );
887            return Err(PluginError::PluginExecutionError(
888                "Plugin system temporarily unavailable due to high load. Please retry shortly."
889                    .to_string(),
890            ));
891        }
892
893        let start_time = Instant::now();
894
895        self.ensure_started_and_healthy().await?;
896        tracing::debug!(
897            plugin_id = %plugin_id,
898            http_request_id = %rid,
899            "Pool execute start (healthy/started)"
900        );
901
902        let circuit_breaker = self.circuit_breaker.clone();
903        match self.connection_pool.semaphore.clone().try_acquire_owned() {
904            Ok(permit) => {
905                tracing::debug!(
906                    plugin_id = %plugin_id,
907                    http_request_id = %rid,
908                    "Pool execute acquired connection permit (fast path)"
909                );
910                let result = Self::execute_with_permit(
911                    &self.connection_pool,
912                    Some(permit),
913                    plugin_id,
914                    compiled_code,
915                    plugin_path,
916                    params,
917                    headers,
918                    socket_path,
919                    http_request_id,
920                    timeout_secs,
921                    route,
922                    config,
923                    method,
924                    query,
925                )
926                .await;
927
928                let elapsed_ms = start_time.elapsed().as_millis() as u32;
929                match &result {
930                    Ok(_) => circuit_breaker.record_success(elapsed_ms),
931                    Err(e) => {
932                        // Only count infrastructure errors for circuit breaker, not business errors
933                        // Business errors (RPC failures, plugin logic errors) mean the pool is healthy
934                        if Self::is_dead_server_error(e) {
935                            circuit_breaker.record_failure();
936                            tracing::warn!(
937                                error = %e,
938                                "Detected dead pool server error, triggering health check for restart"
939                            );
940                            self.health_check_needed.store(true, Ordering::Relaxed);
941                        } else {
942                            // Plugin executed but returned error - infrastructure is healthy
943                            circuit_breaker.record_success(elapsed_ms);
944                        }
945                    }
946                }
947
948                tracing::debug!(
949                    elapsed_ms = elapsed_ms,
950                    result_ok = result.is_ok(),
951                    "Pool execute finished (fast path)"
952                );
953                result
954            }
955            Err(_) => {
956                tracing::debug!(
957                    plugin_id = %plugin_id,
958                    http_request_id = %rid,
959                    "Pool execute queueing (no permits)"
960                );
961                let (response_tx, response_rx) = oneshot::channel();
962
963                let queued_request = QueuedRequest {
964                    plugin_id,
965                    compiled_code,
966                    plugin_path,
967                    params,
968                    headers,
969                    socket_path,
970                    http_request_id,
971                    timeout_secs,
972                    route,
973                    config,
974                    method,
975                    query,
976                    response_tx,
977                };
978
979                let result = match self.request_tx.try_send(queued_request) {
980                    Ok(()) => {
981                        let queue_len = self.request_tx.len();
982                        if queue_len > self.max_queue_size / 2 {
983                            tracing::warn!(
984                                queue_len = queue_len,
985                                max_queue_size = self.max_queue_size,
986                                "Plugin queue is over 50% capacity"
987                            );
988                        }
989                        // Add timeout to response_rx to prevent hung requests if worker crashes
990                        let response_timeout = timeout_secs
991                            .map(Duration::from_secs)
992                            .unwrap_or(Duration::from_secs(get_config().pool_request_timeout_secs))
993                            + Duration::from_secs(5); // Add 5s buffer for queue processing
994
995                        match tokio::time::timeout(response_timeout, response_rx).await {
996                            Ok(Ok(result)) => result,
997                            Ok(Err(_)) => Err(PluginError::PluginExecutionError(
998                                "Request queue processor closed".to_string(),
999                            )),
1000                            Err(_) => Err(PluginError::PluginExecutionError(format!(
1001                                "Request timed out after {}s waiting for worker response",
1002                                response_timeout.as_secs()
1003                            ))),
1004                        }
1005                    }
1006                    Err(async_channel::TrySendError::Full(req)) => {
1007                        let queue_timeout_ms = get_config().pool_queue_send_timeout_ms;
1008                        let queue_timeout = Duration::from_millis(queue_timeout_ms);
1009                        match tokio::time::timeout(queue_timeout, self.request_tx.send(req)).await {
1010                            Ok(Ok(())) => {
1011                                let queue_len = self.request_tx.len();
1012                                tracing::debug!(
1013                                    queue_len = queue_len,
1014                                    "Request queued after waiting for queue space"
1015                                );
1016                                // Add timeout to response_rx to prevent hung requests if worker crashes
1017                                let response_timeout =
1018                                    timeout_secs.map(Duration::from_secs).unwrap_or(
1019                                        Duration::from_secs(get_config().pool_request_timeout_secs),
1020                                    ) + Duration::from_secs(5); // Add 5s buffer for queue processing
1021
1022                                match tokio::time::timeout(response_timeout, response_rx).await {
1023                                    Ok(Ok(result)) => result,
1024                                    Ok(Err(_)) => Err(PluginError::PluginExecutionError(
1025                                        "Request queue processor closed".to_string(),
1026                                    )),
1027                                    Err(_) => Err(PluginError::PluginExecutionError(format!(
1028                                        "Request timed out after {}s waiting for worker response",
1029                                        response_timeout.as_secs()
1030                                    ))),
1031                                }
1032                            }
1033                            Ok(Err(async_channel::SendError(_))) => {
1034                                Err(PluginError::PluginExecutionError(
1035                                    "Plugin execution queue is closed".to_string(),
1036                                ))
1037                            }
1038                            Err(_) => {
1039                                let queue_len = self.request_tx.len();
1040                                tracing::error!(
1041                                    queue_len = queue_len,
1042                                    max_queue_size = self.max_queue_size,
1043                                    timeout_ms = queue_timeout.as_millis(),
1044                                    "Plugin execution queue is FULL - timeout waiting for space"
1045                                );
1046                                Err(PluginError::PluginExecutionError(format!(
1047                                    "Plugin execution queue is full (max: {}) and timeout waiting for space. \
1048                                    Consider increasing PLUGIN_POOL_MAX_QUEUE_SIZE or PLUGIN_POOL_MAX_CONNECTIONS.",
1049                                    self.max_queue_size
1050                                )))
1051                            }
1052                        }
1053                    }
1054                    Err(async_channel::TrySendError::Closed(_)) => {
1055                        Err(PluginError::PluginExecutionError(
1056                            "Plugin execution queue is closed".to_string(),
1057                        ))
1058                    }
1059                };
1060
1061                let elapsed_ms = start_time.elapsed().as_millis() as u32;
1062                match &result {
1063                    Ok(_) => circuit_breaker.record_success(elapsed_ms),
1064                    Err(e) => {
1065                        // Only count infrastructure errors for circuit breaker, not business errors
1066                        if Self::is_dead_server_error(e) {
1067                            circuit_breaker.record_failure();
1068                            tracing::warn!(
1069                                error = %e,
1070                                "Detected dead pool server error (queued path), triggering health check for restart"
1071                            );
1072                            self.health_check_needed.store(true, Ordering::Relaxed);
1073                        } else {
1074                            // Plugin executed but returned error - infrastructure is healthy
1075                            circuit_breaker.record_success(elapsed_ms);
1076                        }
1077                    }
1078                }
1079
1080                tracing::debug!(
1081                    elapsed_ms = elapsed_ms,
1082                    result_ok = result.is_ok(),
1083                    "Pool execute finished (queued path)"
1084                );
1085                result
1086            }
1087        }
1088    }
1089
1090    /// Check if an error indicates the pool server is dead and needs restart
1091    pub fn is_dead_server_error(err: &PluginError) -> bool {
1092        let error_str = err.to_string();
1093        let lower = error_str.to_lowercase();
1094
1095        if lower.contains("handler timed out")
1096            || (lower.contains("plugin") && lower.contains("timed out"))
1097        {
1098            return false;
1099        }
1100
1101        DeadServerIndicator::from_error_str(&error_str).is_some()
1102    }
1103
1104    /// Precompile a plugin
1105    pub async fn precompile_plugin(
1106        &self,
1107        plugin_id: String,
1108        plugin_path: Option<String>,
1109        source_code: Option<String>,
1110    ) -> Result<String, PluginError> {
1111        self.ensure_started().await?;
1112
1113        let mut conn = self.connection_pool.acquire().await?;
1114
1115        let request = PoolRequest::Precompile {
1116            task_id: Uuid::new_v4().to_string(),
1117            plugin_id: plugin_id.clone(),
1118            plugin_path,
1119            source_code,
1120        };
1121
1122        let response = conn
1123            .send_request_with_timeout(&request, get_config().pool_request_timeout_secs)
1124            .await?;
1125
1126        if response.success {
1127            response
1128                .result
1129                .and_then(|v| {
1130                    v.get("code")
1131                        .and_then(|c| c.as_str())
1132                        .map(|s| s.to_string())
1133                })
1134                .ok_or_else(|| {
1135                    PluginError::PluginExecutionError("No compiled code in response".to_string())
1136                })
1137        } else {
1138            let error = response.error.unwrap_or(PoolError {
1139                message: "Compilation failed".to_string(),
1140                code: None,
1141                status: None,
1142                details: None,
1143            });
1144            Err(PluginError::PluginExecutionError(error.message))
1145        }
1146    }
1147
1148    /// Cache compiled code in the pool
1149    pub async fn cache_compiled_code(
1150        &self,
1151        plugin_id: String,
1152        compiled_code: String,
1153    ) -> Result<(), PluginError> {
1154        self.ensure_started().await?;
1155
1156        let mut conn = self.connection_pool.acquire().await?;
1157
1158        let request = PoolRequest::Cache {
1159            task_id: Uuid::new_v4().to_string(),
1160            plugin_id: plugin_id.clone(),
1161            compiled_code,
1162        };
1163
1164        let response = conn
1165            .send_request_with_timeout(&request, get_config().pool_request_timeout_secs)
1166            .await?;
1167
1168        if response.success {
1169            Ok(())
1170        } else {
1171            let error = response.error.unwrap_or(PoolError {
1172                message: "Cache failed".to_string(),
1173                code: None,
1174                status: None,
1175                details: None,
1176            });
1177            Err(PluginError::PluginError(error.message))
1178        }
1179    }
1180
1181    /// Invalidate a cached plugin
1182    pub async fn invalidate_plugin(&self, plugin_id: String) -> Result<(), PluginError> {
1183        if !self.initialized.load(Ordering::Acquire) {
1184            return Ok(());
1185        }
1186
1187        let mut conn = self.connection_pool.acquire().await?;
1188
1189        let request = PoolRequest::Invalidate {
1190            task_id: Uuid::new_v4().to_string(),
1191            plugin_id,
1192        };
1193
1194        let _ = conn
1195            .send_request_with_timeout(&request, get_config().pool_request_timeout_secs)
1196            .await?;
1197        Ok(())
1198    }
1199
1200    /// Health check - verify the pool server is responding
1201    /// Collect socket connection statistics
1202    async fn collect_socket_stats(
1203        &self,
1204    ) -> (
1205        Option<usize>,
1206        Option<usize>,
1207        Option<usize>,
1208        Option<usize>,
1209        Option<usize>,
1210    ) {
1211        // Collect shared socket stats
1212        let (shared_available, shared_active, shared_executions) = match get_shared_socket_service()
1213        {
1214            Ok(service) => {
1215                let available = service.available_connection_slots();
1216                let active = service.active_connection_count();
1217                let executions = service.registered_executions_count().await;
1218                (Some(available), Some(active), Some(executions))
1219            }
1220            Err(_) => (None, None, None),
1221        };
1222
1223        // Collect connection pool stats (for pool server connections)
1224        let pool_available = self.connection_pool.semaphore.available_permits();
1225        let pool_max = get_config().pool_max_connections;
1226        let pool_active = pool_max.saturating_sub(pool_available);
1227
1228        (
1229            shared_available,
1230            shared_active,
1231            shared_executions,
1232            Some(pool_available),
1233            Some(pool_active),
1234        )
1235    }
1236
1237    pub async fn health_check(&self) -> Result<HealthStatus, PluginError> {
1238        let circuit_info = || {
1239            let state = match self.circuit_breaker.state() {
1240                CircuitState::Closed => "closed",
1241                CircuitState::HalfOpen => "half_open",
1242                CircuitState::Open => "open",
1243            };
1244            (
1245                Some(state.to_string()),
1246                Some(self.circuit_breaker.avg_response_time()),
1247                Some(self.recovery_mode.load(Ordering::Relaxed)),
1248                Some(self.recovery_allowance.load(Ordering::Relaxed)),
1249            )
1250        };
1251
1252        let socket_stats = self.collect_socket_stats().await;
1253
1254        if !self.initialized.load(Ordering::Acquire) {
1255            let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1256            let (shared_available, shared_active, shared_executions, pool_available, pool_active) =
1257                socket_stats;
1258            return Ok(HealthStatus {
1259                healthy: false,
1260                status: "not_initialized".to_string(),
1261                uptime_ms: None,
1262                memory: None,
1263                pool_completed: None,
1264                pool_queued: None,
1265                success_rate: None,
1266                circuit_state,
1267                avg_response_time_ms: avg_rt,
1268                recovering,
1269                recovery_percent: recovery_pct,
1270                shared_socket_available_slots: shared_available,
1271                shared_socket_active_connections: shared_active,
1272                shared_socket_registered_executions: shared_executions,
1273                connection_pool_available_slots: pool_available,
1274                connection_pool_active_connections: pool_active,
1275            });
1276        }
1277
1278        if !std::path::Path::new(&self.socket_path).exists() {
1279            let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1280            let (shared_available, shared_active, shared_executions, pool_available, pool_active) =
1281                socket_stats;
1282            return Ok(HealthStatus {
1283                healthy: false,
1284                status: "socket_missing".to_string(),
1285                uptime_ms: None,
1286                memory: None,
1287                pool_completed: None,
1288                pool_queued: None,
1289                success_rate: None,
1290                circuit_state,
1291                avg_response_time_ms: avg_rt,
1292                recovering,
1293                recovery_percent: recovery_pct,
1294                shared_socket_available_slots: shared_available,
1295                shared_socket_active_connections: shared_active,
1296                shared_socket_registered_executions: shared_executions,
1297                connection_pool_available_slots: pool_available,
1298                connection_pool_active_connections: pool_active,
1299            });
1300        }
1301
1302        let mut conn =
1303            match tokio::time::timeout(Duration::from_millis(100), self.connection_pool.acquire())
1304                .await
1305            {
1306                Ok(Ok(c)) => c,
1307                Ok(Err(e)) => {
1308                    let err_str = e.to_string();
1309                    let is_pool_exhausted =
1310                        err_str.contains("semaphore") || err_str.contains("Connection refused");
1311
1312                    // Try to check process status without blocking on lock
1313                    let process_status = match self.process.try_lock() {
1314                        Ok(guard) => {
1315                            if let Some(child) = guard.as_ref() {
1316                                format!("process_pid_{}", child.id().unwrap_or(0))
1317                            } else {
1318                                "no_process".to_string()
1319                            }
1320                        }
1321                        Err(_) => "process_lock_busy".to_string(),
1322                    };
1323
1324                    let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1325                    let (
1326                        shared_available,
1327                        shared_active,
1328                        shared_executions,
1329                        pool_available,
1330                        pool_active,
1331                    ) = socket_stats;
1332                    return Ok(HealthStatus {
1333                        healthy: is_pool_exhausted,
1334                        status: if is_pool_exhausted {
1335                            format!("pool_exhausted: {e} ({process_status})")
1336                        } else {
1337                            format!("connection_failed: {e} ({process_status})")
1338                        },
1339                        uptime_ms: None,
1340                        memory: None,
1341                        pool_completed: None,
1342                        pool_queued: None,
1343                        success_rate: None,
1344                        circuit_state,
1345                        avg_response_time_ms: avg_rt,
1346                        recovering,
1347                        recovery_percent: recovery_pct,
1348                        shared_socket_available_slots: shared_available,
1349                        shared_socket_active_connections: shared_active,
1350                        shared_socket_registered_executions: shared_executions,
1351                        connection_pool_available_slots: pool_available,
1352                        connection_pool_active_connections: pool_active,
1353                    });
1354                }
1355                Err(_) => {
1356                    let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1357                    let (
1358                        shared_available,
1359                        shared_active,
1360                        shared_executions,
1361                        pool_available,
1362                        pool_active,
1363                    ) = socket_stats;
1364                    return Ok(HealthStatus {
1365                        healthy: true,
1366                        status: "pool_busy".to_string(),
1367                        uptime_ms: None,
1368                        memory: None,
1369                        pool_completed: None,
1370                        pool_queued: None,
1371                        success_rate: None,
1372                        circuit_state,
1373                        avg_response_time_ms: avg_rt,
1374                        recovering,
1375                        recovery_percent: recovery_pct,
1376                        shared_socket_available_slots: shared_available,
1377                        shared_socket_active_connections: shared_active,
1378                        shared_socket_registered_executions: shared_executions,
1379                        connection_pool_available_slots: pool_available,
1380                        connection_pool_active_connections: pool_active,
1381                    });
1382                }
1383            };
1384
1385        let request = PoolRequest::Health {
1386            task_id: Uuid::new_v4().to_string(),
1387        };
1388
1389        let (circuit_state, avg_rt, recovering, recovery_pct) = circuit_info();
1390
1391        match conn.send_request_with_timeout(&request, 5).await {
1392            Ok(response) => {
1393                if response.success {
1394                    let result = response.result.unwrap_or_default();
1395                    // Use extracted parsing function for testability
1396                    let parsed = Self::parse_health_result(&result);
1397
1398                    {
1399                        let (
1400                            shared_available,
1401                            shared_active,
1402                            shared_executions,
1403                            pool_available,
1404                            pool_active,
1405                        ) = socket_stats;
1406                        Ok(HealthStatus {
1407                            healthy: true,
1408                            status: parsed.status,
1409                            uptime_ms: parsed.uptime_ms,
1410                            memory: parsed.memory,
1411                            pool_completed: parsed.pool_completed,
1412                            pool_queued: parsed.pool_queued,
1413                            success_rate: parsed.success_rate,
1414                            circuit_state,
1415                            avg_response_time_ms: avg_rt,
1416                            recovering,
1417                            recovery_percent: recovery_pct,
1418                            shared_socket_available_slots: shared_available,
1419                            shared_socket_active_connections: shared_active,
1420                            shared_socket_registered_executions: shared_executions,
1421                            connection_pool_available_slots: pool_available,
1422                            connection_pool_active_connections: pool_active,
1423                        })
1424                    }
1425                } else {
1426                    let (
1427                        shared_available,
1428                        shared_active,
1429                        shared_executions,
1430                        pool_available,
1431                        pool_active,
1432                    ) = socket_stats;
1433                    Ok(HealthStatus {
1434                        healthy: false,
1435                        status: response
1436                            .error
1437                            .map(|e| e.message)
1438                            .unwrap_or_else(|| "unknown_error".to_string()),
1439                        uptime_ms: None,
1440                        memory: None,
1441                        pool_completed: None,
1442                        pool_queued: None,
1443                        success_rate: None,
1444                        circuit_state,
1445                        avg_response_time_ms: avg_rt,
1446                        recovering,
1447                        recovery_percent: recovery_pct,
1448                        shared_socket_available_slots: shared_available,
1449                        shared_socket_active_connections: shared_active,
1450                        shared_socket_registered_executions: shared_executions,
1451                        connection_pool_available_slots: pool_available,
1452                        connection_pool_active_connections: pool_active,
1453                    })
1454                }
1455            }
1456            Err(e) => {
1457                let (
1458                    shared_available,
1459                    shared_active,
1460                    shared_executions,
1461                    pool_available,
1462                    pool_active,
1463                ) = socket_stats;
1464                Ok(HealthStatus {
1465                    healthy: false,
1466                    status: format!("request_failed: {e}"),
1467                    uptime_ms: None,
1468                    memory: None,
1469                    pool_completed: None,
1470                    pool_queued: None,
1471                    success_rate: None,
1472                    circuit_state,
1473                    avg_response_time_ms: avg_rt,
1474                    recovering,
1475                    recovery_percent: recovery_pct,
1476                    shared_socket_available_slots: shared_available,
1477                    shared_socket_active_connections: shared_active,
1478                    shared_socket_registered_executions: shared_executions,
1479                    connection_pool_available_slots: pool_available,
1480                    connection_pool_active_connections: pool_active,
1481                })
1482            }
1483        }
1484    }
1485
1486    /// Check health and restart if unhealthy
1487    pub async fn ensure_healthy(&self) -> Result<bool, PluginError> {
1488        let health = self.health_check().await?;
1489
1490        if health.healthy {
1491            return Ok(true);
1492        }
1493
1494        match self.restart_lock.try_lock() {
1495            Ok(_guard) => {
1496                let health_recheck = self.health_check().await?;
1497                if health_recheck.healthy {
1498                    return Ok(true);
1499                }
1500
1501                tracing::warn!(status = %health.status, "Pool server unhealthy, attempting restart");
1502                self.restart_internal().await?;
1503            }
1504            Err(_) => {
1505                tracing::debug!("Waiting for another task to complete pool server restart");
1506                let _guard = self.restart_lock.lock().await;
1507            }
1508        }
1509
1510        let health_after = self.health_check().await?;
1511        Ok(health_after.healthy)
1512    }
1513
1514    /// Force restart the pool server (public API - acquires lock)
1515    pub async fn restart(&self) -> Result<(), PluginError> {
1516        let _guard = self.restart_lock.lock().await;
1517        self.restart_internal().await
1518    }
1519
1520    /// Internal restart without lock (must be called with restart_lock held)
1521    async fn restart_internal(&self) -> Result<(), PluginError> {
1522        tracing::info!("Restarting plugin pool server");
1523
1524        {
1525            let mut process_guard = self.process.lock().await;
1526            if let Some(mut child) = process_guard.take() {
1527                let _ = child.kill().await;
1528                tokio::time::sleep(Duration::from_millis(100)).await;
1529            }
1530        }
1531
1532        Self::cleanup_socket_file(&self.socket_path).await;
1533
1534        self.initialized.store(false, Ordering::Release);
1535
1536        let mut process_guard = self.process.lock().await;
1537        if process_guard.is_some() {
1538            return Ok(());
1539        }
1540
1541        let child = Self::spawn_pool_server_process(&self.socket_path, "restart").await?;
1542        *process_guard = Some(child);
1543
1544        self.initialized.store(true, Ordering::Release);
1545
1546        self.recovery_allowance.store(10, Ordering::Relaxed);
1547        self.recovery_mode.store(true, Ordering::Relaxed);
1548
1549        self.circuit_breaker.force_close();
1550
1551        let now = std::time::SystemTime::now()
1552            .duration_since(std::time::UNIX_EPOCH)
1553            .unwrap_or_default()
1554            .as_millis() as u64;
1555        self.last_restart_time_ms.store(now, Ordering::Relaxed);
1556
1557        tracing::info!("Recovery mode enabled - requests will gradually increase from 10%");
1558
1559        Ok(())
1560    }
1561
1562    /// Get current circuit breaker state for monitoring
1563    pub fn circuit_state(&self) -> CircuitState {
1564        self.circuit_breaker.state()
1565    }
1566
1567    /// Get average response time in ms (for monitoring)
1568    pub fn avg_response_time_ms(&self) -> u32 {
1569        self.circuit_breaker.avg_response_time()
1570    }
1571
1572    /// Check if currently in recovery mode
1573    pub fn is_recovering(&self) -> bool {
1574        self.recovery_mode.load(Ordering::Relaxed)
1575    }
1576
1577    /// Get current recovery allowance percentage (0-100)
1578    pub fn recovery_allowance_percent(&self) -> u32 {
1579        self.recovery_allowance.load(Ordering::Relaxed)
1580    }
1581
1582    /// Shutdown the pool server gracefully
1583    pub async fn shutdown(&self) -> Result<(), PluginError> {
1584        if !self.initialized.load(Ordering::Acquire) {
1585            return Ok(());
1586        }
1587
1588        tracing::info!("Initiating graceful shutdown of plugin pool server");
1589
1590        self.shutdown_signal.notify_waiters();
1591
1592        let shutdown_timeout = std::time::Duration::from_secs(35);
1593        let shutdown_result = self.send_shutdown_request(shutdown_timeout).await;
1594
1595        match &shutdown_result {
1596            Ok(response) => {
1597                tracing::info!(
1598                    response = ?response,
1599                    "Pool server acknowledged shutdown, waiting for graceful exit"
1600                );
1601            }
1602            Err(e) => {
1603                tracing::warn!(
1604                    error = %e,
1605                    "Failed to send shutdown request, will force kill"
1606                );
1607            }
1608        }
1609
1610        let mut process_guard = self.process.lock().await;
1611        if let Some(ref mut child) = *process_guard {
1612            let graceful_wait = std::time::Duration::from_secs(35);
1613            let start = std::time::Instant::now();
1614
1615            loop {
1616                match child.try_wait() {
1617                    Ok(Some(status)) => {
1618                        tracing::info!(
1619                            exit_status = ?status,
1620                            elapsed_ms = start.elapsed().as_millis(),
1621                            "Pool server exited gracefully"
1622                        );
1623                        break;
1624                    }
1625                    Ok(None) => {
1626                        if start.elapsed() >= graceful_wait {
1627                            tracing::warn!(
1628                                "Pool server did not exit within graceful timeout, force killing"
1629                            );
1630                            let _ = child.kill().await;
1631                            break;
1632                        }
1633                        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1634                    }
1635                    Err(e) => {
1636                        tracing::warn!(error = %e, "Error checking pool server status");
1637                        let _ = child.kill().await;
1638                        break;
1639                    }
1640                }
1641            }
1642        }
1643        *process_guard = None;
1644
1645        let _ = std::fs::remove_file(&self.socket_path);
1646
1647        self.initialized.store(false, Ordering::Release);
1648        tracing::info!("Plugin pool server shutdown complete");
1649        Ok(())
1650    }
1651
1652    /// Send shutdown request to the pool server
1653    async fn send_shutdown_request(
1654        &self,
1655        timeout: std::time::Duration,
1656    ) -> Result<PoolResponse, PluginError> {
1657        let request = PoolRequest::Shutdown {
1658            task_id: Uuid::new_v4().to_string(),
1659        };
1660
1661        // Use the pool's connection ID counter to ensure unique IDs
1662        // even for shutdown connections that bypass the pool
1663        let connection_id = self.connection_pool.next_connection_id();
1664        let mut conn = match PoolConnection::new(&self.socket_path, connection_id).await {
1665            Ok(c) => c,
1666            Err(e) => {
1667                return Err(PluginError::PluginExecutionError(format!(
1668                    "Failed to connect for shutdown: {e}"
1669                )));
1670            }
1671        };
1672
1673        conn.send_request_with_timeout(&request, timeout.as_secs())
1674            .await
1675    }
1676}
1677
1678impl Drop for PoolManager {
1679    fn drop(&mut self) {
1680        let _ = std::fs::remove_file(&self.socket_path);
1681    }
1682}
1683
1684/// Global pool manager instance
1685static POOL_MANAGER: std::sync::OnceLock<Arc<PoolManager>> = std::sync::OnceLock::new();
1686
1687/// Get or create the global pool manager
1688pub fn get_pool_manager() -> Arc<PoolManager> {
1689    POOL_MANAGER
1690        .get_or_init(|| Arc::new(PoolManager::new()))
1691        .clone()
1692}
1693
1694#[cfg(test)]
1695mod tests {
1696    use super::*;
1697    use crate::services::plugins::script_executor::LogLevel;
1698
1699    #[test]
1700    fn test_is_dead_server_error_detects_dead_server() {
1701        let err = PluginError::PluginExecutionError("Connection refused".to_string());
1702        assert!(PoolManager::is_dead_server_error(&err));
1703
1704        let err = PluginError::PluginExecutionError("Broken pipe".to_string());
1705        assert!(PoolManager::is_dead_server_error(&err));
1706    }
1707
1708    #[test]
1709    fn test_is_dead_server_error_excludes_plugin_timeouts() {
1710        let err = PluginError::PluginExecutionError("Plugin timed out after 30s".to_string());
1711        assert!(!PoolManager::is_dead_server_error(&err));
1712
1713        let err = PluginError::PluginExecutionError("Handler timed out".to_string());
1714        assert!(!PoolManager::is_dead_server_error(&err));
1715    }
1716
1717    #[test]
1718    fn test_is_dead_server_error_normal_errors() {
1719        let err =
1720            PluginError::PluginExecutionError("TypeError: undefined is not a function".to_string());
1721        assert!(!PoolManager::is_dead_server_error(&err));
1722
1723        let err = PluginError::PluginExecutionError("Plugin returned invalid JSON".to_string());
1724        assert!(!PoolManager::is_dead_server_error(&err));
1725    }
1726
1727    #[test]
1728    fn test_is_dead_server_error_detects_all_dead_server_indicators() {
1729        // Test common DeadServerIndicator patterns
1730        let dead_server_errors = vec![
1731            "EOF while parsing JSON response",
1732            "Broken pipe when writing to socket",
1733            "Connection refused: server not running",
1734            "Connection reset by peer",
1735            "Socket not connected",
1736            "Failed to connect to pool server",
1737            "Socket file missing: /tmp/test.sock",
1738            "No such file or directory",
1739        ];
1740
1741        for error_msg in dead_server_errors {
1742            let err = PluginError::PluginExecutionError(error_msg.to_string());
1743            assert!(
1744                PoolManager::is_dead_server_error(&err),
1745                "Expected '{error_msg}' to be detected as dead server error"
1746            );
1747        }
1748    }
1749
1750    #[test]
1751    fn test_dead_server_indicator_patterns() {
1752        // Test the DeadServerIndicator pattern matching directly
1753        use super::super::health::DeadServerIndicator;
1754
1755        // These should all match
1756        assert!(DeadServerIndicator::from_error_str("eof while parsing").is_some());
1757        assert!(DeadServerIndicator::from_error_str("broken pipe").is_some());
1758        assert!(DeadServerIndicator::from_error_str("connection refused").is_some());
1759        assert!(DeadServerIndicator::from_error_str("connection reset").is_some());
1760        assert!(DeadServerIndicator::from_error_str("not connected").is_some());
1761        assert!(DeadServerIndicator::from_error_str("failed to connect").is_some());
1762        assert!(DeadServerIndicator::from_error_str("socket file missing").is_some());
1763        assert!(DeadServerIndicator::from_error_str("no such file").is_some());
1764        assert!(DeadServerIndicator::from_error_str("connection timed out").is_some());
1765        assert!(DeadServerIndicator::from_error_str("connect timed out").is_some());
1766
1767        // These should NOT match
1768        assert!(DeadServerIndicator::from_error_str("handler timed out").is_none());
1769        assert!(DeadServerIndicator::from_error_str("validation error").is_none());
1770        assert!(DeadServerIndicator::from_error_str("TypeError: undefined").is_none());
1771    }
1772
1773    #[test]
1774    fn test_is_dead_server_error_excludes_plugin_timeouts_with_connection() {
1775        // Plugin timeout should NOT be detected even if it mentions connection
1776        let plugin_timeout =
1777            PluginError::PluginExecutionError("plugin connection timed out".to_string());
1778        // This contains both "plugin" and "timed out" so it's excluded
1779        assert!(!PoolManager::is_dead_server_error(&plugin_timeout));
1780    }
1781
1782    #[test]
1783    fn test_is_dead_server_error_case_insensitive() {
1784        // Test case insensitivity
1785        let err = PluginError::PluginExecutionError("CONNECTION REFUSED".to_string());
1786        assert!(PoolManager::is_dead_server_error(&err));
1787
1788        let err = PluginError::PluginExecutionError("BROKEN PIPE".to_string());
1789        assert!(PoolManager::is_dead_server_error(&err));
1790
1791        let err = PluginError::PluginExecutionError("Connection Reset By Peer".to_string());
1792        assert!(PoolManager::is_dead_server_error(&err));
1793    }
1794
1795    #[test]
1796    fn test_is_dead_server_error_handler_timeout_variations() {
1797        // All variations of plugin/handler timeouts should NOT trigger restart
1798        let timeout_errors = vec![
1799            "Handler timed out",
1800            "handler timed out after 30000ms",
1801            "Plugin handler timed out",
1802            "plugin timed out",
1803            "Plugin execution timed out after 60s",
1804        ];
1805
1806        for error_msg in timeout_errors {
1807            let err = PluginError::PluginExecutionError(error_msg.to_string());
1808            assert!(
1809                !PoolManager::is_dead_server_error(&err),
1810                "Expected '{error_msg}' to NOT be detected as dead server error"
1811            );
1812        }
1813    }
1814
1815    #[test]
1816    fn test_is_dead_server_error_business_errors_not_detected() {
1817        // Business logic errors should not trigger restart
1818        let business_errors = vec![
1819            "ReferenceError: x is not defined",
1820            "SyntaxError: Unexpected token",
1821            "TypeError: Cannot read property 'foo' of undefined",
1822            "Plugin returned status 400: Bad Request",
1823            "Validation error: missing required field",
1824            "Authorization failed",
1825            "Rate limit exceeded",
1826            "Plugin threw an error: Invalid input",
1827        ];
1828
1829        for error_msg in business_errors {
1830            let err = PluginError::PluginExecutionError(error_msg.to_string());
1831            assert!(
1832                !PoolManager::is_dead_server_error(&err),
1833                "Expected '{error_msg}' to NOT be detected as dead server error"
1834            );
1835        }
1836    }
1837
1838    #[test]
1839    fn test_is_dead_server_error_with_handler_error_type() {
1840        // HandlerError type should also be checked
1841        let handler_payload = PluginHandlerPayload {
1842            message: "Connection refused".to_string(),
1843            status: 500,
1844            code: None,
1845            details: None,
1846            logs: None,
1847            traces: None,
1848        };
1849        let err = PluginError::HandlerError(Box::new(handler_payload));
1850        // The error message contains "Connection refused" but it's wrapped differently
1851        // This tests that we check the string representation
1852        assert!(PoolManager::is_dead_server_error(&err));
1853    }
1854
1855    // ============================================
1856    // Heap calculation tests
1857    // ============================================
1858
1859    #[test]
1860    fn test_heap_calculation_base_case() {
1861        // With default concurrency, should get base heap
1862        let base = PoolManager::BASE_HEAP_MB;
1863        let divisor = PoolManager::CONCURRENCY_DIVISOR;
1864        let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1865
1866        // For 100 concurrent requests:
1867        // 512 + (100 / 10) * 32 = 512 + 320 = 832 MB
1868        let concurrency = 100;
1869        let expected = base + ((concurrency / divisor) * increment);
1870        assert_eq!(expected, 832);
1871    }
1872
1873    #[test]
1874    fn test_heap_calculation_minimum() {
1875        // With very low concurrency, should still get base heap
1876        let base = PoolManager::BASE_HEAP_MB;
1877        let divisor = PoolManager::CONCURRENCY_DIVISOR;
1878        let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1879
1880        // For 5 concurrent requests:
1881        // 512 + (5 / 10) * 32 = 512 + 0 = 512 MB (integer division)
1882        let concurrency = 5;
1883        let expected = base + ((concurrency / divisor) * increment);
1884        assert_eq!(expected, 512);
1885    }
1886
1887    #[test]
1888    fn test_heap_calculation_high_concurrency() {
1889        // With high concurrency, should scale appropriately
1890        let base = PoolManager::BASE_HEAP_MB;
1891        let divisor = PoolManager::CONCURRENCY_DIVISOR;
1892        let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1893
1894        // For 500 concurrent requests:
1895        // 512 + (500 / 10) * 32 = 512 + 1600 = 2112 MB
1896        let concurrency = 500;
1897        let expected = base + ((concurrency / divisor) * increment);
1898        assert_eq!(expected, 2112);
1899    }
1900
1901    #[test]
1902    fn test_heap_calculation_max_cap() {
1903        // Verify max heap cap is respected
1904        let max_heap = PoolManager::MAX_HEAP_MB;
1905        assert_eq!(max_heap, 8192);
1906
1907        // For extreme concurrency that would exceed cap:
1908        // e.g., 3000 concurrent: 512 + (3000 / 10) * 32 = 512 + 9600 = 10112 MB
1909        // Should be capped to 8192 MB
1910        let base = PoolManager::BASE_HEAP_MB;
1911        let divisor = PoolManager::CONCURRENCY_DIVISOR;
1912        let increment = PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB;
1913
1914        let concurrency = 3000;
1915        let calculated = base + ((concurrency / divisor) * increment);
1916        let capped = calculated.min(max_heap);
1917
1918        assert_eq!(calculated, 10112);
1919        assert_eq!(capped, 8192);
1920    }
1921
1922    // ============================================
1923    // Constants verification tests
1924    // ============================================
1925
1926    #[test]
1927    fn test_pool_manager_constants() {
1928        // Verify important constants have reasonable values
1929        assert_eq!(PoolManager::BASE_HEAP_MB, 512);
1930        assert_eq!(PoolManager::CONCURRENCY_DIVISOR, 10);
1931        assert_eq!(PoolManager::HEAP_INCREMENT_PER_DIVISOR_MB, 32);
1932        assert_eq!(PoolManager::MAX_HEAP_MB, 8192);
1933    }
1934
1935    // ============================================
1936    // Extracted function tests: calculate_heap_size
1937    // ============================================
1938
1939    #[test]
1940    fn test_calculate_heap_size_low_concurrency() {
1941        // Low concurrency should give base heap
1942        assert_eq!(PoolManager::calculate_heap_size(5), 512);
1943        assert_eq!(PoolManager::calculate_heap_size(9), 512);
1944    }
1945
1946    #[test]
1947    fn test_calculate_heap_size_medium_concurrency() {
1948        // 10 concurrent: 512 + (10/10)*32 = 544
1949        assert_eq!(PoolManager::calculate_heap_size(10), 544);
1950        // 50 concurrent: 512 + (50/10)*32 = 672
1951        assert_eq!(PoolManager::calculate_heap_size(50), 672);
1952        // 100 concurrent: 512 + (100/10)*32 = 832
1953        assert_eq!(PoolManager::calculate_heap_size(100), 832);
1954    }
1955
1956    #[test]
1957    fn test_calculate_heap_size_high_concurrency() {
1958        // 500 concurrent: 512 + (500/10)*32 = 2112
1959        assert_eq!(PoolManager::calculate_heap_size(500), 2112);
1960        // 1000 concurrent: 512 + (1000/10)*32 = 3712
1961        assert_eq!(PoolManager::calculate_heap_size(1000), 3712);
1962    }
1963
1964    #[test]
1965    fn test_calculate_heap_size_capped_at_max() {
1966        // 3000 concurrent would be 10112, but capped at 8192
1967        assert_eq!(PoolManager::calculate_heap_size(3000), 8192);
1968        // Even higher should still be capped
1969        assert_eq!(PoolManager::calculate_heap_size(10000), 8192);
1970    }
1971
1972    #[test]
1973    fn test_calculate_heap_size_zero_concurrency() {
1974        // Zero concurrency gives base heap
1975        assert_eq!(PoolManager::calculate_heap_size(0), 512);
1976    }
1977
1978    // ============================================
1979    // Extracted function tests: format_return_value
1980    // ============================================
1981
1982    #[test]
1983    fn test_format_return_value_none() {
1984        assert_eq!(PoolManager::format_return_value(None), "");
1985    }
1986
1987    #[test]
1988    fn test_format_return_value_string() {
1989        let value = Some(serde_json::json!("hello world"));
1990        assert_eq!(PoolManager::format_return_value(value), "hello world");
1991    }
1992
1993    #[test]
1994    fn test_format_return_value_empty_string() {
1995        let value = Some(serde_json::json!(""));
1996        assert_eq!(PoolManager::format_return_value(value), "");
1997    }
1998
1999    #[test]
2000    fn test_format_return_value_object() {
2001        let value = Some(serde_json::json!({"key": "value", "num": 42}));
2002        let result = PoolManager::format_return_value(value);
2003        // JSON object gets serialized
2004        assert!(result.contains("key"));
2005        assert!(result.contains("value"));
2006        assert!(result.contains("42"));
2007    }
2008
2009    #[test]
2010    fn test_format_return_value_array() {
2011        let value = Some(serde_json::json!([1, 2, 3]));
2012        assert_eq!(PoolManager::format_return_value(value), "[1,2,3]");
2013    }
2014
2015    #[test]
2016    fn test_format_return_value_number() {
2017        let value = Some(serde_json::json!(42));
2018        assert_eq!(PoolManager::format_return_value(value), "42");
2019    }
2020
2021    #[test]
2022    fn test_format_return_value_boolean() {
2023        assert_eq!(
2024            PoolManager::format_return_value(Some(serde_json::json!(true))),
2025            "true"
2026        );
2027        assert_eq!(
2028            PoolManager::format_return_value(Some(serde_json::json!(false))),
2029            "false"
2030        );
2031    }
2032
2033    #[test]
2034    fn test_format_return_value_null() {
2035        let value = Some(serde_json::json!(null));
2036        assert_eq!(PoolManager::format_return_value(value), "null");
2037    }
2038
2039    // ============================================
2040    // Extracted function tests: parse_pool_response
2041    // ============================================
2042
2043    #[test]
2044    fn test_parse_pool_response_success_with_string_result() {
2045        use super::super::protocol::{PoolLogEntry, PoolResponse};
2046
2047        let response = PoolResponse {
2048            task_id: "test-123".to_string(),
2049            success: true,
2050            result: Some(serde_json::json!("success result")),
2051            error: None,
2052            logs: Some(vec![PoolLogEntry {
2053                level: "info".to_string(),
2054                message: "test log".to_string(),
2055            }]),
2056        };
2057
2058        let result = PoolManager::parse_pool_response(response).unwrap();
2059        assert_eq!(result.return_value, "success result");
2060        assert!(result.error.is_empty());
2061        assert_eq!(result.logs.len(), 1);
2062        assert_eq!(result.logs[0].level, LogLevel::Info);
2063        assert_eq!(result.logs[0].message, "test log");
2064    }
2065
2066    #[test]
2067    fn test_parse_pool_response_success_with_object_result() {
2068        use super::super::protocol::PoolResponse;
2069
2070        let response = PoolResponse {
2071            task_id: "test-456".to_string(),
2072            success: true,
2073            result: Some(serde_json::json!({"data": "value"})),
2074            error: None,
2075            logs: None,
2076        };
2077
2078        let result = PoolManager::parse_pool_response(response).unwrap();
2079        assert!(result.return_value.contains("data"));
2080        assert!(result.return_value.contains("value"));
2081        assert!(result.logs.is_empty());
2082    }
2083
2084    #[test]
2085    fn test_parse_pool_response_success_no_result() {
2086        use super::super::protocol::PoolResponse;
2087
2088        let response = PoolResponse {
2089            task_id: "test-789".to_string(),
2090            success: true,
2091            result: None,
2092            error: None,
2093            logs: None,
2094        };
2095
2096        let result = PoolManager::parse_pool_response(response).unwrap();
2097        assert_eq!(result.return_value, "");
2098        assert!(result.error.is_empty());
2099    }
2100
2101    #[test]
2102    fn test_parse_pool_response_failure_with_error() {
2103        use super::super::protocol::{PoolError, PoolResponse};
2104
2105        let response = PoolResponse {
2106            task_id: "test-error".to_string(),
2107            success: false,
2108            result: None,
2109            error: Some(PoolError {
2110                message: "Something went wrong".to_string(),
2111                code: Some("ERR_001".to_string()),
2112                status: Some(400),
2113                details: Some(serde_json::json!({"field": "name"})),
2114            }),
2115            logs: None,
2116        };
2117
2118        let err = PoolManager::parse_pool_response(response).unwrap_err();
2119        match err {
2120            PluginError::HandlerError(payload) => {
2121                assert_eq!(payload.message, "Something went wrong");
2122                assert_eq!(payload.status, 400);
2123                assert_eq!(payload.code, Some("ERR_001".to_string()));
2124            }
2125            _ => panic!("Expected HandlerError"),
2126        }
2127    }
2128
2129    #[test]
2130    fn test_parse_pool_response_failure_no_error_details() {
2131        use super::super::protocol::PoolResponse;
2132
2133        let response = PoolResponse {
2134            task_id: "test-unknown".to_string(),
2135            success: false,
2136            result: None,
2137            error: None,
2138            logs: None,
2139        };
2140
2141        let err = PoolManager::parse_pool_response(response).unwrap_err();
2142        match err {
2143            PluginError::HandlerError(payload) => {
2144                assert_eq!(payload.message, "Unknown error");
2145                assert_eq!(payload.status, 500);
2146            }
2147            _ => panic!("Expected HandlerError"),
2148        }
2149    }
2150
2151    #[test]
2152    fn test_parse_pool_response_failure_preserves_logs() {
2153        use super::super::protocol::{PoolError, PoolLogEntry, PoolResponse};
2154
2155        let response = PoolResponse {
2156            task_id: "test-logs".to_string(),
2157            success: false,
2158            result: None,
2159            error: Some(PoolError {
2160                message: "Error with logs".to_string(),
2161                code: None,
2162                status: None,
2163                details: None,
2164            }),
2165            logs: Some(vec![
2166                PoolLogEntry {
2167                    level: "debug".to_string(),
2168                    message: "debug message".to_string(),
2169                },
2170                PoolLogEntry {
2171                    level: "error".to_string(),
2172                    message: "error message".to_string(),
2173                },
2174            ]),
2175        };
2176
2177        let err = PoolManager::parse_pool_response(response).unwrap_err();
2178        match err {
2179            PluginError::HandlerError(payload) => {
2180                let logs = payload.logs.unwrap();
2181                assert_eq!(logs.len(), 2);
2182                assert_eq!(logs[0].level, LogLevel::Debug);
2183                assert_eq!(logs[1].level, LogLevel::Error);
2184            }
2185            _ => panic!("Expected HandlerError"),
2186        }
2187    }
2188
2189    // ============================================
2190    // Extracted function tests: parse_success_response
2191    // ============================================
2192
2193    #[test]
2194    fn test_parse_success_response_complete() {
2195        use super::super::protocol::{PoolLogEntry, PoolResponse};
2196
2197        let response = PoolResponse {
2198            task_id: "task-1".to_string(),
2199            success: true,
2200            result: Some(serde_json::json!("completed")),
2201            error: None,
2202            logs: Some(vec![
2203                PoolLogEntry {
2204                    level: "log".to_string(),
2205                    message: "starting".to_string(),
2206                },
2207                PoolLogEntry {
2208                    level: "result".to_string(),
2209                    message: "finished".to_string(),
2210                },
2211            ]),
2212        };
2213
2214        let result = PoolManager::parse_success_response(response);
2215        assert_eq!(result.return_value, "completed");
2216        assert!(result.error.is_empty());
2217        assert_eq!(result.logs.len(), 2);
2218        assert_eq!(result.logs[0].level, LogLevel::Log);
2219        assert_eq!(result.logs[1].level, LogLevel::Result);
2220    }
2221
2222    // ============================================
2223    // Extracted function tests: parse_error_response
2224    // ============================================
2225
2226    #[test]
2227    fn test_parse_error_response_with_all_fields() {
2228        use super::super::protocol::{PoolError, PoolLogEntry, PoolResponse};
2229
2230        let response = PoolResponse {
2231            task_id: "err-task".to_string(),
2232            success: false,
2233            result: None,
2234            error: Some(PoolError {
2235                message: "Validation failed".to_string(),
2236                code: Some("VALIDATION_ERROR".to_string()),
2237                status: Some(422),
2238                details: Some(serde_json::json!({"fields": ["email"]})),
2239            }),
2240            logs: Some(vec![PoolLogEntry {
2241                level: "warn".to_string(),
2242                message: "validation warning".to_string(),
2243            }]),
2244        };
2245
2246        let err = PoolManager::parse_error_response(response);
2247        match err {
2248            PluginError::HandlerError(payload) => {
2249                assert_eq!(payload.message, "Validation failed");
2250                assert_eq!(payload.status, 422);
2251                assert_eq!(payload.code, Some("VALIDATION_ERROR".to_string()));
2252                assert!(payload.details.is_some());
2253                let logs = payload.logs.unwrap();
2254                assert_eq!(logs.len(), 1);
2255                assert_eq!(logs[0].level, LogLevel::Warn);
2256            }
2257            _ => panic!("Expected HandlerError"),
2258        }
2259    }
2260
2261    // ============================================
2262    // Extracted function tests: parse_health_result
2263    // ============================================
2264
2265    #[test]
2266    fn test_parse_health_result_complete() {
2267        let json = serde_json::json!({
2268            "status": "healthy",
2269            "uptime": 123456,
2270            "memory": {
2271                "heapUsed": 50000000,
2272                "heapTotal": 100000000
2273            },
2274            "pool": {
2275                "completed": 1000,
2276                "queued": 5
2277            },
2278            "execution": {
2279                "successRate": 0.99
2280            }
2281        });
2282
2283        let result = PoolManager::parse_health_result(&json);
2284
2285        assert_eq!(result.status, "healthy");
2286        assert_eq!(result.uptime_ms, Some(123456));
2287        assert_eq!(result.memory, Some(50000000));
2288        assert_eq!(result.pool_completed, Some(1000));
2289        assert_eq!(result.pool_queued, Some(5));
2290        assert!((result.success_rate.unwrap() - 0.99).abs() < 0.001);
2291    }
2292
2293    #[test]
2294    fn test_parse_health_result_minimal() {
2295        let json = serde_json::json!({});
2296
2297        let result = PoolManager::parse_health_result(&json);
2298
2299        assert_eq!(result.status, "unknown");
2300        assert_eq!(result.uptime_ms, None);
2301        assert_eq!(result.memory, None);
2302        assert_eq!(result.pool_completed, None);
2303        assert_eq!(result.pool_queued, None);
2304        assert_eq!(result.success_rate, None);
2305    }
2306
2307    #[test]
2308    fn test_parse_health_result_partial() {
2309        let json = serde_json::json!({
2310            "status": "degraded",
2311            "uptime": 5000,
2312            "memory": {
2313                "heapTotal": 100000000
2314                // heapUsed missing
2315            }
2316        });
2317
2318        let result = PoolManager::parse_health_result(&json);
2319
2320        assert_eq!(result.status, "degraded");
2321        assert_eq!(result.uptime_ms, Some(5000));
2322        assert_eq!(result.memory, None); // heapUsed was missing
2323        assert_eq!(result.pool_completed, None);
2324        assert_eq!(result.pool_queued, None);
2325        assert_eq!(result.success_rate, None);
2326    }
2327
2328    #[test]
2329    fn test_parse_health_result_wrong_types() {
2330        let json = serde_json::json!({
2331            "status": 123,  // Should be string, will use "unknown"
2332            "uptime": "not a number",  // Should be u64, will be None
2333            "memory": "invalid"  // Should be object, will give None
2334        });
2335
2336        let result = PoolManager::parse_health_result(&json);
2337
2338        assert_eq!(result.status, "unknown"); // Falls back when not a string
2339        assert_eq!(result.uptime_ms, None);
2340        assert_eq!(result.memory, None);
2341        assert_eq!(result.pool_completed, None);
2342        assert_eq!(result.pool_queued, None);
2343        assert_eq!(result.success_rate, None);
2344    }
2345
2346    #[test]
2347    fn test_parse_health_result_nested_values() {
2348        let json = serde_json::json!({
2349            "pool": {
2350                "completed": 0,
2351                "queued": 0
2352            },
2353            "execution": {
2354                "successRate": 1.0
2355            }
2356        });
2357
2358        let result = PoolManager::parse_health_result(&json);
2359
2360        assert_eq!(result.status, "unknown");
2361        assert_eq!(result.uptime_ms, None);
2362        assert_eq!(result.memory, None);
2363        assert_eq!(result.pool_completed, Some(0));
2364        assert_eq!(result.pool_queued, Some(0));
2365        assert!((result.success_rate.unwrap() - 1.0).abs() < 0.001);
2366    }
2367
2368    // ============================================
2369    // PoolManager creation tests
2370    // ============================================
2371
2372    #[tokio::test]
2373    async fn test_pool_manager_new_creates_unique_socket_path() {
2374        // Two PoolManagers should have different socket paths
2375        let manager1 = PoolManager::new();
2376        let manager2 = PoolManager::new();
2377
2378        assert_ne!(manager1.socket_path, manager2.socket_path);
2379        assert!(manager1
2380            .socket_path
2381            .starts_with("/tmp/relayer-plugin-pool-"));
2382        assert!(manager2
2383            .socket_path
2384            .starts_with("/tmp/relayer-plugin-pool-"));
2385    }
2386
2387    #[tokio::test]
2388    async fn test_pool_manager_with_custom_socket_path() {
2389        let custom_path = "/tmp/custom-test-pool.sock".to_string();
2390        let manager = PoolManager::with_socket_path(custom_path.clone());
2391
2392        assert_eq!(manager.socket_path, custom_path);
2393    }
2394
2395    #[tokio::test]
2396    async fn test_pool_manager_default_trait() {
2397        // Verify Default trait creates a valid manager
2398        let manager = PoolManager::default();
2399        assert!(manager.socket_path.starts_with("/tmp/relayer-plugin-pool-"));
2400    }
2401
2402    // ============================================
2403    // Circuit breaker state tests
2404    // ============================================
2405
2406    #[tokio::test]
2407    async fn test_circuit_state_initial() {
2408        let manager = PoolManager::new();
2409
2410        // Initial state should be Closed
2411        assert_eq!(manager.circuit_state(), CircuitState::Closed);
2412    }
2413
2414    #[tokio::test]
2415    async fn test_avg_response_time_initial() {
2416        let manager = PoolManager::new();
2417
2418        // Initial response time should be 0
2419        assert_eq!(manager.avg_response_time_ms(), 0);
2420    }
2421
2422    // ============================================
2423    // Recovery mode tests
2424    // ============================================
2425
2426    #[tokio::test]
2427    async fn test_recovery_mode_initial() {
2428        let manager = PoolManager::new();
2429
2430        // Should not be in recovery mode initially
2431        assert!(!manager.is_recovering());
2432        assert_eq!(manager.recovery_allowance_percent(), 0);
2433    }
2434
2435    // ============================================
2436    // ScriptResult construction tests
2437    // ============================================
2438
2439    #[test]
2440    fn test_script_result_success_construction() {
2441        let result = ScriptResult {
2442            logs: vec![LogEntry {
2443                level: LogLevel::Info,
2444                message: "Test log".to_string(),
2445            }],
2446            error: String::new(),
2447            return_value: r#"{"success": true}"#.to_string(),
2448            trace: vec![],
2449        };
2450
2451        assert!(result.error.is_empty());
2452        assert_eq!(result.logs.len(), 1);
2453        assert_eq!(result.logs[0].level, LogLevel::Info);
2454    }
2455
2456    #[test]
2457    fn test_script_result_with_multiple_logs() {
2458        let result = ScriptResult {
2459            logs: vec![
2460                LogEntry {
2461                    level: LogLevel::Log,
2462                    message: "Starting execution".to_string(),
2463                },
2464                LogEntry {
2465                    level: LogLevel::Debug,
2466                    message: "Processing data".to_string(),
2467                },
2468                LogEntry {
2469                    level: LogLevel::Warn,
2470                    message: "Deprecated API used".to_string(),
2471                },
2472                LogEntry {
2473                    level: LogLevel::Error,
2474                    message: "Non-fatal error".to_string(),
2475                },
2476            ],
2477            error: String::new(),
2478            return_value: "done".to_string(),
2479            trace: vec![],
2480        };
2481
2482        assert_eq!(result.logs.len(), 4);
2483        assert_eq!(result.logs[0].level, LogLevel::Log);
2484        assert_eq!(result.logs[1].level, LogLevel::Debug);
2485        assert_eq!(result.logs[2].level, LogLevel::Warn);
2486        assert_eq!(result.logs[3].level, LogLevel::Error);
2487    }
2488
2489    // ============================================
2490    // QueuedRequest structure tests
2491    // ============================================
2492
2493    #[test]
2494    fn test_queued_request_required_fields() {
2495        let (tx, _rx) = oneshot::channel();
2496
2497        let request = QueuedRequest {
2498            plugin_id: "test-plugin".to_string(),
2499            compiled_code: Some("module.exports.handler = () => {}".to_string()),
2500            plugin_path: None,
2501            params: serde_json::json!({"key": "value"}),
2502            headers: None,
2503            socket_path: "/tmp/test.sock".to_string(),
2504            http_request_id: Some("req-123".to_string()),
2505            timeout_secs: Some(30),
2506            route: Some("/api/test".to_string()),
2507            config: Some(serde_json::json!({"setting": true})),
2508            method: Some("POST".to_string()),
2509            query: Some(serde_json::json!({"page": "1"})),
2510            response_tx: tx,
2511        };
2512
2513        assert_eq!(request.plugin_id, "test-plugin");
2514        assert!(request.compiled_code.is_some());
2515        assert!(request.plugin_path.is_none());
2516        assert_eq!(request.timeout_secs, Some(30));
2517    }
2518
2519    #[test]
2520    fn test_queued_request_minimal() {
2521        let (tx, _rx) = oneshot::channel();
2522
2523        let request = QueuedRequest {
2524            plugin_id: "minimal".to_string(),
2525            compiled_code: None,
2526            plugin_path: Some("/path/to/plugin.ts".to_string()),
2527            params: serde_json::json!(null),
2528            headers: None,
2529            socket_path: "/tmp/min.sock".to_string(),
2530            http_request_id: None,
2531            timeout_secs: None,
2532            route: None,
2533            config: None,
2534            method: None,
2535            query: None,
2536            response_tx: tx,
2537        };
2538
2539        assert_eq!(request.plugin_id, "minimal");
2540        assert!(request.compiled_code.is_none());
2541        assert!(request.plugin_path.is_some());
2542    }
2543
2544    // ============================================
2545    // Error type tests
2546    // ============================================
2547
2548    #[test]
2549    fn test_plugin_error_socket_error() {
2550        let err = PluginError::SocketError("Connection failed".to_string());
2551        let display = format!("{err}");
2552        assert!(display.contains("Socket error"));
2553        assert!(display.contains("Connection failed"));
2554    }
2555
2556    #[test]
2557    fn test_plugin_error_plugin_execution_error() {
2558        let err = PluginError::PluginExecutionError("Execution failed".to_string());
2559        let display = format!("{err}");
2560        assert!(display.contains("Execution failed"));
2561    }
2562
2563    #[test]
2564    fn test_plugin_error_handler_error() {
2565        let payload = PluginHandlerPayload {
2566            message: "Handler error".to_string(),
2567            status: 400,
2568            code: Some("BAD_REQUEST".to_string()),
2569            details: Some(serde_json::json!({"field": "name"})),
2570            logs: None,
2571            traces: None,
2572        };
2573        let err = PluginError::HandlerError(Box::new(payload));
2574
2575        // Check that it can be displayed
2576        let display = format!("{err:?}");
2577        assert!(display.contains("HandlerError"));
2578    }
2579
2580    // ============================================
2581    // Handler payload tests
2582    // ============================================
2583
2584    #[test]
2585    fn test_plugin_handler_payload_full() {
2586        let payload = PluginHandlerPayload {
2587            message: "Validation failed".to_string(),
2588            status: 422,
2589            code: Some("VALIDATION_ERROR".to_string()),
2590            details: Some(serde_json::json!({
2591                "errors": [
2592                    {"field": "email", "message": "Invalid format"}
2593                ]
2594            })),
2595            logs: Some(vec![LogEntry {
2596                level: LogLevel::Error,
2597                message: "Validation failed for email".to_string(),
2598            }]),
2599            traces: Some(vec![serde_json::json!({"stack": "Error at line 10"})]),
2600        };
2601
2602        assert_eq!(payload.status, 422);
2603        assert_eq!(payload.code, Some("VALIDATION_ERROR".to_string()));
2604        assert!(payload.logs.is_some());
2605        assert!(payload.traces.is_some());
2606    }
2607
2608    #[test]
2609    fn test_plugin_handler_payload_minimal() {
2610        let payload = PluginHandlerPayload {
2611            message: "Error".to_string(),
2612            status: 500,
2613            code: None,
2614            details: None,
2615            logs: None,
2616            traces: None,
2617        };
2618
2619        assert_eq!(payload.status, 500);
2620        assert!(payload.code.is_none());
2621        assert!(payload.details.is_none());
2622    }
2623
2624    // ============================================
2625    // Async tests (tokio runtime)
2626    // ============================================
2627
2628    #[tokio::test]
2629    async fn test_pool_manager_not_initialized_health_check() {
2630        let manager = PoolManager::with_socket_path("/tmp/test-health.sock".to_string());
2631
2632        // Health check on uninitialized manager should return not_initialized
2633        let health = manager.health_check().await.unwrap();
2634
2635        assert!(!health.healthy);
2636        assert_eq!(health.status, "not_initialized");
2637        assert!(health.uptime_ms.is_none());
2638        assert!(health.memory.is_none());
2639    }
2640
2641    #[tokio::test]
2642    async fn test_pool_manager_circuit_info_in_health_status() {
2643        let manager = PoolManager::with_socket_path("/tmp/test-circuit.sock".to_string());
2644
2645        let health = manager.health_check().await.unwrap();
2646
2647        // Circuit state info should be present even when not initialized
2648        assert!(health.circuit_state.is_some());
2649        assert_eq!(health.circuit_state, Some("closed".to_string()));
2650        assert!(health.avg_response_time_ms.is_some());
2651        assert!(health.recovering.is_some());
2652        assert!(health.recovery_percent.is_some());
2653    }
2654
2655    #[tokio::test]
2656    async fn test_invalidate_plugin_when_not_initialized() {
2657        let manager = PoolManager::with_socket_path("/tmp/test-invalidate.sock".to_string());
2658
2659        // Invalidating when not initialized should be a no-op
2660        let result = manager.invalidate_plugin("test-plugin".to_string()).await;
2661
2662        // Should succeed (no-op)
2663        assert!(result.is_ok());
2664    }
2665
2666    #[tokio::test]
2667    async fn test_shutdown_when_not_initialized() {
2668        let manager = PoolManager::with_socket_path("/tmp/test-shutdown.sock".to_string());
2669
2670        // Shutdown when not initialized should be a no-op
2671        let result = manager.shutdown().await;
2672
2673        // Should succeed (no-op)
2674        assert!(result.is_ok());
2675    }
2676
2677    // ============================================
2678    // Additional ParsedHealthResult tests
2679    // ============================================
2680
2681    #[test]
2682    fn test_parsed_health_result_default() {
2683        let result = ParsedHealthResult::default();
2684        assert_eq!(result.status, "");
2685        assert_eq!(result.uptime_ms, None);
2686        assert_eq!(result.memory, None);
2687        assert_eq!(result.pool_completed, None);
2688        assert_eq!(result.pool_queued, None);
2689        assert_eq!(result.success_rate, None);
2690    }
2691
2692    #[test]
2693    fn test_parsed_health_result_equality() {
2694        let result1 = ParsedHealthResult {
2695            status: "ok".to_string(),
2696            uptime_ms: Some(1000),
2697            memory: Some(500000),
2698            pool_completed: Some(50),
2699            pool_queued: Some(2),
2700            success_rate: Some(1.0),
2701        };
2702        let result2 = ParsedHealthResult {
2703            status: "ok".to_string(),
2704            uptime_ms: Some(1000),
2705            memory: Some(500000),
2706            pool_completed: Some(50),
2707            pool_queued: Some(2),
2708            success_rate: Some(1.0),
2709        };
2710        assert_eq!(result1, result2);
2711    }
2712
2713    #[test]
2714    fn test_format_return_value_nested_object() {
2715        let value = Some(serde_json::json!({
2716            "user": { "name": "John", "age": 30 }
2717        }));
2718        let result = PoolManager::format_return_value(value);
2719        assert!(result.contains("John"));
2720        assert!(result.contains("30"));
2721    }
2722
2723    #[test]
2724    fn test_format_return_value_empty_collections() {
2725        let value = Some(serde_json::json!({}));
2726        assert_eq!(PoolManager::format_return_value(value), "{}");
2727        let value = Some(serde_json::json!([]));
2728        assert_eq!(PoolManager::format_return_value(value), "[]");
2729    }
2730
2731    #[test]
2732    fn test_parse_health_result_zero_values() {
2733        let json = serde_json::json!({
2734            "status": "starting",
2735            "uptime": 0,
2736            "memory": { "heapUsed": 0 },
2737            "pool": { "completed": 0, "queued": 0 },
2738            "execution": { "successRate": 0.0 }
2739        });
2740        let result = PoolManager::parse_health_result(&json);
2741        assert_eq!(result.status, "starting");
2742        assert_eq!(result.uptime_ms, Some(0));
2743        assert_eq!(result.memory, Some(0));
2744        assert_eq!(result.pool_completed, Some(0));
2745        assert_eq!(result.pool_queued, Some(0));
2746        assert_eq!(result.success_rate, Some(0.0));
2747    }
2748
2749    #[test]
2750    fn test_calculate_heap_size_precise_calculations() {
2751        assert_eq!(PoolManager::calculate_heap_size(0), 512);
2752        assert_eq!(PoolManager::calculate_heap_size(1), 512);
2753        assert_eq!(PoolManager::calculate_heap_size(10), 544);
2754        assert_eq!(PoolManager::calculate_heap_size(20), 576);
2755        assert_eq!(PoolManager::calculate_heap_size(100), 832);
2756        assert_eq!(PoolManager::calculate_heap_size(200), 1152);
2757    }
2758
2759    #[tokio::test]
2760    async fn test_pool_manager_health_check_flag_initial() {
2761        let manager = PoolManager::new();
2762        assert!(!manager.health_check_needed.load(Ordering::Relaxed));
2763    }
2764
2765    #[tokio::test]
2766    async fn test_pool_manager_consecutive_failures_initial() {
2767        let manager = PoolManager::new();
2768        assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 0);
2769    }
2770
2771    #[tokio::test]
2772    async fn test_recovery_allowance_bounds() {
2773        let manager = PoolManager::new();
2774        manager.recovery_allowance.store(0, Ordering::Relaxed);
2775        assert_eq!(manager.recovery_allowance_percent(), 0);
2776        manager.recovery_allowance.store(50, Ordering::Relaxed);
2777        assert_eq!(manager.recovery_allowance_percent(), 50);
2778        manager.recovery_allowance.store(100, Ordering::Relaxed);
2779        assert_eq!(manager.recovery_allowance_percent(), 100);
2780    }
2781
2782    #[tokio::test]
2783    async fn test_is_initialized_changes_with_state() {
2784        let manager = PoolManager::with_socket_path("/tmp/init-test-123.sock".to_string());
2785        assert!(!manager.is_initialized().await);
2786        manager.initialized.store(true, Ordering::Release);
2787        assert!(manager.is_initialized().await);
2788        manager.initialized.store(false, Ordering::Release);
2789        assert!(!manager.is_initialized().await);
2790    }
2791
2792    // ============================================
2793    // Additional edge case tests for coverage
2794    // ============================================
2795
2796    #[test]
2797    fn test_is_dead_server_error_with_script_timeout() {
2798        // ScriptTimeout should NOT be a dead server error
2799        let err = PluginError::ScriptTimeout(30);
2800        assert!(!PoolManager::is_dead_server_error(&err));
2801    }
2802
2803    #[test]
2804    fn test_is_dead_server_error_with_plugin_error() {
2805        let err = PluginError::PluginError("some plugin error".to_string());
2806        assert!(!PoolManager::is_dead_server_error(&err));
2807    }
2808
2809    #[test]
2810    fn test_is_dead_server_error_with_connection_timeout_in_plugin_error() {
2811        // Note: When "connection timed out" is wrapped in PluginExecutionError,
2812        // the Display output includes "Plugin" which triggers the exclusion
2813        // for (plugin + timed out). This is expected behavior to prevent
2814        // plugin execution timeouts from triggering restarts.
2815        let err = PluginError::PluginExecutionError("connection timed out".to_string());
2816        // The error string becomes something like "Plugin execution error: connection timed out"
2817        // which contains "plugin" AND "timed out", so it's excluded
2818        assert!(!PoolManager::is_dead_server_error(&err));
2819
2820        // SocketError doesn't add "Plugin" to the display, so connection issues there
2821        // would be detected correctly
2822        let err = PluginError::SocketError("connect timed out".to_string());
2823        assert!(PoolManager::is_dead_server_error(&err));
2824    }
2825
2826    #[test]
2827    fn test_parse_pool_response_success_with_logs_various_levels() {
2828        use super::super::protocol::{PoolLogEntry, PoolResponse};
2829
2830        let response = PoolResponse {
2831            task_id: "test-levels".to_string(),
2832            success: true,
2833            result: Some(serde_json::json!("ok")),
2834            error: None,
2835            logs: Some(vec![
2836                PoolLogEntry {
2837                    level: "log".to_string(),
2838                    message: "log level".to_string(),
2839                },
2840                PoolLogEntry {
2841                    level: "debug".to_string(),
2842                    message: "debug level".to_string(),
2843                },
2844                PoolLogEntry {
2845                    level: "info".to_string(),
2846                    message: "info level".to_string(),
2847                },
2848                PoolLogEntry {
2849                    level: "warn".to_string(),
2850                    message: "warn level".to_string(),
2851                },
2852                PoolLogEntry {
2853                    level: "error".to_string(),
2854                    message: "error level".to_string(),
2855                },
2856                PoolLogEntry {
2857                    level: "result".to_string(),
2858                    message: "result level".to_string(),
2859                },
2860            ]),
2861        };
2862
2863        let result = PoolManager::parse_pool_response(response).unwrap();
2864        assert_eq!(result.logs.len(), 6);
2865        assert_eq!(result.logs[0].level, LogLevel::Log);
2866        assert_eq!(result.logs[1].level, LogLevel::Debug);
2867        assert_eq!(result.logs[2].level, LogLevel::Info);
2868        assert_eq!(result.logs[3].level, LogLevel::Warn);
2869        assert_eq!(result.logs[4].level, LogLevel::Error);
2870        assert_eq!(result.logs[5].level, LogLevel::Result);
2871    }
2872
2873    #[test]
2874    fn test_parse_error_response_defaults() {
2875        use super::super::protocol::PoolResponse;
2876
2877        // Response with no error field at all
2878        let response = PoolResponse {
2879            task_id: "no-error".to_string(),
2880            success: false,
2881            result: None,
2882            error: None,
2883            logs: None,
2884        };
2885
2886        let err = PoolManager::parse_error_response(response);
2887        match err {
2888            PluginError::HandlerError(payload) => {
2889                assert_eq!(payload.message, "Unknown error");
2890                assert_eq!(payload.status, 500);
2891                assert!(payload.code.is_none());
2892                assert!(payload.details.is_none());
2893            }
2894            _ => panic!("Expected HandlerError"),
2895        }
2896    }
2897
2898    #[test]
2899    fn test_format_return_value_float() {
2900        let value = Some(serde_json::json!(3.14159));
2901        let result = PoolManager::format_return_value(value);
2902        assert!(result.contains("3.14159"));
2903    }
2904
2905    #[test]
2906    fn test_format_return_value_large_array() {
2907        let value = Some(serde_json::json!([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
2908        let result = PoolManager::format_return_value(value);
2909        assert_eq!(result, "[1,2,3,4,5,6,7,8,9,10]");
2910    }
2911
2912    #[test]
2913    fn test_format_return_value_string_with_special_chars() {
2914        let value = Some(serde_json::json!("hello\nworld\ttab"));
2915        assert_eq!(PoolManager::format_return_value(value), "hello\nworld\ttab");
2916    }
2917
2918    #[test]
2919    fn test_format_return_value_unicode() {
2920        let value = Some(serde_json::json!("こんにちは世界 🌍"));
2921        assert_eq!(PoolManager::format_return_value(value), "こんにちは世界 🌍");
2922    }
2923
2924    #[test]
2925    fn test_parse_health_result_large_values() {
2926        let json = serde_json::json!({
2927            "status": "healthy",
2928            "uptime": 999999999999_u64,
2929            "memory": { "heapUsed": 9999999999_u64 },
2930            "pool": { "completed": 999999999_u64, "queued": 999999_u64 },
2931            "execution": { "successRate": 0.999999 }
2932        });
2933
2934        let result = PoolManager::parse_health_result(&json);
2935        assert_eq!(result.status, "healthy");
2936        assert_eq!(result.uptime_ms, Some(999999999999));
2937        assert_eq!(result.memory, Some(9999999999));
2938        assert_eq!(result.pool_completed, Some(999999999));
2939        assert_eq!(result.pool_queued, Some(999999));
2940        assert!((result.success_rate.unwrap() - 0.999999).abs() < 0.0000001);
2941    }
2942
2943    #[test]
2944    fn test_parse_health_result_negative_values_treated_as_none() {
2945        // JSON doesn't have unsigned, so negative values won't parse as u64
2946        let json = serde_json::json!({
2947            "status": "error",
2948            "uptime": -1,
2949            "memory": { "heapUsed": -100 }
2950        });
2951
2952        let result = PoolManager::parse_health_result(&json);
2953        assert_eq!(result.status, "error");
2954        assert_eq!(result.uptime_ms, None); // -1 can't be u64
2955        assert_eq!(result.memory, None);
2956    }
2957
2958    #[test]
2959    fn test_parsed_health_result_debug() {
2960        let result = ParsedHealthResult {
2961            status: "test".to_string(),
2962            uptime_ms: Some(100),
2963            memory: Some(200),
2964            pool_completed: Some(50),
2965            pool_queued: Some(5),
2966            success_rate: Some(0.95),
2967        };
2968
2969        let debug_str = format!("{result:?}");
2970        assert!(debug_str.contains("test"));
2971        assert!(debug_str.contains("100"));
2972        assert!(debug_str.contains("200"));
2973    }
2974
2975    #[test]
2976    fn test_calculate_heap_size_boundary_values() {
2977        // Test at exact boundaries
2978        // 9 should give base (9/10 = 0)
2979        assert_eq!(PoolManager::calculate_heap_size(9), 512);
2980        // 10 should give base + 32 (10/10 = 1)
2981        assert_eq!(PoolManager::calculate_heap_size(10), 544);
2982
2983        // Test boundary where cap kicks in
2984        // 2400 would be: 512 + (240 * 32) = 512 + 7680 = 8192 (at cap)
2985        assert_eq!(PoolManager::calculate_heap_size(2400), 8192);
2986        // 2399 would be: 512 + (239 * 32) = 512 + 7648 = 8160 (under cap)
2987        assert_eq!(PoolManager::calculate_heap_size(2399), 8160);
2988    }
2989
2990    #[tokio::test]
2991    async fn test_pool_manager_socket_path_format() {
2992        let manager = PoolManager::new();
2993        // Should contain UUID format
2994        assert!(manager.socket_path.starts_with("/tmp/relayer-plugin-pool-"));
2995        assert!(manager.socket_path.ends_with(".sock"));
2996        // UUID is 36 chars (32 hex + 4 dashes)
2997        let uuid_part = manager
2998            .socket_path
2999            .strip_prefix("/tmp/relayer-plugin-pool-")
3000            .unwrap()
3001            .strip_suffix(".sock")
3002            .unwrap();
3003        assert_eq!(uuid_part.len(), 36);
3004    }
3005
3006    #[tokio::test]
3007    async fn test_health_check_socket_missing() {
3008        let manager =
3009            PoolManager::with_socket_path("/tmp/nonexistent-socket-12345.sock".to_string());
3010        // Mark as initialized but socket doesn't exist
3011        manager.initialized.store(true, Ordering::Release);
3012
3013        let health = manager.health_check().await.unwrap();
3014        assert!(!health.healthy);
3015        assert_eq!(health.status, "socket_missing");
3016    }
3017
3018    #[test]
3019    fn test_is_dead_server_error_embedded_patterns() {
3020        // Patterns embedded in longer messages
3021        let err = PluginError::PluginExecutionError(
3022            "Error: ECONNREFUSED connection refused at 127.0.0.1:3000".to_string(),
3023        );
3024        assert!(PoolManager::is_dead_server_error(&err));
3025
3026        let err = PluginError::PluginExecutionError(
3027            "SocketError: broken pipe while writing to /tmp/socket".to_string(),
3028        );
3029        assert!(PoolManager::is_dead_server_error(&err));
3030
3031        let err = PluginError::PluginExecutionError(
3032            "IO Error: No such file or directory (os error 2)".to_string(),
3033        );
3034        assert!(PoolManager::is_dead_server_error(&err));
3035    }
3036
3037    #[test]
3038    fn test_is_dead_server_error_mixed_case_timeout_patterns() {
3039        // Handler timeout variants - none should be dead server errors
3040        let variants = vec![
3041            "HANDLER TIMED OUT",
3042            "Handler Timed Out after 30s",
3043            "the handler timed out waiting for response",
3044        ];
3045
3046        for msg in variants {
3047            let err = PluginError::PluginExecutionError(msg.to_string());
3048            assert!(
3049                !PoolManager::is_dead_server_error(&err),
3050                "Expected '{msg}' to NOT be dead server error"
3051            );
3052        }
3053    }
3054
3055    #[tokio::test]
3056    async fn test_ensure_started_idempotent() {
3057        let manager = PoolManager::with_socket_path("/tmp/idempotent-test-999.sock".to_string());
3058
3059        // First call when not initialized
3060        assert!(!manager.is_initialized().await);
3061
3062        // Manually set initialized without actually starting (for test)
3063        manager.initialized.store(true, Ordering::Release);
3064
3065        // ensure_started should return immediately
3066        let result = manager.ensure_started().await;
3067        assert!(result.is_ok());
3068        assert!(manager.is_initialized().await);
3069    }
3070
3071    #[test]
3072    fn test_queued_request_with_headers() {
3073        let (tx, _rx) = oneshot::channel();
3074
3075        let mut headers = HashMap::new();
3076        headers.insert(
3077            "Authorization".to_string(),
3078            vec!["Bearer token".to_string()],
3079        );
3080        headers.insert(
3081            "Content-Type".to_string(),
3082            vec!["application/json".to_string()],
3083        );
3084
3085        let request = QueuedRequest {
3086            plugin_id: "headers-test".to_string(),
3087            compiled_code: None,
3088            plugin_path: Some("/path/to/plugin.ts".to_string()),
3089            params: serde_json::json!({}),
3090            headers: Some(headers),
3091            socket_path: "/tmp/test.sock".to_string(),
3092            http_request_id: None,
3093            timeout_secs: None,
3094            route: None,
3095            config: None,
3096            method: None,
3097            query: None,
3098            response_tx: tx,
3099        };
3100
3101        assert!(request.headers.is_some());
3102        let headers = request.headers.unwrap();
3103        assert!(headers.contains_key("Authorization"));
3104        assert!(headers.contains_key("Content-Type"));
3105    }
3106
3107    #[test]
3108    fn test_plugin_error_display_formats() {
3109        // Test all PluginError variants have proper Display implementations
3110        let err = PluginError::SocketError("test socket error".to_string());
3111        assert!(format!("{err}").contains("Socket error"));
3112
3113        let err = PluginError::PluginExecutionError("test execution error".to_string());
3114        assert!(format!("{err}").contains("test execution error"));
3115
3116        let err = PluginError::ScriptTimeout(60);
3117        assert!(format!("{err}").contains("60"));
3118
3119        let err = PluginError::PluginError("test plugin error".to_string());
3120        assert!(format!("{err}").contains("test plugin error"));
3121    }
3122
3123    #[test]
3124    fn test_pool_log_entry_to_log_entry_conversion() {
3125        use super::super::protocol::PoolLogEntry;
3126
3127        // Test the From<PoolLogEntry> for LogEntry conversion
3128        let pool_log = PoolLogEntry {
3129            level: "info".to_string(),
3130            message: "test message".to_string(),
3131        };
3132
3133        let log_entry: LogEntry = pool_log.into();
3134        assert_eq!(log_entry.level, LogLevel::Info);
3135        assert_eq!(log_entry.message, "test message");
3136
3137        // Test unknown level defaults
3138        let pool_log = PoolLogEntry {
3139            level: "unknown_level".to_string(),
3140            message: "unknown level message".to_string(),
3141        };
3142
3143        let log_entry: LogEntry = pool_log.into();
3144        assert_eq!(log_entry.level, LogLevel::Log); // Should default to Log
3145    }
3146
3147    #[tokio::test]
3148    async fn test_circuit_breaker_records_success() {
3149        let manager = PoolManager::new();
3150
3151        // Record some successes
3152        manager.circuit_breaker.record_success(100);
3153        manager.circuit_breaker.record_success(150);
3154        manager.circuit_breaker.record_success(200);
3155
3156        // Average should be calculated
3157        let avg = manager.avg_response_time_ms();
3158        assert!(avg > 0);
3159    }
3160
3161    #[tokio::test]
3162    async fn test_circuit_breaker_state_transitions() {
3163        let manager = PoolManager::new();
3164
3165        // Initial state is Closed
3166        assert_eq!(manager.circuit_state(), CircuitState::Closed);
3167
3168        // Record many failures to potentially trip the breaker
3169        for _ in 0..20 {
3170            manager.circuit_breaker.record_failure();
3171        }
3172
3173        // State might have changed (depends on thresholds)
3174        let state = manager.circuit_state();
3175        assert!(matches!(
3176            state,
3177            CircuitState::Closed | CircuitState::HalfOpen | CircuitState::Open
3178        ));
3179    }
3180
3181    #[tokio::test]
3182    async fn test_recovery_mode_activation() {
3183        let manager = PoolManager::new();
3184
3185        // Manually activate recovery mode
3186        manager.recovery_allowance.store(10, Ordering::Relaxed);
3187        manager.recovery_mode.store(true, Ordering::Relaxed);
3188
3189        assert!(manager.is_recovering());
3190        assert_eq!(manager.recovery_allowance_percent(), 10);
3191
3192        // Increase allowance
3193        manager.recovery_allowance.store(50, Ordering::Relaxed);
3194        assert_eq!(manager.recovery_allowance_percent(), 50);
3195
3196        // Exit recovery mode
3197        manager.recovery_mode.store(false, Ordering::Relaxed);
3198        assert!(!manager.is_recovering());
3199    }
3200
3201    #[test]
3202    fn test_parse_pool_response_with_empty_logs() {
3203        use super::super::protocol::PoolResponse;
3204
3205        let response = PoolResponse {
3206            task_id: "empty-logs".to_string(),
3207            success: true,
3208            result: Some(serde_json::json!("done")),
3209            error: None,
3210            logs: Some(vec![]), // Empty logs array
3211        };
3212
3213        let result = PoolManager::parse_pool_response(response).unwrap();
3214        assert!(result.logs.is_empty());
3215        assert_eq!(result.return_value, "done");
3216    }
3217
3218    #[test]
3219    fn test_handler_payload_with_complex_details() {
3220        let payload = PluginHandlerPayload {
3221            message: "Complex error".to_string(),
3222            status: 400,
3223            code: Some("VALIDATION_ERROR".to_string()),
3224            details: Some(serde_json::json!({
3225                "errors": [
3226                    {"field": "email", "code": "invalid", "message": "Invalid email format"},
3227                    {"field": "password", "code": "weak", "message": "Password too weak"}
3228                ],
3229                "metadata": {
3230                    "requestId": "req-123",
3231                    "timestamp": "2024-01-01T00:00:00Z"
3232                }
3233            })),
3234            logs: None,
3235            traces: None,
3236        };
3237
3238        assert_eq!(payload.status, 400);
3239        let details = payload.details.unwrap();
3240        assert!(details.get("errors").is_some());
3241        assert!(details.get("metadata").is_some());
3242    }
3243
3244    #[test]
3245    fn test_health_status_construction_healthy() {
3246        use super::super::health::HealthStatus;
3247
3248        let status = HealthStatus {
3249            healthy: true,
3250            status: "ok".to_string(),
3251            uptime_ms: Some(1000000),
3252            memory: Some(500000000),
3253            pool_completed: Some(1000),
3254            pool_queued: Some(5),
3255            success_rate: Some(0.99),
3256            circuit_state: Some("closed".to_string()),
3257            avg_response_time_ms: Some(50),
3258            recovering: Some(false),
3259            recovery_percent: Some(100),
3260            shared_socket_available_slots: Some(100),
3261            shared_socket_active_connections: Some(10),
3262            shared_socket_registered_executions: Some(5),
3263            connection_pool_available_slots: Some(50),
3264            connection_pool_active_connections: Some(5),
3265        };
3266
3267        assert!(status.healthy);
3268        assert_eq!(status.status, "ok");
3269        assert_eq!(status.uptime_ms, Some(1000000));
3270        assert_eq!(status.circuit_state, Some("closed".to_string()));
3271    }
3272
3273    #[test]
3274    fn test_health_status_construction_unhealthy() {
3275        use super::super::health::HealthStatus;
3276
3277        let status = HealthStatus {
3278            healthy: false,
3279            status: "connection_failed".to_string(),
3280            uptime_ms: None,
3281            memory: None,
3282            pool_completed: None,
3283            pool_queued: None,
3284            success_rate: None,
3285            circuit_state: Some("open".to_string()),
3286            avg_response_time_ms: Some(0),
3287            recovering: Some(true),
3288            recovery_percent: Some(10),
3289            shared_socket_available_slots: None,
3290            shared_socket_active_connections: None,
3291            shared_socket_registered_executions: None,
3292            connection_pool_available_slots: None,
3293            connection_pool_active_connections: None,
3294        };
3295
3296        assert!(!status.healthy);
3297        assert_eq!(status.status, "connection_failed");
3298        assert!(status.uptime_ms.is_none());
3299    }
3300
3301    #[test]
3302    fn test_health_status_debug_format() {
3303        use super::super::health::HealthStatus;
3304
3305        let status = HealthStatus {
3306            healthy: true,
3307            status: "test".to_string(),
3308            uptime_ms: Some(100),
3309            memory: None,
3310            pool_completed: None,
3311            pool_queued: None,
3312            success_rate: None,
3313            circuit_state: None,
3314            avg_response_time_ms: None,
3315            recovering: None,
3316            recovery_percent: None,
3317            shared_socket_available_slots: None,
3318            shared_socket_active_connections: None,
3319            shared_socket_registered_executions: None,
3320            connection_pool_available_slots: None,
3321            connection_pool_active_connections: None,
3322        };
3323
3324        let debug_str = format!("{status:?}");
3325        assert!(debug_str.contains("healthy: true"));
3326        assert!(debug_str.contains("test"));
3327    }
3328
3329    #[test]
3330    fn test_health_status_clone() {
3331        use super::super::health::HealthStatus;
3332
3333        let status = HealthStatus {
3334            healthy: true,
3335            status: "original".to_string(),
3336            uptime_ms: Some(500),
3337            memory: Some(100),
3338            pool_completed: Some(10),
3339            pool_queued: Some(1),
3340            success_rate: Some(0.95),
3341            circuit_state: Some("closed".to_string()),
3342            avg_response_time_ms: Some(25),
3343            recovering: Some(false),
3344            recovery_percent: Some(100),
3345            shared_socket_available_slots: Some(50),
3346            shared_socket_active_connections: Some(2),
3347            shared_socket_registered_executions: Some(1),
3348            connection_pool_available_slots: Some(25),
3349            connection_pool_active_connections: Some(1),
3350        };
3351
3352        let cloned = status.clone();
3353        assert_eq!(cloned.healthy, status.healthy);
3354        assert_eq!(cloned.status, status.status);
3355        assert_eq!(cloned.uptime_ms, status.uptime_ms);
3356    }
3357
3358    #[test]
3359    fn test_execute_request_debug() {
3360        use super::super::protocol::ExecuteRequest;
3361
3362        let request = ExecuteRequest {
3363            task_id: "debug-test".to_string(),
3364            plugin_id: "test-plugin".to_string(),
3365            compiled_code: None,
3366            plugin_path: Some("/path/to/plugin.ts".to_string()),
3367            params: serde_json::json!({"test": true}),
3368            headers: None,
3369            socket_path: "/tmp/test.sock".to_string(),
3370            http_request_id: None,
3371            timeout: None,
3372            route: None,
3373            config: None,
3374            method: None,
3375            query: None,
3376        };
3377
3378        let debug_str = format!("{request:?}");
3379        assert!(debug_str.contains("debug-test"));
3380        assert!(debug_str.contains("test-plugin"));
3381    }
3382
3383    #[test]
3384    fn test_pool_error_debug() {
3385        use super::super::protocol::PoolError;
3386
3387        let error = PoolError {
3388            message: "Test error".to_string(),
3389            code: Some("TEST_ERR".to_string()),
3390            status: Some(400),
3391            details: Some(serde_json::json!({"info": "test"})),
3392        };
3393
3394        let debug_str = format!("{error:?}");
3395        assert!(debug_str.contains("Test error"));
3396        assert!(debug_str.contains("TEST_ERR"));
3397    }
3398
3399    #[test]
3400    fn test_pool_response_debug() {
3401        use super::super::protocol::PoolResponse;
3402
3403        let response = PoolResponse {
3404            task_id: "resp-123".to_string(),
3405            success: true,
3406            result: Some(serde_json::json!("result")),
3407            error: None,
3408            logs: None,
3409        };
3410
3411        let debug_str = format!("{response:?}");
3412        assert!(debug_str.contains("resp-123"));
3413        assert!(debug_str.contains("true"));
3414    }
3415
3416    #[test]
3417    fn test_pool_log_entry_debug() {
3418        use super::super::protocol::PoolLogEntry;
3419
3420        let entry = PoolLogEntry {
3421            level: "info".to_string(),
3422            message: "Test message".to_string(),
3423        };
3424
3425        let debug_str = format!("{entry:?}");
3426        assert!(debug_str.contains("info"));
3427        assert!(debug_str.contains("Test message"));
3428    }
3429
3430    #[test]
3431    fn test_circuit_breaker_default_trait() {
3432        use super::super::health::CircuitBreaker;
3433
3434        let cb = CircuitBreaker::default();
3435        assert_eq!(cb.state(), CircuitState::Closed);
3436    }
3437
3438    #[test]
3439    fn test_circuit_breaker_set_state_all_variants() {
3440        use super::super::health::CircuitBreaker;
3441
3442        let cb = CircuitBreaker::new();
3443
3444        // Test setting all states
3445        cb.set_state(CircuitState::HalfOpen);
3446        assert_eq!(cb.state(), CircuitState::HalfOpen);
3447
3448        cb.set_state(CircuitState::Open);
3449        assert_eq!(cb.state(), CircuitState::Open);
3450
3451        cb.set_state(CircuitState::Closed);
3452        assert_eq!(cb.state(), CircuitState::Closed);
3453    }
3454
3455    #[test]
3456    fn test_circuit_breaker_failure_rate_triggers_open() {
3457        use super::super::health::CircuitBreaker;
3458
3459        let cb = CircuitBreaker::new();
3460
3461        // Record enough failures to trigger circuit opening
3462        for _ in 0..100 {
3463            cb.record_failure();
3464        }
3465
3466        assert_eq!(cb.state(), CircuitState::Open);
3467    }
3468
3469    #[test]
3470    fn test_circuit_breaker_low_failure_rate_stays_closed() {
3471        use super::super::health::CircuitBreaker;
3472
3473        let cb = CircuitBreaker::new();
3474
3475        // Record mostly successes with few failures
3476        for _ in 0..90 {
3477            cb.record_success(50);
3478        }
3479        for _ in 0..10 {
3480            cb.record_failure();
3481        }
3482
3483        // Should still be closed (10% failure rate)
3484        assert_eq!(cb.state(), CircuitState::Closed);
3485    }
3486
3487    #[test]
3488    fn test_circuit_breaker_ema_response_time() {
3489        use super::super::health::CircuitBreaker;
3490
3491        let cb = CircuitBreaker::new();
3492
3493        // Record several response times
3494        cb.record_success(100);
3495        let avg1 = cb.avg_response_time();
3496
3497        cb.record_success(100);
3498        cb.record_success(100);
3499        cb.record_success(100);
3500        let avg2 = cb.avg_response_time();
3501
3502        // Average should stabilize around 100
3503        assert!(avg1 > 0);
3504        assert!(avg2 > 0);
3505        assert!(avg2 <= 100);
3506    }
3507
3508    #[test]
3509    fn test_circuit_breaker_force_close_resets_counters() {
3510        use super::super::health::CircuitBreaker;
3511
3512        let cb = CircuitBreaker::new();
3513        cb.set_state(CircuitState::Open);
3514
3515        cb.force_close();
3516
3517        assert_eq!(cb.state(), CircuitState::Closed);
3518    }
3519
3520    #[test]
3521    fn test_process_status_debug() {
3522        use super::super::health::ProcessStatus;
3523
3524        assert_eq!(format!("{:?}", ProcessStatus::Running), "Running");
3525        assert_eq!(format!("{:?}", ProcessStatus::Exited), "Exited");
3526        assert_eq!(format!("{:?}", ProcessStatus::Unknown), "Unknown");
3527        assert_eq!(format!("{:?}", ProcessStatus::NoProcess), "NoProcess");
3528    }
3529
3530    #[test]
3531    fn test_process_status_clone() {
3532        use super::super::health::ProcessStatus;
3533
3534        let status = ProcessStatus::Running;
3535        let cloned = status;
3536        assert_eq!(status, cloned);
3537    }
3538
3539    // ============================================
3540    // Additional coverage tests - DeadServerIndicator
3541    // ============================================
3542
3543    #[test]
3544    fn test_dead_server_indicator_all_variants() {
3545        use super::super::health::DeadServerIndicator;
3546
3547        // Test all enum variants exist and are properly matched
3548        let variants = [
3549            ("eof while parsing", DeadServerIndicator::EofWhileParsing),
3550            ("broken pipe", DeadServerIndicator::BrokenPipe),
3551            ("connection refused", DeadServerIndicator::ConnectionRefused),
3552            ("connection reset", DeadServerIndicator::ConnectionReset),
3553            ("not connected", DeadServerIndicator::NotConnected),
3554            ("failed to connect", DeadServerIndicator::FailedToConnect),
3555            (
3556                "socket file missing",
3557                DeadServerIndicator::SocketFileMissing,
3558            ),
3559            ("no such file", DeadServerIndicator::NoSuchFile),
3560            (
3561                "connection timed out",
3562                DeadServerIndicator::ConnectionTimedOut,
3563            ),
3564            ("connect timed out", DeadServerIndicator::ConnectionTimedOut),
3565        ];
3566
3567        for (pattern, expected) in variants {
3568            let result = DeadServerIndicator::from_error_str(pattern);
3569            assert_eq!(result, Some(expected), "Pattern '{pattern}' should match");
3570        }
3571    }
3572
3573    #[test]
3574    fn test_dead_server_indicator_debug_format() {
3575        use super::super::health::DeadServerIndicator;
3576
3577        let indicator = DeadServerIndicator::BrokenPipe;
3578        let debug_str = format!("{indicator:?}");
3579        assert_eq!(debug_str, "BrokenPipe");
3580    }
3581
3582    #[test]
3583    fn test_dead_server_indicator_clone_copy() {
3584        use super::super::health::DeadServerIndicator;
3585
3586        let indicator = DeadServerIndicator::ConnectionRefused;
3587        let cloned = indicator;
3588        assert_eq!(indicator, cloned);
3589    }
3590
3591    #[test]
3592    fn test_result_ring_buffer_not_enough_data() {
3593        use super::super::health::ResultRingBuffer;
3594
3595        let buffer = ResultRingBuffer::new(100);
3596
3597        // Record less than 10 results
3598        for _ in 0..9 {
3599            buffer.record(false);
3600        }
3601
3602        // Should return 0.0 because not enough data
3603        assert_eq!(buffer.failure_rate(), 0.0);
3604    }
3605
3606    #[test]
3607    fn test_result_ring_buffer_exactly_10_samples() {
3608        use super::super::health::ResultRingBuffer;
3609
3610        let buffer = ResultRingBuffer::new(100);
3611
3612        // Record exactly 10 failures
3613        for _ in 0..10 {
3614            buffer.record(false);
3615        }
3616
3617        // Should return 1.0 (100% failure)
3618        assert_eq!(buffer.failure_rate(), 1.0);
3619    }
3620
3621    #[test]
3622    fn test_result_ring_buffer_wraps_correctly() {
3623        use super::super::health::ResultRingBuffer;
3624
3625        let buffer = ResultRingBuffer::new(10);
3626
3627        // Fill buffer with successes
3628        for _ in 0..10 {
3629            buffer.record(true);
3630        }
3631        assert_eq!(buffer.failure_rate(), 0.0);
3632
3633        // Overwrite with failures
3634        for _ in 0..10 {
3635            buffer.record(false);
3636        }
3637        assert_eq!(buffer.failure_rate(), 1.0);
3638    }
3639
3640    #[test]
3641    fn test_circuit_state_equality_all_pairs() {
3642        assert_eq!(CircuitState::Closed, CircuitState::Closed);
3643        assert_eq!(CircuitState::HalfOpen, CircuitState::HalfOpen);
3644        assert_eq!(CircuitState::Open, CircuitState::Open);
3645
3646        assert_ne!(CircuitState::Closed, CircuitState::HalfOpen);
3647        assert_ne!(CircuitState::Closed, CircuitState::Open);
3648        assert_ne!(CircuitState::HalfOpen, CircuitState::Open);
3649    }
3650
3651    #[test]
3652    fn test_circuit_state_clone_copy() {
3653        let state = CircuitState::HalfOpen;
3654        let copied = state;
3655        assert_eq!(state, copied);
3656    }
3657
3658    #[test]
3659    fn test_parse_pool_response_with_null_values() {
3660        use super::super::protocol::PoolResponse;
3661
3662        let response = PoolResponse {
3663            task_id: "null-test".to_string(),
3664            success: true,
3665            result: Some(serde_json::json!(null)),
3666            error: None,
3667            logs: None,
3668        };
3669
3670        let result = PoolManager::parse_pool_response(response).unwrap();
3671        assert_eq!(result.return_value, "null");
3672    }
3673
3674    #[test]
3675    fn test_parse_pool_response_with_nested_result() {
3676        use super::super::protocol::PoolResponse;
3677
3678        let response = PoolResponse {
3679            task_id: "nested-test".to_string(),
3680            success: true,
3681            result: Some(serde_json::json!({
3682                "level1": {
3683                    "level2": {
3684                        "level3": "deep value"
3685                    }
3686                }
3687            })),
3688            error: None,
3689            logs: None,
3690        };
3691
3692        let result = PoolManager::parse_pool_response(response).unwrap();
3693        assert!(result.return_value.contains("level1"));
3694        assert!(result.return_value.contains("level2"));
3695        assert!(result.return_value.contains("level3"));
3696        assert!(result.return_value.contains("deep value"));
3697    }
3698
3699    #[test]
3700    fn test_parse_pool_response_error_with_details() {
3701        use super::super::protocol::{PoolError, PoolResponse};
3702
3703        let response = PoolResponse {
3704            task_id: "error-details".to_string(),
3705            success: false,
3706            result: None,
3707            error: Some(PoolError {
3708                message: "Error with details".to_string(),
3709                code: Some("DETAILED_ERROR".to_string()),
3710                status: Some(422),
3711                details: Some(serde_json::json!({
3712                    "field": "email",
3713                    "expected": "string",
3714                    "received": "number"
3715                })),
3716            }),
3717            logs: None,
3718        };
3719
3720        let err = PoolManager::parse_pool_response(response).unwrap_err();
3721        match err {
3722            PluginError::HandlerError(payload) => {
3723                assert_eq!(payload.message, "Error with details");
3724                assert_eq!(payload.code, Some("DETAILED_ERROR".to_string()));
3725                assert!(payload.details.is_some());
3726                let details = payload.details.unwrap();
3727                assert_eq!(details.get("field").unwrap(), "email");
3728            }
3729            _ => panic!("Expected HandlerError"),
3730        }
3731    }
3732
3733    #[test]
3734    fn test_parse_health_result_with_all_optional_fields() {
3735        let json = serde_json::json!({
3736            "status": "healthy",
3737            "uptime": 999999,
3738            "memory": {
3739                "heapUsed": 123456789,
3740                "heapTotal": 987654321,
3741                "external": 111111,
3742                "arrayBuffers": 222222
3743            },
3744            "pool": {
3745                "completed": 50000,
3746                "queued": 100,
3747                "active": 50,
3748                "waiting": 25
3749            },
3750            "execution": {
3751                "successRate": 0.9999,
3752                "avgDuration": 45.5,
3753                "totalExecutions": 100000
3754            }
3755        });
3756
3757        let result = PoolManager::parse_health_result(&json);
3758        assert_eq!(result.status, "healthy");
3759        assert_eq!(result.uptime_ms, Some(999999));
3760        assert_eq!(result.memory, Some(123456789));
3761        assert_eq!(result.pool_completed, Some(50000));
3762        assert_eq!(result.pool_queued, Some(100));
3763        assert!((result.success_rate.unwrap() - 0.9999).abs() < 0.0001);
3764    }
3765
3766    #[tokio::test]
3767    async fn test_pool_manager_max_queue_size() {
3768        let manager = PoolManager::new();
3769        // max_queue_size should be set from config
3770        assert!(manager.max_queue_size > 0);
3771    }
3772
3773    #[tokio::test]
3774    async fn test_pool_manager_last_restart_time_initial() {
3775        let manager = PoolManager::new();
3776        assert_eq!(manager.last_restart_time_ms.load(Ordering::Relaxed), 0);
3777    }
3778
3779    #[tokio::test]
3780    async fn test_pool_manager_connection_pool_exists() {
3781        let manager = PoolManager::new();
3782        // Connection pool should be initialized
3783        let available = manager.connection_pool.semaphore.available_permits();
3784        assert!(available > 0);
3785    }
3786
3787    #[test]
3788    fn test_is_dead_server_error_with_whitespace() {
3789        // Patterns with extra whitespace
3790        let err = PluginError::SocketError("  connection refused  ".to_string());
3791        assert!(PoolManager::is_dead_server_error(&err));
3792
3793        let err = PluginError::SocketError("error: broken pipe occurred".to_string());
3794        assert!(PoolManager::is_dead_server_error(&err));
3795    }
3796
3797    #[test]
3798    fn test_is_dead_server_error_multiline() {
3799        // Multiline error messages
3800        let err = PluginError::SocketError(
3801            "Error occurred\nConnection refused\nPlease retry".to_string(),
3802        );
3803        assert!(PoolManager::is_dead_server_error(&err));
3804    }
3805
3806    #[test]
3807    fn test_is_dead_server_error_json_in_message() {
3808        // Error with JSON content
3809        let err = PluginError::PluginExecutionError(
3810            r#"{"error": "connection refused", "code": 61}"#.to_string(),
3811        );
3812        assert!(PoolManager::is_dead_server_error(&err));
3813    }
3814
3815    #[test]
3816    fn test_format_return_value_special_json() {
3817        // Test with special JSON values
3818        let value = Some(serde_json::json!(f64::MAX));
3819        let result = PoolManager::format_return_value(value);
3820        assert!(!result.is_empty());
3821
3822        let value = Some(serde_json::json!(i64::MIN));
3823        let result = PoolManager::format_return_value(value);
3824        assert!(result.contains("-"));
3825    }
3826
3827    #[test]
3828    fn test_format_return_value_with_escaped_chars() {
3829        let value = Some(serde_json::json!("line1\nline2\ttab\"quote"));
3830        let result = PoolManager::format_return_value(value);
3831        assert!(result.contains("line1"));
3832        assert!(result.contains("line2"));
3833    }
3834
3835    #[test]
3836    fn test_format_return_value_array_of_objects() {
3837        let value = Some(serde_json::json!([
3838            {"id": 1, "name": "first"},
3839            {"id": 2, "name": "second"}
3840        ]));
3841        let result = PoolManager::format_return_value(value);
3842        assert!(result.contains("first"));
3843        assert!(result.contains("second"));
3844    }
3845
3846    #[test]
3847    fn test_all_log_levels_conversion() {
3848        use super::super::protocol::PoolLogEntry;
3849
3850        let levels = [
3851            ("log", LogLevel::Log),
3852            ("debug", LogLevel::Debug),
3853            ("info", LogLevel::Info),
3854            ("warn", LogLevel::Warn),
3855            ("error", LogLevel::Error),
3856            ("result", LogLevel::Result),
3857            ("unknown_level", LogLevel::Log), // Unknown defaults to Log
3858            ("LOG", LogLevel::Log),           // Case matters - uppercase goes to default
3859            ("", LogLevel::Log),              // Empty string goes to default
3860        ];
3861
3862        for (input, expected) in levels {
3863            let entry = PoolLogEntry {
3864                level: input.to_string(),
3865                message: "test".to_string(),
3866            };
3867            let log_entry: LogEntry = entry.into();
3868            assert_eq!(
3869                log_entry.level, expected,
3870                "Level '{input}' should convert to {expected:?}"
3871            );
3872        }
3873    }
3874
3875    #[tokio::test]
3876    async fn test_pool_manager_health_check_flag_manipulation() {
3877        let manager = PoolManager::new();
3878
3879        manager.health_check_needed.store(true, Ordering::Relaxed);
3880        assert!(manager.health_check_needed.load(Ordering::Relaxed));
3881
3882        manager.health_check_needed.store(false, Ordering::Relaxed);
3883        assert!(!manager.health_check_needed.load(Ordering::Relaxed));
3884    }
3885
3886    #[tokio::test]
3887    async fn test_pool_manager_consecutive_failures_manipulation() {
3888        let manager = PoolManager::new();
3889
3890        manager.consecutive_failures.fetch_add(1, Ordering::Relaxed);
3891        assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 1);
3892
3893        manager.consecutive_failures.fetch_add(5, Ordering::Relaxed);
3894        assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 6);
3895
3896        manager.consecutive_failures.store(0, Ordering::Relaxed);
3897        assert_eq!(manager.consecutive_failures.load(Ordering::Relaxed), 0);
3898    }
3899
3900    #[test]
3901    fn test_parsed_health_result_with_all_none() {
3902        let result = ParsedHealthResult {
3903            status: "minimal".to_string(),
3904            uptime_ms: None,
3905            memory: None,
3906            pool_completed: None,
3907            pool_queued: None,
3908            success_rate: None,
3909        };
3910
3911        assert_eq!(result.status, "minimal");
3912        assert!(result.uptime_ms.is_none());
3913        assert!(result.memory.is_none());
3914    }
3915
3916    #[test]
3917    fn test_parsed_health_result_with_all_some() {
3918        let result = ParsedHealthResult {
3919            status: "complete".to_string(),
3920            uptime_ms: Some(u64::MAX),
3921            memory: Some(u64::MAX),
3922            pool_completed: Some(u64::MAX),
3923            pool_queued: Some(u64::MAX),
3924            success_rate: Some(1.0),
3925        };
3926
3927        assert_eq!(result.status, "complete");
3928        assert_eq!(result.uptime_ms, Some(u64::MAX));
3929        assert_eq!(result.success_rate, Some(1.0));
3930    }
3931
3932    #[test]
3933    fn test_calculate_heap_size_extensive_values() {
3934        // Test many different concurrency values
3935        let test_cases = [
3936            (0, 512),
3937            (1, 512),
3938            (5, 512),
3939            (9, 512),
3940            (10, 544),
3941            (11, 544),
3942            (19, 544),
3943            (20, 576),
3944            (50, 672),
3945            (100, 832),
3946            (150, 992),
3947            (200, 1152),
3948            (250, 1312),
3949            (300, 1472),
3950            (400, 1792),
3951            (500, 2112),
3952            (1000, 3712),
3953            (2000, 6912),
3954            (2400, 8192),  // At cap
3955            (3000, 8192),  // Capped
3956            (5000, 8192),  // Capped
3957            (10000, 8192), // Capped
3958        ];
3959
3960        for (concurrency, expected_heap) in test_cases {
3961            let heap = PoolManager::calculate_heap_size(concurrency);
3962            assert_eq!(
3963                heap, expected_heap,
3964                "Concurrency {concurrency} should give heap {expected_heap}"
3965            );
3966        }
3967    }
3968
3969    #[tokio::test]
3970    async fn test_pool_manager_drop_cleans_socket() {
3971        let socket_path = format!("/tmp/test-drop-{}.sock", uuid::Uuid::new_v4());
3972
3973        // Create a file at the socket path
3974        std::fs::write(&socket_path, "test").unwrap();
3975        assert!(std::path::Path::new(&socket_path).exists());
3976
3977        // Create manager with this socket path
3978        {
3979            let _manager = PoolManager::with_socket_path(socket_path.clone());
3980            // Manager exists here
3981        }
3982        // Manager dropped here - should clean up socket
3983
3984        // Socket should be removed
3985        assert!(!std::path::Path::new(&socket_path).exists());
3986    }
3987
3988    #[test]
3989    fn test_script_result_with_traces() {
3990        let result = ScriptResult {
3991            logs: vec![],
3992            error: String::new(),
3993            return_value: "with traces".to_string(),
3994            trace: vec![
3995                serde_json::json!({"action": "GET", "url": "/api/test"}),
3996                serde_json::json!({"action": "POST", "url": "/api/submit"}),
3997            ],
3998        };
3999
4000        assert_eq!(result.trace.len(), 2);
4001        assert!(result.trace[0].get("action").is_some());
4002    }
4003
4004    #[test]
4005    fn test_script_result_with_error() {
4006        let result = ScriptResult {
4007            logs: vec![LogEntry {
4008                level: LogLevel::Error,
4009                message: "Something went wrong".to_string(),
4010            }],
4011            error: "RuntimeError: undefined is not a function".to_string(),
4012            return_value: String::new(),
4013            trace: vec![],
4014        };
4015
4016        assert!(!result.error.is_empty());
4017        assert!(result.error.contains("RuntimeError"));
4018        assert_eq!(result.logs.len(), 1);
4019    }
4020
4021    #[test]
4022    fn test_plugin_handler_payload_with_traces() {
4023        let payload = PluginHandlerPayload {
4024            message: "Error with traces".to_string(),
4025            status: 500,
4026            code: None,
4027            details: None,
4028            logs: None,
4029            traces: Some(vec![
4030                serde_json::json!({"method": "GET", "path": "/health"}),
4031                serde_json::json!({"method": "POST", "path": "/execute"}),
4032            ]),
4033        };
4034
4035        assert!(payload.traces.is_some());
4036        assert_eq!(payload.traces.as_ref().unwrap().len(), 2);
4037    }
4038
4039    #[test]
4040    fn test_queued_request_all_optional_fields() {
4041        let (tx, _rx) = oneshot::channel();
4042
4043        let mut headers = HashMap::new();
4044        headers.insert(
4045            "X-Custom".to_string(),
4046            vec!["value1".to_string(), "value2".to_string()],
4047        );
4048
4049        let request = QueuedRequest {
4050            plugin_id: "full-request".to_string(),
4051            compiled_code: Some("compiled code here".to_string()),
4052            plugin_path: Some("/path/to/plugin.ts".to_string()),
4053            params: serde_json::json!({"key": "value", "number": 42}),
4054            headers: Some(headers),
4055            socket_path: "/tmp/full.sock".to_string(),
4056            http_request_id: Some("http-123".to_string()),
4057            timeout_secs: Some(60),
4058            route: Some("/api/v1/execute".to_string()),
4059            config: Some(serde_json::json!({"setting": true})),
4060            method: Some("PUT".to_string()),
4061            query: Some(serde_json::json!({"page": 1, "limit": 10})),
4062            response_tx: tx,
4063        };
4064
4065        assert_eq!(request.plugin_id, "full-request");
4066        assert!(request.compiled_code.is_some());
4067        assert!(request.plugin_path.is_some());
4068        assert!(request.headers.is_some());
4069        assert_eq!(request.timeout_secs, Some(60));
4070        assert_eq!(request.method, Some("PUT".to_string()));
4071    }
4072}