1use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, AtomicUsize, Ordering};
9use std::sync::Arc;
10
/// Fixed-size window over the most recently recorded request outcomes.
///
/// Every slot is a single atomic byte: `0` means the slot has never been
/// written, `1` marks a success and `2` marks a failure. Slots are claimed in
/// round-robin order, so once the window is full each new outcome replaces
/// the one written `size` records earlier.
pub struct ResultRingBuffer {
    /// Outcome slots; created with exactly `size` entries.
    buffer: Vec<AtomicU8>,
    /// Running count of recorded outcomes; reduced modulo `size` to pick a slot.
    index: AtomicUsize,
    /// Number of slots in the window.
    size: usize,
}

impl ResultRingBuffer {
    /// Slot marker: nothing has been recorded here yet.
    const SLOT_EMPTY: u8 = 0;
    /// Slot marker: the recorded outcome was a success.
    const SLOT_SUCCESS: u8 = 1;
    /// Slot marker: the recorded outcome was a failure.
    const SLOT_FAILURE: u8 = 2;
    /// Fewer recorded samples than this and `failure_rate` reports `0.0`.
    const MIN_SAMPLES: i32 = 10;

    /// Creates a window with `size` empty slots.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn new(size: usize) -> Self {
        assert!(size > 0, "ResultRingBuffer size must be greater than 0");
        Self {
            buffer: (0..size)
                .map(|_| AtomicU8::new(Self::SLOT_EMPTY))
                .collect(),
            index: AtomicUsize::new(0),
            size,
        }
    }

    /// Records one outcome in the next slot, overwriting whatever was there.
    pub fn record(&self, success: bool) {
        let marker = if success {
            Self::SLOT_SUCCESS
        } else {
            Self::SLOT_FAILURE
        };
        let slot = self.index.fetch_add(1, Ordering::Relaxed) % self.size;
        self.buffer[slot].store(marker, Ordering::Relaxed);
    }

    /// Returns the fraction of recorded outcomes that are failures.
    ///
    /// Empty slots are ignored. While fewer than 10 outcomes are present the
    /// result is `0.0`, so a handful of early failures cannot dominate.
    pub fn failure_rate(&self) -> f32 {
        let (samples, failed) = self
            .buffer
            .iter()
            .map(|slot| slot.load(Ordering::Relaxed))
            .fold((0i32, 0i32), |(samples, failed), outcome| match outcome {
                Self::SLOT_SUCCESS => (samples + 1, failed),
                Self::SLOT_FAILURE => (samples + 1, failed + 1),
                _ => (samples, failed),
            });

        if samples >= Self::MIN_SAMPLES {
            failed as f32 / samples as f32
        } else {
            0.0
        }
    }
}
60
/// State of a `CircuitBreaker`, as returned by `CircuitBreaker::state`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CircuitState {
    /// Requests are always admitted.
    Closed,
    /// Recovery probing: only a random percentage of requests is admitted.
    HalfOpen,
    /// Requests are refused until the backoff period has elapsed.
    Open,
}
71
/// Kinds of error text recognised by [`DeadServerIndicator::from_error_str`].
///
/// Going by the type name, these are the messages treated as signs that the
/// server is dead or unreachable. Each variant is selected by one or more
/// lowercase substrings.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeadServerIndicator {
    /// The message contains `"eof while parsing"`.
    EofWhileParsing,
    /// The message contains `"broken pipe"`.
    BrokenPipe,
    /// The message contains `"connection refused"`.
    ConnectionRefused,
    /// The message contains `"connection reset"`.
    ConnectionReset,
    /// The message contains `"not connected"`.
    NotConnected,
    /// The message contains `"failed to connect"`.
    FailedToConnect,
    /// The message contains `"socket file missing"`.
    SocketFileMissing,
    /// The message contains `"no such file"`.
    NoSuchFile,
    /// The message contains `"connection timed out"` or `"connect timed out"`.
    ConnectionTimedOut,
}

