openzeppelin_relayer/services/plugins/health.rs

//! Health monitoring and circuit breaker for the plugin pool.
//!
//! This module provides:
//! - Circuit breaker pattern for automatic degradation under stress
//! - Health status reporting for monitoring
//! - Dead server detection for automatic recovery

use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;

/// Lock-free ring buffer for tracking recent results (sliding window)
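///
/// # Example
///
/// Illustrative sketch (not from the original docs) of the sliding-window
/// accounting; `failure_rate()` counts failures over all non-empty slots:
///
/// ```ignore
/// let window = ResultRingBuffer::new(100);
/// for _ in 0..20 { window.record(true); }
/// for _ in 0..20 { window.record(false); }
/// assert_eq!(window.failure_rate(), 0.5); // 20 failures over 40 samples
/// ```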
pub struct ResultRingBuffer {
    buffer: Vec<AtomicU8>, // 0 = empty, 1 = success, 2 = failure
    index: AtomicUsize,
    size: usize,
}

impl ResultRingBuffer {
    pub fn new(size: usize) -> Self {
        assert!(size > 0, "ResultRingBuffer size must be greater than 0");
        let mut buffer = Vec::with_capacity(size);
        for _ in 0..size {
            buffer.push(AtomicU8::new(0));
        }
        Self {
            buffer,
            index: AtomicUsize::new(0),
            size,
        }
    }

    pub fn record(&self, success: bool) {
        let idx = self.index.fetch_add(1, Ordering::Relaxed) % self.size;
        self.buffer[idx].store(if success { 1 } else { 2 }, Ordering::Relaxed);
    }

    pub fn failure_rate(&self) -> f32 {
        let mut total = 0;
        let mut failures = 0;

        for slot in &self.buffer {
            match slot.load(Ordering::Relaxed) {
                0 => {}          // Empty slot
                1 => total += 1, // Success
                2 => {
                    total += 1;
                    failures += 1;
                }
                _ => {}
            }
        }

        if total < 10 {
            return 0.0; // Not enough data to make decision
        }

        (failures as f32) / (total as f32)
    }
}

/// Circuit breaker state for automatic degradation under stress
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Normal operation - all requests allowed
    Closed,
    /// Degraded - some requests rejected to reduce load
    HalfOpen,
    /// Fully open - most requests rejected, recovery in progress
    Open,
}

/// Indicators that the pool server is dead or unreachable.
/// Using an enum instead of string matching for type safety and documentation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeadServerIndicator {
    /// JSON parse error - connection closed mid-message
    EofWhileParsing,
    /// Write to closed socket
    BrokenPipe,
    /// Server not listening
    ConnectionRefused,
    /// Server forcefully closed connection
    ConnectionReset,
    /// Socket not connected
    NotConnected,
    /// Connection establishment failed
    FailedToConnect,
    /// Unix socket file deleted
    SocketFileMissing,
    /// Socket file doesn't exist
    NoSuchFile,
    /// Connection timeout (not execution timeout)
    ConnectionTimedOut,
}

impl DeadServerIndicator {
    /// All patterns that indicate a dead server
    const ALL: &'static [(&'static str, DeadServerIndicator)] = &[
        ("eof while parsing", DeadServerIndicator::EofWhileParsing),
        ("broken pipe", DeadServerIndicator::BrokenPipe),
        ("connection refused", DeadServerIndicator::ConnectionRefused),
        ("connection reset", DeadServerIndicator::ConnectionReset),
        ("not connected", DeadServerIndicator::NotConnected),
        ("failed to connect", DeadServerIndicator::FailedToConnect),
        (
            "socket file missing",
            DeadServerIndicator::SocketFileMissing,
        ),
        ("no such file", DeadServerIndicator::NoSuchFile),
        (
            "connection timed out",
            DeadServerIndicator::ConnectionTimedOut,
        ),
        ("connect timed out", DeadServerIndicator::ConnectionTimedOut),
    ];

    /// Check if an error string matches any dead server indicator
    pub fn from_error_str(error_str: &str) -> Option<DeadServerIndicator> {
        let lower = error_str.to_lowercase();
        Self::ALL
            .iter()
            .find(|(pattern, _)| lower.contains(pattern))
            .map(|(_, indicator)| *indicator)
    }
}

/// Process status for health check decisions
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessStatus {
    /// Process is running normally
    Running,
    /// Process has exited
    Exited,
    /// Could not determine status
    Unknown,
    /// No process handle exists
    NoProcess,
}

/// Circuit breaker for managing pool health and automatic recovery.
/// Tracks failure rates and response times to detect GC pressure.
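///
/// # Example
///
/// Illustrative usage sketch (not from the original docs): gate each pool
/// request on the breaker and report the outcome back to it.
///
/// ```ignore
/// let breaker = CircuitBreaker::new();
/// if breaker.should_allow_request(None) {
///     // ... run the plugin call and measure its duration ...
///     breaker.record_success(42); // or breaker.record_failure() on error
/// } else {
///     // Circuit is open (or throttling in half-open): fail fast.
/// }
/// ```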
pub struct CircuitBreaker {
    /// Current circuit state (encoded as a u32 for atomic access)
    /// 0 = Closed, 1 = HalfOpen, 2 = Open
    state: AtomicU32,
    /// Time when circuit opened (for recovery timing)
    opened_at_ms: AtomicU64,
    /// Time of last activity (for idle auto-close)
    last_activity_ms: AtomicU64,
    /// Consecutive successful requests in half-open state
    recovery_successes: AtomicU32,
    /// Average response time in ms (exponential moving average)
    avg_response_time_ms: AtomicU32,
    /// Number of restart attempts
    restart_attempts: AtomicU32,
    /// Sliding window of recent results (100 most recent)
    recent_results: Arc<ResultRingBuffer>,
}

impl Default for CircuitBreaker {
    fn default() -> Self {
        Self::new()
    }
}

impl CircuitBreaker {
    /// Idle timeout for auto-closing circuit (2 minutes of no activity)
    const IDLE_AUTO_CLOSE_MS: u64 = 120_000;

    /// Initial backoff delay in milliseconds before attempting recovery.
    /// First recovery attempt waits this long after circuit opens.
    const INITIAL_BACKOFF_MS: u64 = 5000;

    /// Maximum number of backoff attempts before capping the delay.
    /// Exponential backoff doubles each attempt: 5s, 10s, 20s, 40s, then capped.
    const MAX_BACKOFF_ATTEMPTS: u32 = 4;

    /// Maximum backoff delay in milliseconds (hard cap).
    /// Prevents excessive wait times between recovery attempts.
    const MAX_BACKOFF_MS: u64 = 60_000;

    pub fn new() -> Self {
        Self {
            state: AtomicU32::new(0), // Closed
            opened_at_ms: AtomicU64::new(0),
            last_activity_ms: AtomicU64::new(0),
            recovery_successes: AtomicU32::new(0),
            avg_response_time_ms: AtomicU32::new(0),
            restart_attempts: AtomicU32::new(0),
            recent_results: Arc::new(ResultRingBuffer::new(100)),
        }
    }

    fn now_ms() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64
    }

    fn update_last_activity(&self) {
        self.last_activity_ms
            .store(Self::now_ms(), Ordering::Relaxed);
    }

    pub fn state(&self) -> CircuitState {
        match self.state.load(Ordering::Relaxed) {
            0 => CircuitState::Closed,
            1 => CircuitState::HalfOpen,
            _ => CircuitState::Open,
        }
    }

    pub fn set_state(&self, state: CircuitState) {
        let val = match state {
            CircuitState::Closed => 0,
            CircuitState::HalfOpen => 1,
            CircuitState::Open => 2,
        };
        self.state.store(val, Ordering::Relaxed);
    }

    /// Record a successful request with response time
    pub fn record_success(&self, response_time_ms: u32) {
        self.update_last_activity();
        self.recent_results.record(true);

        // Update exponential moving average (alpha = 0.1)
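        // Worked example (illustrative): current avg 100 ms, new sample 200 ms
        // -> (100 * 9 + 200) / 10 = 110 ms, using integer division.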
        let current = self.avg_response_time_ms.load(Ordering::Relaxed);
        let new_avg = if current == 0 {
            response_time_ms
        } else {
            (current * 9 + response_time_ms) / 10
        };
        self.avg_response_time_ms.store(new_avg, Ordering::Relaxed);

        // Handle state transitions on success
        match self.state() {
            CircuitState::HalfOpen => {
                let successes = self.recovery_successes.fetch_add(1, Ordering::Relaxed) + 1;
                // Require 10 consecutive successes to close circuit
                if successes >= 10 {
                    tracing::info!("Circuit breaker closing - recovery successful");
                    self.set_state(CircuitState::Closed);
                    self.recovery_successes.store(0, Ordering::Relaxed);
                    self.restart_attempts.store(0, Ordering::Relaxed);
                }
            }
            CircuitState::Open => {
                // Check if enough time has passed to try half-open
                self.maybe_transition_to_half_open();
            }
            CircuitState::Closed => {}
        }
    }

    /// Record a failed request
    pub fn record_failure(&self) {
        self.update_last_activity();
        self.recent_results.record(false);
        self.recovery_successes.store(0, Ordering::Relaxed);

        let failure_rate = self.recent_results.failure_rate();

        match self.state() {
            CircuitState::Closed => {
                // Open circuit if failure rate > 50% (ring buffer requires at least 10 samples)
                if failure_rate > 0.5 {
                    tracing::warn!(
                        failure_rate = %format!("{:.1}%", failure_rate * 100.0),
                        "Circuit breaker opening - high failure rate"
                    );
                    self.open_circuit();
                }
            }
            CircuitState::HalfOpen => {
                // Any failure in half-open sends back to open
                tracing::warn!("Circuit breaker reopening - failure during recovery");
                self.open_circuit();
            }
            CircuitState::Open => {
                // Already open, just check for transition
                self.maybe_transition_to_half_open();
            }
        }
    }

    fn open_circuit(&self) {
        self.set_state(CircuitState::Open);
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;
        self.opened_at_ms.store(now, Ordering::Relaxed);
        self.recovery_successes.store(0, Ordering::Relaxed);
    }

    fn maybe_transition_to_half_open(&self) {
        let opened_at = self.opened_at_ms.load(Ordering::Relaxed);
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as u64;

        // Exponential backoff: INITIAL_BACKOFF_MS, 2x, 4x, 8x, 16x, capped at MAX_BACKOFF_MS
        // This prevents rapid restart attempts that could worsen the situation
        let attempts = self.restart_attempts.load(Ordering::Relaxed);
        let backoff_ms = (Self::INITIAL_BACKOFF_MS
            * (1 << attempts.min(Self::MAX_BACKOFF_ATTEMPTS)))
        .min(Self::MAX_BACKOFF_MS);
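        // Worked sequence (illustrative): attempts 0..=4 give 5s, 10s, 20s, 40s,
        // 80s, and the 80s value is capped to MAX_BACKOFF_MS (60s).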

        if now - opened_at >= backoff_ms {
            tracing::info!(
                backoff_ms = backoff_ms,
                "Circuit breaker transitioning to half-open"
            );
            self.set_state(CircuitState::HalfOpen);
            self.restart_attempts.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Check if request should be allowed based on circuit state.
    /// If recovery_allowance is provided, use it in HalfOpen state instead of default 10%.
    /// Auto-closes circuit if idle for longer than IDLE_AUTO_CLOSE_MS.
    pub fn should_allow_request(&self, recovery_allowance: Option<u32>) -> bool {
        // Auto-close if idle for too long (no failures means system likely recovered)
        if !matches!(self.state(), CircuitState::Closed) {
            let last_activity = self.last_activity_ms.load(Ordering::Relaxed);
            let now = Self::now_ms();
            if last_activity > 0 && now - last_activity >= Self::IDLE_AUTO_CLOSE_MS {
                tracing::info!(
                    idle_ms = now - last_activity,
                    "Circuit breaker auto-closing after idle period"
                );
                self.force_close();
                return true;
            }
        }

        match self.state() {
            CircuitState::Closed => true,
            CircuitState::HalfOpen => {
                // Use recovery allowance if provided, otherwise default to 10%
                let allowance = recovery_allowance.unwrap_or(10);
                (rand::random::<u32>() % 100) < allowance
            }
            CircuitState::Open => {
                // Check if we should transition to half-open
                self.maybe_transition_to_half_open();
                // Recheck state after potential transition
                matches!(self.state(), CircuitState::HalfOpen)
            }
        }
    }

    /// Get current response time average for monitoring
    pub fn avg_response_time(&self) -> u32 {
        self.avg_response_time_ms.load(Ordering::Relaxed)
    }

    /// Force circuit to closed state (for manual recovery)
    pub fn force_close(&self) {
        self.set_state(CircuitState::Closed);
        self.recovery_successes.store(0, Ordering::Relaxed);
        self.restart_attempts.store(0, Ordering::Relaxed);
    }

    /// Access recovery_successes for testing
    #[cfg(test)]
    pub fn recovery_successes(&self) -> &AtomicU32 {
        &self.recovery_successes
    }
}

/// Health status information from the pool server
#[derive(Debug, Clone)]
pub struct HealthStatus {
    pub healthy: bool,
    pub status: String,
    pub uptime_ms: Option<u64>,
    pub memory: Option<u64>,
    pub pool_completed: Option<u64>,
    pub pool_queued: Option<u64>,
    pub success_rate: Option<f64>,
    /// Circuit breaker state (Closed/HalfOpen/Open)
    pub circuit_state: Option<String>,
    /// Average response time in ms
    pub avg_response_time_ms: Option<u32>,
    /// Whether recovery mode is active
    pub recovering: Option<bool>,
    /// Current recovery allowance percentage
    pub recovery_percent: Option<u32>,
    /// Shared socket available connection slots
    pub shared_socket_available_slots: Option<usize>,
    /// Shared socket active connection count
    pub shared_socket_active_connections: Option<usize>,
    /// Shared socket registered execution count
    pub shared_socket_registered_executions: Option<usize>,
    /// Connection pool available slots (for pool server connections)
    pub connection_pool_available_slots: Option<usize>,
    /// Connection pool active connections (for pool server connections)
    pub connection_pool_active_connections: Option<usize>,
}

#[cfg(test)]
mod tests {
    use super::*;

    // =========================================================================
    // DeadServerIndicator Tests
    // =========================================================================

    #[test]
    fn test_dead_server_indicator_eof_while_parsing() {
        let result = DeadServerIndicator::from_error_str("eof while parsing a value");
        assert_eq!(result, Some(DeadServerIndicator::EofWhileParsing));
    }

    #[test]
    fn test_dead_server_indicator_broken_pipe() {
        let result = DeadServerIndicator::from_error_str("write failed: Broken pipe");
        assert_eq!(result, Some(DeadServerIndicator::BrokenPipe));
    }

    #[test]
    fn test_dead_server_indicator_connection_refused() {
        let result = DeadServerIndicator::from_error_str("Connection refused (os error 61)");
        assert_eq!(result, Some(DeadServerIndicator::ConnectionRefused));
    }

    #[test]
    fn test_dead_server_indicator_connection_reset() {
        let result = DeadServerIndicator::from_error_str("Connection reset by peer");
        assert_eq!(result, Some(DeadServerIndicator::ConnectionReset));
    }

    #[test]
    fn test_dead_server_indicator_not_connected() {
        let result = DeadServerIndicator::from_error_str("Socket is not connected");
        assert_eq!(result, Some(DeadServerIndicator::NotConnected));
    }

    #[test]
    fn test_dead_server_indicator_failed_to_connect() {
        let result = DeadServerIndicator::from_error_str("Failed to connect to server");
        assert_eq!(result, Some(DeadServerIndicator::FailedToConnect));
    }

    #[test]
    fn test_dead_server_indicator_socket_file_missing() {
        let result = DeadServerIndicator::from_error_str("Socket file missing at /tmp/pool.sock");
        assert_eq!(result, Some(DeadServerIndicator::SocketFileMissing));
    }

    #[test]
    fn test_dead_server_indicator_no_such_file() {
        let result = DeadServerIndicator::from_error_str("No such file or directory");
        assert_eq!(result, Some(DeadServerIndicator::NoSuchFile));
    }

    #[test]
    fn test_dead_server_indicator_connection_timed_out() {
        let result = DeadServerIndicator::from_error_str("Connection timed out after 5s");
        assert_eq!(result, Some(DeadServerIndicator::ConnectionTimedOut));
    }

    #[test]
    fn test_dead_server_indicator_connect_timed_out() {
        let result = DeadServerIndicator::from_error_str("Connect timed out");
        assert_eq!(result, Some(DeadServerIndicator::ConnectionTimedOut));
    }

    #[test]
    fn test_dead_server_indicator_case_insensitive() {
        let result = DeadServerIndicator::from_error_str("BROKEN PIPE ERROR");
        assert_eq!(result, Some(DeadServerIndicator::BrokenPipe));

        let result = DeadServerIndicator::from_error_str("EOF While Parsing");
        assert_eq!(result, Some(DeadServerIndicator::EofWhileParsing));
    }

    #[test]
    fn test_dead_server_indicator_no_match() {
        let result = DeadServerIndicator::from_error_str("Plugin execution failed");
        assert_eq!(result, None);

        let result = DeadServerIndicator::from_error_str("Timeout exceeded");
        assert_eq!(result, None);

        let result = DeadServerIndicator::from_error_str("");
        assert_eq!(result, None);
    }

    // =========================================================================
    // ResultRingBuffer Tests
    // =========================================================================

    #[test]
    fn test_result_ring_buffer_empty() {
        let buffer = ResultRingBuffer::new(100);
        assert_eq!(buffer.failure_rate(), 0.0);
    }

    #[test]
    fn test_result_ring_buffer_all_successes() {
        let buffer = ResultRingBuffer::new(100);
        for _ in 0..50 {
            buffer.record(true);
        }
        assert_eq!(buffer.failure_rate(), 0.0);
    }

    #[test]
    fn test_result_ring_buffer_all_failures() {
        let buffer = ResultRingBuffer::new(100);
        for _ in 0..50 {
            buffer.record(false);
        }
        assert_eq!(buffer.failure_rate(), 1.0);
    }

    #[test]
    fn test_result_ring_buffer_mixed_results() {
        let buffer = ResultRingBuffer::new(100);
        for _ in 0..25 {
            buffer.record(true);
        }
        for _ in 0..25 {
            buffer.record(false);
        }
        assert!((buffer.failure_rate() - 0.5).abs() < 0.01);
    }

    #[test]
    fn test_result_ring_buffer_wraps_around() {
        let buffer = ResultRingBuffer::new(100);
        for _ in 0..100 {
            buffer.record(true);
        }
        for _ in 0..50 {
            buffer.record(false);
        }
        assert!((buffer.failure_rate() - 0.5).abs() < 0.01);
    }
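
    // Added sketch (not in the original suite): with fewer than 10 recorded
    // results, failure_rate() returns 0.0 because of the "not enough data" guard.
    #[test]
    fn test_result_ring_buffer_below_sample_threshold() {
        let buffer = ResultRingBuffer::new(100);
        for _ in 0..5 {
            buffer.record(false);
        }
        assert_eq!(buffer.failure_rate(), 0.0);
    }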

    // =========================================================================
    // CircuitBreaker Tests
    // =========================================================================

    #[test]
    fn test_circuit_breaker_initial_state() {
        let cb = CircuitBreaker::new();
        assert_eq!(cb.state(), CircuitState::Closed);
        assert!(cb.should_allow_request(None));
    }

    #[test]
    fn test_circuit_breaker_stays_closed_on_successes() {
        let cb = CircuitBreaker::new();
        for _ in 0..100 {
            cb.record_success(50);
        }
        assert_eq!(cb.state(), CircuitState::Closed);
        assert!(cb.should_allow_request(None));
    }

    #[test]
    fn test_circuit_breaker_opens_on_high_failure_rate() {
        let cb = CircuitBreaker::new();
        for _ in 0..100 {
            cb.record_failure();
        }
        assert_eq!(cb.state(), CircuitState::Open);
    }

    #[test]
    fn test_circuit_breaker_half_open_allows_some_requests() {
        let cb = CircuitBreaker::new();
        cb.set_state(CircuitState::HalfOpen);

        let mut allowed = 0;
        for _ in 0..100 {
            if cb.should_allow_request(Some(10)) {
                allowed += 1;
            }
        }
        assert!(allowed > 0, "Half-open should allow some requests");
        assert!(allowed < 50, "Half-open should not allow too many requests");
    }
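
    // Added sketch (not in the original suite): a zero recovery allowance in
    // half-open rejects every request, since (rand % 100) < 0 is never true.
    #[test]
    fn test_circuit_breaker_half_open_zero_allowance_blocks_all() {
        let cb = CircuitBreaker::new();
        cb.set_state(CircuitState::HalfOpen);
        for _ in 0..50 {
            assert!(!cb.should_allow_request(Some(0)));
        }
    }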

    #[test]
    fn test_circuit_breaker_force_close() {
        let cb = CircuitBreaker::new();
        cb.set_state(CircuitState::Open);
        assert_eq!(cb.state(), CircuitState::Open);

        cb.force_close();
        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_half_open_to_closed_on_successes() {
        let cb = CircuitBreaker::new();
        cb.set_state(CircuitState::HalfOpen);
        cb.recovery_successes().store(0, Ordering::Relaxed);

        for _ in 0..10 {
            cb.record_success(50);
        }

        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_half_open_to_open_on_failure() {
        let cb = CircuitBreaker::new();
        cb.set_state(CircuitState::HalfOpen);
        cb.recovery_successes().store(5, Ordering::Relaxed);

        cb.record_failure();

        assert_eq!(cb.state(), CircuitState::Open);
    }

    #[test]
    fn test_circuit_breaker_response_time_tracking() {
        let cb = CircuitBreaker::new();

        cb.record_success(100);
        cb.record_success(200);
        cb.record_success(150);

        let avg = cb.avg_response_time();
        assert!(avg > 0, "Average should be positive");
        assert!(avg < 300, "Average should be reasonable");
    }
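
    // Added sketch (not in the original suite): right after the circuit opens,
    // requests are rejected until the initial backoff window has elapsed.
    #[test]
    fn test_circuit_breaker_open_blocks_requests_before_backoff() {
        let cb = CircuitBreaker::new();
        cb.open_circuit();
        assert_eq!(cb.state(), CircuitState::Open);
        assert!(!cb.should_allow_request(None));
    }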

    // =========================================================================
    // CircuitState Tests
    // =========================================================================

    #[test]
    fn test_circuit_state_debug() {
        assert_eq!(format!("{:?}", CircuitState::Closed), "Closed");
        assert_eq!(format!("{:?}", CircuitState::HalfOpen), "HalfOpen");
        assert_eq!(format!("{:?}", CircuitState::Open), "Open");
    }

    #[test]
    fn test_circuit_state_equality() {
        assert_eq!(CircuitState::Closed, CircuitState::Closed);
        assert_ne!(CircuitState::Closed, CircuitState::Open);
        assert_ne!(CircuitState::HalfOpen, CircuitState::Open);
    }

    // =========================================================================
    // ProcessStatus Tests
    // =========================================================================

    #[test]
    fn test_process_status_variants() {
        assert_eq!(ProcessStatus::Running, ProcessStatus::Running);
        assert_eq!(ProcessStatus::Exited, ProcessStatus::Exited);
        assert_eq!(ProcessStatus::Unknown, ProcessStatus::Unknown);
        assert_ne!(ProcessStatus::Running, ProcessStatus::Exited);
    }
}