impl DeadServerIndicator {
    /// Lowercase substring -> indicator table.
    ///
    /// Rows are scanned top to bottom and the first hit wins, so row order
    /// decides the result when a message contains several patterns.
    const ALL: &'static [(&'static str, DeadServerIndicator)] = &[
        ("eof while parsing", Self::EofWhileParsing),
        ("broken pipe", Self::BrokenPipe),
        ("connection refused", Self::ConnectionRefused),
        ("connection reset", Self::ConnectionReset),
        ("not connected", Self::NotConnected),
        ("failed to connect", Self::FailedToConnect),
        ("socket file missing", Self::SocketFileMissing),
        ("no such file", Self::NoSuchFile),
        ("connection timed out", Self::ConnectionTimedOut),
        ("connect timed out", Self::ConnectionTimedOut),
    ];

    /// Classifies an error message by case-insensitive substring search.
    ///
    /// Returns the indicator of the first table row whose pattern occurs in
    /// `error_str`, or `None` when no pattern matches (including for an empty
    /// message).
    pub fn from_error_str(error_str: &str) -> Option<DeadServerIndicator> {
        // Patterns are stored lowercase, so normalise the message once up front.
        let haystack = error_str.to_lowercase();
        for &(needle, indicator) in Self::ALL {
            if haystack.contains(needle) {
                return Some(indicator);
            }
        }
        None
    }
}
126
/// Coarse status of a process.
///
/// NOTE(review): nothing in this part of the file produces or consumes these
/// values, so the per-variant notes below are read off the variant names only
/// and should be confirmed against the code that uses this enum.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProcessStatus {
    /// Presumably: the process is alive.
    Running,
    /// Presumably: the process has terminated.
    Exited,
    /// Presumably: the status could not be determined.
    Unknown,
    /// Presumably: there is no process to inspect.
    NoProcess,
}
139
140pub struct CircuitBreaker {
143 state: AtomicU32,
146 opened_at_ms: AtomicU64,
148 last_activity_ms: AtomicU64,
150 recovery_successes: AtomicU32,
152 avg_response_time_ms: AtomicU32,
154 restart_attempts: AtomicU32,
156 recent_results: Arc<ResultRingBuffer>,
158}
159
160impl Default for CircuitBreaker {
161 fn default() -> Self {
162 Self::new()
163 }
164}
165
166impl CircuitBreaker {
167 const IDLE_AUTO_CLOSE_MS: u64 = 120_000;
169
170 const INITIAL_BACKOFF_MS: u64 = 5000;
173
174 const MAX_BACKOFF_ATTEMPTS: u32 = 4;
177
178 const MAX_BACKOFF_MS: u64 = 60_000;
181
182 pub fn new() -> Self {
183 Self {
184 state: AtomicU32::new(0), opened_at_ms: AtomicU64::new(0),
186 last_activity_ms: AtomicU64::new(0),
187 recovery_successes: AtomicU32::new(0),
188 avg_response_time_ms: AtomicU32::new(0),
189 restart_attempts: AtomicU32::new(0),
190 recent_results: Arc::new(ResultRingBuffer::new(100)),
191 }
192 }
193
194 fn now_ms() -> u64 {
195 std::time::SystemTime::now()
196 .duration_since(std::time::UNIX_EPOCH)
197 .unwrap_or_default()
198 .as_millis() as u64
199 }
200
201 fn update_last_activity(&self) {
202 self.last_activity_ms
203 .store(Self::now_ms(), Ordering::Relaxed);
204 }
205
206 pub fn state(&self) -> CircuitState {
207 match self.state.load(Ordering::Relaxed) {
208 0 => CircuitState::Closed,
209 1 => CircuitState::HalfOpen,
210 _ => CircuitState::Open,
211 }
212 }
213
214 pub fn set_state(&self, state: CircuitState) {
215 let val = match state {
216 CircuitState::Closed => 0,
217 CircuitState::HalfOpen => 1,
218 CircuitState::Open => 2,
219 };
220 self.state.store(val, Ordering::Relaxed);
221 }
222
223 pub fn record_success(&self, response_time_ms: u32) {
225 self.update_last_activity();
226 self.recent_results.record(true);
227
228 let current = self.avg_response_time_ms.load(Ordering::Relaxed);
230 let new_avg = if current == 0 {
231 response_time_ms
232 } else {
233 (current * 9 + response_time_ms) / 10
234 };
235 self.avg_response_time_ms.store(new_avg, Ordering::Relaxed);
236
237 match self.state() {
239 CircuitState::HalfOpen => {
240 let successes = self.recovery_successes.fetch_add(1, Ordering::Relaxed) + 1;
241 if successes >= 10 {
243 tracing::info!("Circuit breaker closing - recovery successful");
244 self.set_state(CircuitState::Closed);
245 self.recovery_successes.store(0, Ordering::Relaxed);
246 self.restart_attempts.store(0, Ordering::Relaxed);
247 }
248 }
249 CircuitState::Open => {
250 self.maybe_transition_to_half_open();
252 }
253 CircuitState::Closed => {}
254 }
255 }
256
257 pub fn record_failure(&self) {
259 self.update_last_activity();
260 self.recent_results.record(false);
261 self.recovery_successes.store(0, Ordering::Relaxed);
262
263 let failure_rate = self.recent_results.failure_rate();
264
265 match self.state() {
266 CircuitState::Closed => {
267 if failure_rate > 0.5 {
269 tracing::warn!(
270 failure_rate = %format!("{:.1}%", failure_rate * 100.0),
271 "Circuit breaker opening - high failure rate"
272 );
273 self.open_circuit();
274 }
275 }
276 CircuitState::HalfOpen => {
277 tracing::warn!("Circuit breaker reopening - failure during recovery");
279 self.open_circuit();
280 }
281 CircuitState::Open => {
282 self.maybe_transition_to_half_open();
284 }
285 }
286 }
287
288 fn open_circuit(&self) {
289 self.set_state(CircuitState::Open);
290 let now = std::time::SystemTime::now()
291 .duration_since(std::time::UNIX_EPOCH)
292 .unwrap_or_default()
293 .as_millis() as u64;
294 self.opened_at_ms.store(now, Ordering::Relaxed);
295 self.recovery_successes.store(0, Ordering::Relaxed);
296 }
297
298 fn maybe_transition_to_half_open(&self) {
299 let opened_at = self.opened_at_ms.load(Ordering::Relaxed);
300 let now = std::time::SystemTime::now()
301 .duration_since(std::time::UNIX_EPOCH)
302 .unwrap_or_default()
303 .as_millis() as u64;
304
305 let attempts = self.restart_attempts.load(Ordering::Relaxed);
308 let backoff_ms = (Self::INITIAL_BACKOFF_MS
309 * (1 << attempts.min(Self::MAX_BACKOFF_ATTEMPTS)))
310 .min(Self::MAX_BACKOFF_MS);
311
312 if now - opened_at >= backoff_ms {
313 tracing::info!(
314 backoff_ms = backoff_ms,
315 "Circuit breaker transitioning to half-open"
316 );
317 self.set_state(CircuitState::HalfOpen);
318 self.restart_attempts.fetch_add(1, Ordering::Relaxed);
319 }
320 }
321
322 pub fn should_allow_request(&self, recovery_allowance: Option<u32>) -> bool {
326 if !matches!(self.state(), CircuitState::Closed) {
328 let last_activity = self.last_activity_ms.load(Ordering::Relaxed);
329 let now = Self::now_ms();
330 if last_activity > 0 && now - last_activity >= Self::IDLE_AUTO_CLOSE_MS {
331 tracing::info!(
332 idle_ms = now - last_activity,
333 "Circuit breaker auto-closing after idle period"
334 );
335 self.force_close();
336 return true;
337 }
338 }
339
340 match self.state() {
341 CircuitState::Closed => true,
342 CircuitState::HalfOpen => {
343 let allowance = recovery_allowance.unwrap_or(10);
345 (rand::random::<u32>() % 100) < allowance
346 }
347 CircuitState::Open => {
348 self.maybe_transition_to_half_open();
350 matches!(self.state(), CircuitState::HalfOpen)
352 }
353 }
354 }
355
356 pub fn avg_response_time(&self) -> u32 {
358 self.avg_response_time_ms.load(Ordering::Relaxed)
359 }
360
361 pub fn force_close(&self) {
363 self.set_state(CircuitState::Closed);
364 self.recovery_successes.store(0, Ordering::Relaxed);
365 self.restart_attempts.store(0, Ordering::Relaxed);
366 }
367
368 #[cfg(test)]
370 pub fn recovery_successes(&self) -> &AtomicU32 {
371 &self.recovery_successes
372 }
373}
374
/// Plain-data health report.
///
/// NOTE(review): nothing in this part of the file fills in or reads this
/// struct, so the field notes below are read off the field names and types
/// only. Confirm units and meaning against the code that produces it.
#[derive(Clone, Debug)]
pub struct HealthStatus {
    /// Overall healthy / unhealthy flag.
    pub healthy: bool,
    /// Free-form status text.
    pub status: String,
    /// Presumably uptime in milliseconds, when known.
    pub uptime_ms: Option<u64>,
    /// Presumably memory usage; unit not visible here - TODO confirm.
    pub memory: Option<u64>,
    /// Presumably the number of tasks the pool has completed.
    pub pool_completed: Option<u64>,
    /// Presumably the number of tasks waiting in the pool.
    pub pool_queued: Option<u64>,
    /// Presumably a success ratio; scale (0-1 or 0-100) not visible here.
    pub success_rate: Option<f64>,
    /// Presumably a textual form of the circuit breaker state.
    pub circuit_state: Option<String>,
    /// Presumably the breaker's average response time in milliseconds.
    pub avg_response_time_ms: Option<u32>,
    /// Presumably whether a recovery is in progress.
    pub recovering: Option<bool>,
    /// Presumably recovery progress or allowance as a percentage.
    pub recovery_percent: Option<u32>,
    /// Presumably free slots on the shared socket.
    pub shared_socket_available_slots: Option<usize>,
    /// Presumably connections currently active on the shared socket.
    pub shared_socket_active_connections: Option<usize>,
    /// Presumably executions currently registered with the shared socket.
    pub shared_socket_registered_executions: Option<usize>,
    /// Presumably free slots in the connection pool.
    pub connection_pool_available_slots: Option<usize>,
    /// Presumably connections currently active in the connection pool.
    pub connection_pool_active_connections: Option<usize>,
}
404
#[cfg(test)]
mod tests {
    use super::*;

    // ---------------------------------------------------------------------
    // DeadServerIndicator::from_error_str
    // ---------------------------------------------------------------------

    #[test]
    fn test_dead_server_indicator_eof_while_parsing() {
        let result = DeadServerIndicator::from_error_str("eof while parsing a value");
        assert_eq!(result, Some(DeadServerIndicator::EofWhileParsing));
    }

    #[test]
    fn test_dead_server_indicator_broken_pipe() {
        let result = DeadServerIndicator::from_error_str("write failed: Broken pipe");
        assert_eq!(result, Some(DeadServerIndicator::BrokenPipe));
    }

    #[test]
    fn test_dead_server_indicator_connection_refused() {
        let result = DeadServerIndicator::from_error_str("Connection refused (os error 61)");
        assert_eq!(result, Some(DeadServerIndicator::ConnectionRefused));
    }

    #[test]
    fn test_dead_server_indicator_connection_reset() {
        let result = DeadServerIndicator::from_error_str("Connection reset by peer");
        assert_eq!(result, Some(DeadServerIndicator::ConnectionReset));
    }

    #[test]
    fn test_dead_server_indicator_not_connected() {
        let result = DeadServerIndicator::from_error_str("Socket is not connected");
        assert_eq!(result, Some(DeadServerIndicator::NotConnected));
    }

    #[test]
    fn test_dead_server_indicator_failed_to_connect() {
        let result = DeadServerIndicator::from_error_str("Failed to connect to server");
        assert_eq!(result, Some(DeadServerIndicator::FailedToConnect));
    }

    #[test]
    fn test_dead_server_indicator_socket_file_missing() {
        let result = DeadServerIndicator::from_error_str("Socket file missing at /tmp/pool.sock");
        assert_eq!(result, Some(DeadServerIndicator::SocketFileMissing));
    }

    #[test]
    fn test_dead_server_indicator_no_such_file() {
        let result = DeadServerIndicator::from_error_str("No such file or directory");
        assert_eq!(result, Some(DeadServerIndicator::NoSuchFile));
    }

    #[test]
    fn test_dead_server_indicator_connection_timed_out() {
        let result = DeadServerIndicator::from_error_str("Connection timed out after 5s");
        assert_eq!(result, Some(DeadServerIndicator::ConnectionTimedOut));
    }

    #[test]
    fn test_dead_server_indicator_connect_timed_out() {
        let result = DeadServerIndicator::from_error_str("Connect timed out");
        assert_eq!(result, Some(DeadServerIndicator::ConnectionTimedOut));
    }

    #[test]
    fn test_dead_server_indicator_case_insensitive() {
        let result = DeadServerIndicator::from_error_str("BROKEN PIPE ERROR");
        assert_eq!(result, Some(DeadServerIndicator::BrokenPipe));

        let result = DeadServerIndicator::from_error_str("EOF While Parsing");
        assert_eq!(result, Some(DeadServerIndicator::EofWhileParsing));
    }

    #[test]
    fn test_dead_server_indicator_no_match() {
        let result = DeadServerIndicator::from_error_str("Plugin execution failed");
        assert_eq!(result, None);

        let result = DeadServerIndicator::from_error_str("Timeout exceeded");
        assert_eq!(result, None);

        let result = DeadServerIndicator::from_error_str("");
        assert_eq!(result, None);
    }

    // ---------------------------------------------------------------------
    // ResultRingBuffer
    // ---------------------------------------------------------------------

    #[test]
    fn test_result_ring_buffer_empty() {
        let buffer = ResultRingBuffer::new(100);
        assert_eq!(buffer.failure_rate(), 0.0);
    }

    #[test]
    fn test_result_ring_buffer_below_minimum_samples() {
        // Fewer than 10 recorded outcomes always report 0.0, even when every
        // one of them is a failure.
        let buffer = ResultRingBuffer::new(100);
        for _ in 0..9 {
            buffer.record(false);
        }
        assert_eq!(buffer.failure_rate(), 0.0);

        buffer.record(false);
        assert_eq!(buffer.failure_rate(), 1.0);
    }

    #[test]
    fn test_result_ring_buffer_all_successes() {
        let buffer = ResultRingBuffer::new(100);
        for _ in 0..50 {
            buffer.record(true);
        }
        assert_eq!(buffer.failure_rate(), 0.0);
    }

    #[test]
    fn test_result_ring_buffer_all_failures() {
        let buffer = ResultRingBuffer::new(100);
        for _ in 0..50 {
            buffer.record(false);
        }
        assert_eq!(buffer.failure_rate(), 1.0);
    }

    #[test]
    fn test_result_ring_buffer_mixed_results() {
        let buffer = ResultRingBuffer::new(100);
        for _ in 0..25 {
            buffer.record(true);
        }
        for _ in 0..25 {
            buffer.record(false);
        }
        assert!((buffer.failure_rate() - 0.5).abs() < 0.01);
    }

    #[test]
    fn test_result_ring_buffer_wraps_around() {
        let buffer = ResultRingBuffer::new(100);
        for _ in 0..100 {
            buffer.record(true);
        }
        // These 50 failures overwrite the 50 oldest successes.
        for _ in 0..50 {
            buffer.record(false);
        }
        assert!((buffer.failure_rate() - 0.5).abs() < 0.01);
    }

    // ---------------------------------------------------------------------
    // CircuitBreaker
    // ---------------------------------------------------------------------

    #[test]
    fn test_circuit_breaker_initial_state() {
        let cb = CircuitBreaker::new();
        assert_eq!(cb.state(), CircuitState::Closed);
        assert!(cb.should_allow_request(None));
    }

    #[test]
    fn test_circuit_breaker_stays_closed_on_successes() {
        let cb = CircuitBreaker::new();
        for _ in 0..100 {
            cb.record_success(50);
        }
        assert_eq!(cb.state(), CircuitState::Closed);
        assert!(cb.should_allow_request(None));
    }

    #[test]
    fn test_circuit_breaker_opens_on_high_failure_rate() {
        let cb = CircuitBreaker::new();
        for _ in 0..100 {
            cb.record_failure();
        }
        assert_eq!(cb.state(), CircuitState::Open);
    }

    #[test]
    fn test_circuit_breaker_half_open_allows_some_requests() {
        let cb = CircuitBreaker::new();
        cb.set_state(CircuitState::HalfOpen);

        // Admission is random (10% here). 1000 trials make an all-rejected
        // run astronomically unlikely (0.9^1000), so the lower bound does not
        // fail spuriously.
        let trials = 1000;
        let mut allowed = 0;
        for _ in 0..trials {
            if cb.should_allow_request(Some(10)) {
                allowed += 1;
            }
        }
        assert!(allowed > 0, "Half-open should allow some requests");
        assert!(
            allowed < trials / 2,
            "Half-open should not allow too many requests"
        );
    }

    #[test]
    fn test_circuit_breaker_force_close() {
        let cb = CircuitBreaker::new();
        cb.set_state(CircuitState::Open);
        assert_eq!(cb.state(), CircuitState::Open);

        cb.force_close();
        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_half_open_to_closed_on_successes() {
        let cb = CircuitBreaker::new();
        cb.set_state(CircuitState::HalfOpen);
        cb.recovery_successes().store(0, Ordering::Relaxed);

        for _ in 0..10 {
            cb.record_success(50);
        }

        assert_eq!(cb.state(), CircuitState::Closed);
    }

    #[test]
    fn test_circuit_breaker_half_open_to_open_on_failure() {
        let cb = CircuitBreaker::new();
        cb.set_state(CircuitState::HalfOpen);
        cb.recovery_successes().store(5, Ordering::Relaxed);

        cb.record_failure();

        assert_eq!(cb.state(), CircuitState::Open);
    }

    #[test]
    fn test_circuit_breaker_response_time_tracking() {
        let cb = CircuitBreaker::new();

        cb.record_success(100);
        cb.record_success(200);
        cb.record_success(150);

        let avg = cb.avg_response_time();
        assert!(avg > 0, "Average should be positive");
        assert!(avg < 300, "Average should be reasonable");
    }

    // ---------------------------------------------------------------------
    // CircuitState
    // ---------------------------------------------------------------------

    #[test]
    fn test_circuit_state_debug() {
        assert_eq!(format!("{:?}", CircuitState::Closed), "Closed");
        assert_eq!(format!("{:?}", CircuitState::HalfOpen), "HalfOpen");
        assert_eq!(format!("{:?}", CircuitState::Open), "Open");
    }

    #[test]
    fn test_circuit_state_equality() {
        assert_eq!(CircuitState::Closed, CircuitState::Closed);
        assert_ne!(CircuitState::Closed, CircuitState::Open);
        assert_ne!(CircuitState::HalfOpen, CircuitState::Open);
    }

    // ---------------------------------------------------------------------
    // ProcessStatus
    // ---------------------------------------------------------------------

    #[test]
    fn test_process_status_variants() {
        assert_eq!(ProcessStatus::Running, ProcessStatus::Running);
        assert_eq!(ProcessStatus::Exited, ProcessStatus::Exited);
        assert_eq!(ProcessStatus::Unknown, ProcessStatus::Unknown);
        assert_eq!(ProcessStatus::NoProcess, ProcessStatus::NoProcess);
        assert_ne!(ProcessStatus::Running, ProcessStatus::Exited);
        assert_ne!(ProcessStatus::Unknown, ProcessStatus::NoProcess);
    }
}