openzeppelin_relayer/services/health/
mod.rs

1//! Health check service.
2//!
3//! This module contains the business logic for performing health checks,
4//! including system resource checks, Redis connectivity, queue health, and plugin status.
5
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
9use deadpool_redis::Pool;
10use tokio::sync::RwLock;
11
12use crate::models::health::{
13    ComponentStatus, Components, PluginHealth, PoolStatus, QueueHealth, ReadinessResponse,
14    RedisHealth, RedisHealthStatus, SystemHealth,
15};
16use crate::models::{
17    NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState,
18    TransactionRepoModel,
19};
20use crate::repositories::{
21    ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository, Repository,
22    TransactionCounterTrait, TransactionRepository,
23};
24use crate::services::plugins::get_pool_manager;
25use crate::utils::RedisConnections;
26use crate::{jobs::JobProducerTrait, queues::QueueBackend};
27
28// ============================================================================
29// Constants
30// ============================================================================
31
/// Upper bound on how long a single Redis PING may take during a health check.
const PING_TIMEOUT: Duration = Duration::from_secs(3);

/// File descriptor usage ratio (70%) above which the system is reported as Degraded.
const WARNING_FD_RATIO: f64 = 0.7;

/// File descriptor usage ratio (80%) above which the system is reported as Unhealthy.
const MAX_FD_RATIO: f64 = 0.8;

/// CLOSE_WAIT socket count above which the system is reported as Degraded.
/// Increased from 50 to tolerate Docker/Redis networking artifacts under load.
const WARNING_CLOSE_WAIT: usize = 200;

/// CLOSE_WAIT socket count above which the system is reported as Unhealthy.
/// Increased from 100 to tolerate Docker/Redis networking artifacts under load.
const MAX_CLOSE_WAIT: usize = 500;

/// How long a computed readiness response keeps being served from the cache.
const HEALTH_CACHE_TTL: Duration = Duration::from_secs(10);
51
52// ============================================================================
53// Cache
54// ============================================================================
55
56/// Cached health check result with timestamp.
57struct CachedHealth {
58    response: ReadinessResponse,
59    checked_at: Instant,
60}
61
62/// Global health cache (thread-safe).
63static HEALTH_CACHE: std::sync::OnceLock<RwLock<Option<CachedHealth>>> = std::sync::OnceLock::new();
64
65fn get_cache() -> &'static RwLock<Option<CachedHealth>> {
66    HEALTH_CACHE.get_or_init(|| RwLock::new(None))
67}
68
69/// Check if cached response is still valid and return it.
70async fn get_cached_response() -> Option<ReadinessResponse> {
71    let cache = get_cache().read().await;
72    if let Some(ref cached) = *cache {
73        if cached.checked_at.elapsed() < HEALTH_CACHE_TTL {
74            return Some(cached.response.clone());
75        }
76    }
77    None
78}
79
80/// Store response in cache.
81async fn cache_response(response: &ReadinessResponse) {
82    let mut cache = get_cache().write().await;
83    *cache = Some(CachedHealth {
84        response: response.clone(),
85        checked_at: Instant::now(),
86    });
87}
88
/// Empty the health cache so the next readiness call recomputes everything
/// (test-only helper).
#[cfg(test)]
pub async fn clear_cache() {
    *get_cache().write().await = None;
}
95
96// ============================================================================
97// Redis Health Checks
98// ============================================================================
99
100/// Ping a single Redis pool and return its status.
101async fn ping_pool(pool: &Arc<Pool>, name: &str) -> PoolStatus {
102    let status = pool.status();
103
104    let result = tokio::time::timeout(PING_TIMEOUT, async {
105        let mut conn = pool.get().await?;
106        redis::cmd("PING")
107            .query_async::<String>(&mut conn)
108            .await
109            .map_err(deadpool_redis::PoolError::Backend)
110    })
111    .await;
112
113    match result {
114        Ok(Ok(_)) => PoolStatus {
115            connected: true,
116            available: status.available,
117            max_size: status.max_size,
118            error: None,
119        },
120        Ok(Err(e)) => {
121            tracing::warn!(pool = %name, error = %e, "Redis pool PING failed");
122            PoolStatus {
123                connected: false,
124                available: status.available,
125                max_size: status.max_size,
126                error: Some(e.to_string()),
127            }
128        }
129        Err(_) => {
130            tracing::warn!(pool = %name, "Redis pool PING timed out");
131            PoolStatus {
132                connected: false,
133                available: status.available,
134                max_size: status.max_size,
135                error: Some("PING timed out".to_string()),
136            }
137        }
138    }
139}
140
141/// Check health of Redis connections (primary and reader pools).
142///
143/// PINGs both pools concurrently with a 500ms timeout.
144/// Primary pool failure = Unhealthy, reader pool failure = Degraded.
145async fn check_redis_health(connections: &Arc<RedisConnections>) -> RedisHealthStatus {
146    let (primary_status, reader_status) = tokio::join!(
147        ping_pool(connections.primary(), "primary"),
148        ping_pool(connections.reader(), "reader")
149    );
150
151    // Healthy if primary is connected (reader is optional/degraded mode)
152    let healthy = primary_status.connected;
153
154    let error = if !primary_status.connected {
155        Some(format!(
156            "Redis primary pool: {}",
157            primary_status
158                .error
159                .as_deref()
160                .unwrap_or("connection failed")
161        ))
162    } else {
163        None
164    };
165
166    RedisHealthStatus {
167        healthy,
168        primary_pool: primary_status,
169        reader_pool: reader_status,
170        error,
171    }
172}
173
174/// Convert RedisHealthStatus to RedisHealth with proper ComponentStatus.
175fn redis_status_to_health(status: RedisHealthStatus) -> RedisHealth {
176    let component_status = if status.healthy {
177        if status.reader_pool.connected {
178            ComponentStatus::Healthy
179        } else {
180            ComponentStatus::Degraded // Reader down but primary OK
181        }
182    } else {
183        ComponentStatus::Unhealthy
184    };
185
186    RedisHealth {
187        status: component_status,
188        primary_pool: status.primary_pool,
189        reader_pool: status.reader_pool,
190        error: status.error,
191    }
192}
193
194/// Create a neutral Redis health snapshot when Redis storage is not used.
195///
196/// In this mode Redis should not degrade readiness because it is not a required
197/// dependency of the active repository backend.
198fn create_not_applicable_redis_health() -> RedisHealth {
199    let neutral_pool = PoolStatus {
200        connected: true,
201        available: 0,
202        max_size: 0,
203        error: None,
204    };
205
206    RedisHealth {
207        status: ComponentStatus::Healthy,
208        primary_pool: neutral_pool.clone(),
209        reader_pool: neutral_pool,
210        error: None,
211    }
212}
213
214/// Create unhealthy Redis and Queue health when queue is unavailable.
215fn create_unavailable_health() -> (RedisHealth, QueueHealth) {
216    let error_msg = "Queue unavailable - cannot check Redis or Queue health";
217    let unhealthy_pool = PoolStatus {
218        connected: false,
219        available: 0,
220        max_size: 0,
221        error: Some(error_msg.to_string()),
222    };
223
224    let redis = RedisHealth {
225        status: ComponentStatus::Unhealthy,
226        primary_pool: unhealthy_pool.clone(),
227        reader_pool: unhealthy_pool,
228        error: Some(error_msg.to_string()),
229    };
230
231    let queue = QueueHealth {
232        status: ComponentStatus::Unhealthy,
233        error: Some("Failed to get queue from job producer".to_string()),
234    };
235
236    (redis, queue)
237}
238
239/// Check health of the active queue backend.
240async fn check_queue_backend_health(
241    queue_backend: Option<Arc<crate::queues::QueueBackendStorage>>,
242) -> QueueHealth {
243    match queue_backend {
244        Some(backend) => match backend.health_check().await {
245            Ok(backend_healths) => {
246                let all_healthy = backend_healths.iter().all(|h| h.is_healthy);
247                let error = if all_healthy {
248                    None
249                } else {
250                    let unhealthy_queues: Vec<_> = backend_healths
251                        .iter()
252                        .filter(|h| !h.is_healthy)
253                        .map(|h| h.queue_type.to_string())
254                        .collect();
255                    Some(format!("Unhealthy queues: {}", unhealthy_queues.join(", ")))
256                };
257                QueueHealth {
258                    status: if all_healthy {
259                        ComponentStatus::Healthy
260                    } else {
261                        ComponentStatus::Unhealthy
262                    },
263                    error,
264                }
265            }
266            Err(_) => {
267                let (_, unavailable_queue) = create_unavailable_health();
268                unavailable_queue
269            }
270        },
271        None => {
272            let (_, unavailable_queue) = create_unavailable_health();
273            unavailable_queue
274        }
275    }
276}
277
278// ============================================================================
279// System Health Checks
280// ============================================================================
281
/// Count the file descriptors currently open in this process.
///
/// - Linux: number of entries in `/proc/<pid>/fd`.
/// - macOS: number of lines printed by `lsof -p <pid>`, minus the header line.
/// - Other platforms: always `Ok(0)`.
///
/// # Errors
/// Returns the I/O error if the directory cannot be read (Linux) or `lsof`
/// cannot be spawned (macOS).
fn get_fd_count() -> Result<usize, std::io::Error> {
    #[cfg(target_os = "linux")]
    {
        let fd_dir = format!("/proc/{}/fd", std::process::id());
        let entries = std::fs::read_dir(fd_dir)?;
        Ok(entries.count())
    }

    #[cfg(target_os = "macos")]
    {
        let pid = std::process::id().to_string();
        let output = std::process::Command::new("lsof")
            .args(["-p", pid.as_str()])
            .output()?;
        let listing = String::from_utf8_lossy(&output.stdout);
        // The first line of the listing is the column header, not a descriptor.
        Ok(listing.lines().count().saturating_sub(1))
    }

    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        Ok(0) // Unsupported platform
    }
}
310
/// Get soft file descriptor limit for current process.
///
/// Resolution order:
/// 1. Linux: parse the "Max open files" row of `/proc/self/limits`. This needs
///    no subprocess, so it keeps working when the process is close to its
///    descriptor limit (spawning a shell requires several descriptors itself).
/// 2. Linux/macOS fallback: run `sh -c "ulimit -n"` and parse its output.
/// 3. Other platforms: a fixed default of 1024.
///
/// A limit of `unlimited` is reported as `usize::MAX` rather than falling back
/// to 1024, which could otherwise make a perfectly healthy process look like
/// it exceeded its limit. Unparseable shell output still falls back to 1024.
///
/// # Errors
/// Returns the I/O error if the fallback shell cannot be spawned.
fn get_fd_limit() -> Result<usize, std::io::Error> {
    #[cfg(target_os = "linux")]
    {
        if let Ok(limits) = std::fs::read_to_string("/proc/self/limits") {
            if let Some(limit) = parse_soft_nofile_limit(&limits) {
                return Ok(limit);
            }
        }
    }

    #[cfg(any(target_os = "linux", target_os = "macos"))]
    {
        use std::process::Command;
        let output = Command::new("sh").args(["-c", "ulimit -n"]).output()?;
        let limit = parse_fd_limit_value(String::from_utf8_lossy(&output.stdout).trim())
            .unwrap_or(1024);
        Ok(limit)
    }

    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        Ok(1024) // Default fallback
    }
}

/// Parse a single limit value: a decimal number, or `unlimited` (mapped to `usize::MAX`).
fn parse_fd_limit_value(raw: &str) -> Option<usize> {
    if raw.eq_ignore_ascii_case("unlimited") {
        Some(usize::MAX)
    } else {
        raw.parse().ok()
    }
}

/// Extract the soft "Max open files" limit from the text of `/proc/<pid>/limits`.
///
/// The soft limit is the first column after the row label.
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
fn parse_soft_nofile_limit(limits: &str) -> Option<usize> {
    limits
        .lines()
        .find_map(|line| line.strip_prefix("Max open files"))
        .and_then(|rest| rest.split_whitespace().next())
        .and_then(parse_fd_limit_value)
}
329
/// Get CLOSE_WAIT socket count.
///
/// Resolution order:
/// 1. Linux: read the kernel TCP socket tables `/proc/net/tcp` and
///    `/proc/net/tcp6` and count rows whose state column is `08`
///    (CLOSE_WAIT). This avoids depending on an external `netstat` binary:
///    when `netstat` is not installed the shell pipeline still exits
///    successfully and silently reports 0.
/// 2. Linux (neither table readable) and macOS: run
///    `netstat -an | grep CLOSE_WAIT | wc -l` through `sh`; unparseable output
///    counts as 0.
/// 3. Other platforms: always `Ok(0)`.
///
/// # Errors
/// Returns the I/O error if the fallback shell cannot be spawned.
fn get_close_wait_count() -> Result<usize, std::io::Error> {
    #[cfg(target_os = "linux")]
    {
        let tables: Vec<String> = ["/proc/net/tcp", "/proc/net/tcp6"]
            .iter()
            .filter_map(|path| std::fs::read_to_string(path).ok())
            .collect();
        // Only trust the /proc result if at least one table could be read;
        // otherwise fall through to the netstat pipeline below.
        if !tables.is_empty() {
            return Ok(tables
                .iter()
                .map(|table| count_close_wait_entries(table))
                .sum());
        }
    }

    #[cfg(any(target_os = "linux", target_os = "macos"))]
    {
        use std::process::Command;
        let output = Command::new("sh")
            .args(["-c", "netstat -an | grep CLOSE_WAIT | wc -l"])
            .output()?;
        let count = String::from_utf8_lossy(&output.stdout)
            .trim()
            .parse()
            .unwrap_or(0);
        Ok(count)
    }

    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        Ok(0) // Unsupported platform
    }
}

/// Count CLOSE_WAIT rows in the text of a `/proc/net/tcp`-style table.
///
/// Each data row is whitespace-separated as `sl local_address rem_address st ...`;
/// the fourth column is the socket state in hex, where `08` is CLOSE_WAIT.
/// The header row is ignored naturally because its fourth column is `st`.
#[cfg_attr(not(target_os = "linux"), allow(dead_code))]
fn count_close_wait_entries(table: &str) -> usize {
    table
        .lines()
        .filter(|row| row.split_whitespace().nth(3) == Some("08"))
        .count()
}
350
351/// Evaluate system metrics and return appropriate status with optional error.
352///
353/// Returns (status, optional_error_message).
354fn evaluate_system_metrics(
355    fd_ratio: f64,
356    fd_count: usize,
357    fd_limit: usize,
358    close_wait_count: usize,
359) -> (ComponentStatus, Option<String>) {
360    let mut errors: Vec<String> = Vec::new();
361    let mut is_degraded = false;
362
363    // Check file descriptor usage
364    if fd_ratio > MAX_FD_RATIO {
365        let fd_percent = fd_ratio * 100.0;
366        let max_percent = MAX_FD_RATIO * 100.0;
367        errors.push(format!(
368            "File descriptor limit critical: {fd_count}/{fd_limit} ({fd_percent:.1}% > {max_percent:.1}%)"
369        ));
370    } else if fd_ratio > WARNING_FD_RATIO {
371        is_degraded = true;
372    }
373
374    // Check CLOSE_WAIT sockets
375    if close_wait_count > MAX_CLOSE_WAIT {
376        errors.push(format!(
377            "Too many CLOSE_WAIT sockets: {close_wait_count} > {MAX_CLOSE_WAIT}"
378        ));
379    } else if close_wait_count > WARNING_CLOSE_WAIT {
380        is_degraded = true;
381    }
382
383    let status = if !errors.is_empty() {
384        ComponentStatus::Unhealthy
385    } else if is_degraded {
386        ComponentStatus::Degraded
387    } else {
388        ComponentStatus::Healthy
389    };
390
391    let error = if errors.is_empty() {
392        None
393    } else {
394        Some(errors.join("; "))
395    };
396
397    (status, error)
398}
399
400/// Check system resources and return health status.
401///
402/// Monitors file descriptor usage and CLOSE_WAIT socket count.
403/// - Below 70% FD usage and <200 CLOSE_WAIT: Healthy
404/// - 70-80% FD usage or 200-500 CLOSE_WAIT: Degraded
405/// - Above 80% FD usage or >500 CLOSE_WAIT: Unhealthy
406pub fn check_system_health() -> SystemHealth {
407    let fd_count = get_fd_count().unwrap_or(0);
408    let fd_limit = get_fd_limit().unwrap_or(1024);
409    let close_wait_count = get_close_wait_count().unwrap_or(0);
410
411    let fd_ratio = if fd_limit > 0 {
412        fd_count as f64 / fd_limit as f64
413    } else {
414        0.0
415    };
416    let fd_usage_percent = (fd_ratio * 100.0) as u32;
417
418    let (status, error) = evaluate_system_metrics(fd_ratio, fd_count, fd_limit, close_wait_count);
419
420    SystemHealth {
421        status,
422        fd_count,
423        fd_limit,
424        fd_usage_percent,
425        close_wait_count,
426        error,
427    }
428}
429
430// ============================================================================
431// Plugin Health Checks
432// ============================================================================
433
434/// Determine plugin ComponentStatus from health check result.
435fn determine_plugin_status(healthy: bool, circuit_state: Option<&str>) -> ComponentStatus {
436    if healthy {
437        match circuit_state {
438            Some("closed") | None => ComponentStatus::Healthy,
439            Some("half_open") | Some("open") => ComponentStatus::Degraded,
440            _ => ComponentStatus::Healthy,
441        }
442    } else {
443        ComponentStatus::Degraded
444    }
445}
446
447/// Check plugin health using the global pool manager.
448pub async fn check_plugin_health() -> Option<PluginHealth> {
449    let pool_manager = get_pool_manager();
450
451    if !pool_manager.is_initialized().await {
452        return None;
453    }
454
455    match pool_manager.health_check().await {
456        Ok(plugin_status) => {
457            let status = determine_plugin_status(
458                plugin_status.healthy,
459                plugin_status.circuit_state.as_deref(),
460            );
461
462            Some(PluginHealth {
463                status,
464                enabled: true,
465                circuit_state: plugin_status.circuit_state,
466                error: if plugin_status.healthy {
467                    None
468                } else {
469                    Some(plugin_status.status)
470                },
471                uptime_ms: plugin_status.uptime_ms,
472                memory: plugin_status.memory,
473                pool_completed: plugin_status.pool_completed,
474                pool_queued: plugin_status.pool_queued,
475                success_rate: plugin_status.success_rate,
476                avg_response_time_ms: plugin_status.avg_response_time_ms,
477                recovering: plugin_status.recovering,
478                recovery_percent: plugin_status.recovery_percent,
479                shared_socket_available_slots: plugin_status.shared_socket_available_slots,
480                shared_socket_active_connections: plugin_status.shared_socket_active_connections,
481                shared_socket_registered_executions: plugin_status
482                    .shared_socket_registered_executions,
483                connection_pool_available_slots: plugin_status.connection_pool_available_slots,
484                connection_pool_active_connections: plugin_status
485                    .connection_pool_active_connections,
486            })
487        }
488        Err(e) => Some(PluginHealth {
489            status: ComponentStatus::Degraded,
490            enabled: true,
491            circuit_state: None,
492            error: Some(e.to_string()),
493            uptime_ms: None,
494            memory: None,
495            pool_completed: None,
496            pool_queued: None,
497            success_rate: None,
498            avg_response_time_ms: None,
499            recovering: None,
500            recovery_percent: None,
501            shared_socket_available_slots: None,
502            shared_socket_active_connections: None,
503            shared_socket_registered_executions: None,
504            connection_pool_available_slots: None,
505            connection_pool_active_connections: None,
506        }),
507    }
508}
509
510// ============================================================================
511// Health Aggregation
512// ============================================================================
513
514/// Aggregate component health statuses into overall status and reasons.
515///
516/// Priority: Unhealthy > Degraded > Healthy
517/// Only Unhealthy components contribute to the reason list.
518fn aggregate_health(
519    system: &SystemHealth,
520    redis: &RedisHealth,
521    queue: &QueueHealth,
522    plugins: &Option<PluginHealth>,
523) -> (ComponentStatus, Option<String>) {
524    let mut reasons: Vec<String> = Vec::new();
525    let mut overall_status = ComponentStatus::Healthy;
526
527    // System check - unhealthy = 503
528    if system.status == ComponentStatus::Unhealthy {
529        overall_status = ComponentStatus::Unhealthy;
530        if let Some(ref err) = system.error {
531            reasons.push(err.clone());
532        }
533    } else if system.status == ComponentStatus::Degraded
534        && overall_status == ComponentStatus::Healthy
535    {
536        overall_status = ComponentStatus::Degraded;
537    }
538
539    // Redis check - unhealthy = 503
540    if redis.status == ComponentStatus::Unhealthy {
541        overall_status = ComponentStatus::Unhealthy;
542        if let Some(ref err) = redis.error {
543            reasons.push(err.clone());
544        }
545    } else if redis.status == ComponentStatus::Degraded
546        && overall_status == ComponentStatus::Healthy
547    {
548        overall_status = ComponentStatus::Degraded;
549    }
550
551    // Queue check - unhealthy = 503
552    if queue.status == ComponentStatus::Unhealthy {
553        overall_status = ComponentStatus::Unhealthy;
554        if let Some(ref err) = queue.error {
555            reasons.push(err.clone());
556        }
557    } else if queue.status == ComponentStatus::Degraded
558        && overall_status == ComponentStatus::Healthy
559    {
560        overall_status = ComponentStatus::Degraded;
561    }
562
563    // Plugin check - degraded only (doesn't cause 503)
564    if let Some(ref plugin_health) = plugins {
565        if plugin_health.status == ComponentStatus::Degraded
566            && overall_status == ComponentStatus::Healthy
567        {
568            overall_status = ComponentStatus::Degraded;
569        }
570    }
571
572    let reason = if reasons.is_empty() {
573        None
574    } else {
575        Some(reasons.join("; "))
576    };
577
578    (overall_status, reason)
579}
580
581/// Build the final ReadinessResponse from components.
582fn build_response(
583    system: SystemHealth,
584    redis: RedisHealth,
585    queue: QueueHealth,
586    plugins: Option<PluginHealth>,
587) -> ReadinessResponse {
588    let (overall_status, reason) = aggregate_health(&system, &redis, &queue, &plugins);
589    let ready = overall_status != ComponentStatus::Unhealthy;
590
591    ReadinessResponse {
592        ready,
593        status: overall_status,
594        reason,
595        components: Components {
596            system,
597            redis,
598            queue,
599            plugins,
600        },
601        timestamp: chrono::Utc::now().to_rfc3339(),
602    }
603}
604
605// ============================================================================
606// Public API
607// ============================================================================
608
609/// Get readiness response with caching.
610///
611/// Checks the cache first (10-second TTL). On cache miss, performs health checks
612/// for all components: system resources, Redis pools, queue, and plugins.
613///
614/// Returns 200 OK if ready (Healthy or Degraded), 503 if Unhealthy.
615pub async fn get_readiness<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
616    data: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
617) -> ReadinessResponse
618where
619    J: JobProducerTrait + Send + Sync + 'static,
620    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
621    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
622    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
623    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
624    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
625    TCR: TransactionCounterTrait + Send + Sync + 'static,
626    PR: PluginRepositoryTrait + Send + Sync + 'static,
627    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
628{
629    // Check cache first to avoid unnecessary health checks
630    if let Some(cached) = get_cached_response().await {
631        return cached;
632    }
633
634    // Perform health checks
635    let system = check_system_health();
636    let plugins = check_plugin_health().await;
637
638    // Redis storage health is derived from repository storage, independent of queue backend.
639    let redis = if let Some((redis_connections, _key_prefix)) =
640        data.transaction_repository.connection_info()
641    {
642        let redis_status = check_redis_health(&redis_connections).await;
643        redis_status_to_health(redis_status)
644    } else {
645        create_not_applicable_redis_health()
646    };
647
648    // Queue health is derived from the active queue backend.
649    let queue_health = check_queue_backend_health(data.job_producer.get_queue_backend()).await;
650
651    // Build response
652    let response = build_response(system, redis, queue_health, plugins);
653
654    // Cache the response
655    cache_response(&response).await;
656
657    response
658}
659
660// ============================================================================
661// Tests
662// ============================================================================
663
664#[cfg(test)]
665mod tests {
666    use super::*;
667
668    // -------------------------------------------------------------------------
669    // Constants Tests
670    // -------------------------------------------------------------------------
671
672    #[test]
673    fn test_constants() {
674        assert_eq!(PING_TIMEOUT, Duration::from_millis(3000));
675        assert_eq!(WARNING_FD_RATIO, 0.7);
676        assert_eq!(MAX_FD_RATIO, 0.8);
677        assert_eq!(WARNING_CLOSE_WAIT, 200);
678        assert_eq!(MAX_CLOSE_WAIT, 500);
679        assert_eq!(HEALTH_CACHE_TTL, Duration::from_secs(10));
680    }
681
682    // -------------------------------------------------------------------------
683    // System Metrics Evaluation Tests
684    // -------------------------------------------------------------------------
685
686    #[test]
687    fn test_evaluate_system_metrics_healthy() {
688        // Low FD usage, low CLOSE_WAIT
689        let (status, error) = evaluate_system_metrics(0.5, 500, 1000, 20);
690        assert_eq!(status, ComponentStatus::Healthy);
691        assert!(error.is_none());
692    }
693
694    #[test]
695    fn test_evaluate_system_metrics_degraded_fd() {
696        // FD at 75% (between warning and max)
697        let (status, error) = evaluate_system_metrics(0.75, 750, 1000, 20);
698        assert_eq!(status, ComponentStatus::Degraded);
699        assert!(error.is_none());
700    }
701
702    #[test]
703    fn test_evaluate_system_metrics_degraded_close_wait() {
704        // CLOSE_WAIT at 300 (between warning 200 and max 500)
705        let (status, error) = evaluate_system_metrics(0.5, 500, 1000, 300);
706        assert_eq!(status, ComponentStatus::Degraded);
707        assert!(error.is_none());
708    }
709
710    #[test]
711    fn test_evaluate_system_metrics_unhealthy_fd() {
712        // FD at 85% (above max)
713        let (status, error) = evaluate_system_metrics(0.85, 850, 1000, 20);
714        assert_eq!(status, ComponentStatus::Unhealthy);
715        assert!(error.is_some());
716        assert!(error.unwrap().contains("File descriptor limit critical"));
717    }
718
719    #[test]
720    fn test_evaluate_system_metrics_unhealthy_close_wait() {
721        // CLOSE_WAIT at 600 (above max 500)
722        let (status, error) = evaluate_system_metrics(0.5, 500, 1000, 600);
723        assert_eq!(status, ComponentStatus::Unhealthy);
724        assert!(error.is_some());
725        assert!(error.unwrap().contains("CLOSE_WAIT"));
726    }
727
728    #[test]
729    fn test_evaluate_system_metrics_both_unhealthy() {
730        // Both FD and CLOSE_WAIT above max
731        let (status, error) = evaluate_system_metrics(0.9, 900, 1000, 600);
732        assert_eq!(status, ComponentStatus::Unhealthy);
733        assert!(error.is_some());
734        let err = error.unwrap();
735        assert!(err.contains("File descriptor"));
736        assert!(err.contains("CLOSE_WAIT"));
737    }
738
739    #[test]
740    fn test_evaluate_system_metrics_at_warning_threshold() {
741        // Exactly at warning threshold (70% FD, 200 CLOSE_WAIT)
742        let (status, _) = evaluate_system_metrics(0.7, 700, 1000, 200);
743        // At exactly warning threshold, should still be healthy (> comparison)
744        assert_eq!(status, ComponentStatus::Healthy);
745    }
746
747    #[test]
748    fn test_evaluate_system_metrics_just_above_warning() {
749        // Just above warning threshold
750        let (status, _) = evaluate_system_metrics(0.71, 710, 1000, 201);
751        assert_eq!(status, ComponentStatus::Degraded);
752    }
753
754    #[test]
755    fn test_evaluate_system_metrics_at_max_threshold() {
756        // Exactly at max threshold (80% FD, 500 CLOSE_WAIT)
757        let (status, _) = evaluate_system_metrics(0.8, 800, 1000, 500);
758        // At exactly max, should be degraded (> comparison for unhealthy)
759        assert_eq!(status, ComponentStatus::Degraded);
760    }
761
762    #[test]
763    fn test_evaluate_system_metrics_zero_limit() {
764        // Edge case: zero fd_limit should not cause division by zero
765        let (status, _) = evaluate_system_metrics(0.0, 0, 0, 0);
766        assert_eq!(status, ComponentStatus::Healthy);
767    }
768
769    // -------------------------------------------------------------------------
770    // Plugin Status Tests
771    // -------------------------------------------------------------------------
772
773    #[test]
774    fn test_determine_plugin_status_healthy_closed() {
775        assert_eq!(
776            determine_plugin_status(true, Some("closed")),
777            ComponentStatus::Healthy
778        );
779    }
780
781    #[test]
782    fn test_determine_plugin_status_healthy_none() {
783        assert_eq!(
784            determine_plugin_status(true, None),
785            ComponentStatus::Healthy
786        );
787    }
788
789    #[test]
790    fn test_determine_plugin_status_healthy_half_open() {
791        assert_eq!(
792            determine_plugin_status(true, Some("half_open")),
793            ComponentStatus::Degraded
794        );
795    }
796
797    #[test]
798    fn test_determine_plugin_status_healthy_open() {
799        assert_eq!(
800            determine_plugin_status(true, Some("open")),
801            ComponentStatus::Degraded
802        );
803    }
804
805    #[test]
806    fn test_determine_plugin_status_unhealthy() {
807        assert_eq!(
808            determine_plugin_status(false, Some("closed")),
809            ComponentStatus::Degraded
810        );
811    }
812
813    #[test]
814    fn test_determine_plugin_status_unknown_state() {
815        assert_eq!(
816            determine_plugin_status(true, Some("unknown")),
817            ComponentStatus::Healthy
818        );
819    }
820
821    // -------------------------------------------------------------------------
822    // Health Aggregation Tests
823    // -------------------------------------------------------------------------
824
825    fn create_healthy_system() -> SystemHealth {
826        SystemHealth {
827            status: ComponentStatus::Healthy,
828            fd_count: 100,
829            fd_limit: 1000,
830            fd_usage_percent: 10,
831            close_wait_count: 5,
832            error: None,
833        }
834    }
835
836    fn create_healthy_redis() -> RedisHealth {
837        RedisHealth {
838            status: ComponentStatus::Healthy,
839            primary_pool: PoolStatus {
840                connected: true,
841                available: 8,
842                max_size: 16,
843                error: None,
844            },
845            reader_pool: PoolStatus {
846                connected: true,
847                available: 8,
848                max_size: 16,
849                error: None,
850            },
851            error: None,
852        }
853    }
854
855    fn create_healthy_queue() -> QueueHealth {
856        QueueHealth {
857            status: ComponentStatus::Healthy,
858            error: None,
859        }
860    }
861
862    #[test]
863    fn test_aggregate_health_all_healthy() {
864        let system = create_healthy_system();
865        let redis = create_healthy_redis();
866        let queue = create_healthy_queue();
867
868        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
869        assert_eq!(status, ComponentStatus::Healthy);
870        assert!(reason.is_none());
871    }
872
873    #[test]
874    fn test_aggregate_health_system_degraded() {
875        let mut system = create_healthy_system();
876        system.status = ComponentStatus::Degraded;
877        let redis = create_healthy_redis();
878        let queue = create_healthy_queue();
879
880        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
881        assert_eq!(status, ComponentStatus::Degraded);
882        assert!(reason.is_none());
883    }
884
885    #[test]
886    fn test_aggregate_health_system_unhealthy() {
887        let mut system = create_healthy_system();
888        system.status = ComponentStatus::Unhealthy;
889        system.error = Some("FD limit exceeded".to_string());
890        let redis = create_healthy_redis();
891        let queue = create_healthy_queue();
892
893        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
894        assert_eq!(status, ComponentStatus::Unhealthy);
895        assert!(reason.is_some());
896        assert!(reason.unwrap().contains("FD limit"));
897    }
898
899    #[test]
900    fn test_aggregate_health_redis_unhealthy() {
901        let system = create_healthy_system();
902        let mut redis = create_healthy_redis();
903        redis.status = ComponentStatus::Unhealthy;
904        redis.error = Some("Primary pool down".to_string());
905        let queue = create_healthy_queue();
906
907        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
908        assert_eq!(status, ComponentStatus::Unhealthy);
909        assert!(reason.unwrap().contains("Primary pool"));
910    }
911
912    #[test]
913    fn test_aggregate_health_queue_unhealthy() {
914        let system = create_healthy_system();
915        let redis = create_healthy_redis();
916        let mut queue = create_healthy_queue();
917        queue.status = ComponentStatus::Unhealthy;
918        queue.error = Some("Queue timeout".to_string());
919
920        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
921        assert_eq!(status, ComponentStatus::Unhealthy);
922        assert!(reason.unwrap().contains("Queue timeout"));
923    }
924
925    #[test]
926    fn test_aggregate_health_multiple_unhealthy() {
927        let mut system = create_healthy_system();
928        system.status = ComponentStatus::Unhealthy;
929        system.error = Some("FD limit".to_string());
930
931        let mut redis = create_healthy_redis();
932        redis.status = ComponentStatus::Unhealthy;
933        redis.error = Some("Redis down".to_string());
934
935        let queue = create_healthy_queue();
936
937        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
938        assert_eq!(status, ComponentStatus::Unhealthy);
939        let reason_str = reason.unwrap();
940        assert!(reason_str.contains("FD limit"));
941        assert!(reason_str.contains("Redis down"));
942    }
943
944    #[test]
945    fn test_aggregate_health_plugin_degraded_only() {
946        let system = create_healthy_system();
947        let redis = create_healthy_redis();
948        let queue = create_healthy_queue();
949        let plugins = Some(PluginHealth {
950            status: ComponentStatus::Degraded,
951            enabled: true,
952            circuit_state: Some("open".to_string()),
953            error: Some("Circuit open".to_string()),
954            uptime_ms: None,
955            memory: None,
956            pool_completed: None,
957            pool_queued: None,
958            success_rate: None,
959            avg_response_time_ms: None,
960            recovering: None,
961            recovery_percent: None,
962            shared_socket_available_slots: None,
963            shared_socket_active_connections: None,
964            shared_socket_registered_executions: None,
965            connection_pool_available_slots: None,
966            connection_pool_active_connections: None,
967        });
968
969        let (status, reason) = aggregate_health(&system, &redis, &queue, &plugins);
970        // Plugin degraded doesn't cause 503, just degrades overall status
971        assert_eq!(status, ComponentStatus::Degraded);
972        assert!(reason.is_none());
973    }
974
975    #[test]
976    fn test_aggregate_health_unhealthy_overrides_degraded() {
977        let mut system = create_healthy_system();
978        system.status = ComponentStatus::Degraded;
979
980        let mut redis = create_healthy_redis();
981        redis.status = ComponentStatus::Unhealthy;
982        redis.error = Some("Redis down".to_string());
983
984        let queue = create_healthy_queue();
985
986        let (status, _) = aggregate_health(&system, &redis, &queue, &None);
987        assert_eq!(status, ComponentStatus::Unhealthy);
988    }
989
990    // -------------------------------------------------------------------------
991    // Build Response Tests
992    // -------------------------------------------------------------------------
993
994    #[test]
995    fn test_build_response_ready() {
996        let response = build_response(
997            create_healthy_system(),
998            create_healthy_redis(),
999            create_healthy_queue(),
1000            None,
1001        );
1002
1003        assert!(response.ready);
1004        assert_eq!(response.status, ComponentStatus::Healthy);
1005        assert!(response.reason.is_none());
1006        assert!(!response.timestamp.is_empty());
1007    }
1008
1009    #[test]
1010    fn test_build_response_not_ready() {
1011        let mut system = create_healthy_system();
1012        system.status = ComponentStatus::Unhealthy;
1013        system.error = Some("Critical error".to_string());
1014
1015        let response = build_response(system, create_healthy_redis(), create_healthy_queue(), None);
1016
1017        assert!(!response.ready);
1018        assert_eq!(response.status, ComponentStatus::Unhealthy);
1019        assert!(response.reason.is_some());
1020    }
1021
1022    #[test]
1023    fn test_build_response_degraded_is_ready() {
1024        let mut system = create_healthy_system();
1025        system.status = ComponentStatus::Degraded;
1026
1027        let response = build_response(system, create_healthy_redis(), create_healthy_queue(), None);
1028
1029        // Degraded is still ready (returns 200, not 503)
1030        assert!(response.ready);
1031        assert_eq!(response.status, ComponentStatus::Degraded);
1032    }
1033
1034    // -------------------------------------------------------------------------
1035    // Redis/Queue Helper Tests
1036    // -------------------------------------------------------------------------
1037
1038    #[test]
1039    fn test_redis_status_to_health_healthy() {
1040        let status = RedisHealthStatus {
1041            healthy: true,
1042            primary_pool: PoolStatus {
1043                connected: true,
1044                available: 8,
1045                max_size: 16,
1046                error: None,
1047            },
1048            reader_pool: PoolStatus {
1049                connected: true,
1050                available: 8,
1051                max_size: 16,
1052                error: None,
1053            },
1054            error: None,
1055        };
1056
1057        let health = redis_status_to_health(status);
1058        assert_eq!(health.status, ComponentStatus::Healthy);
1059    }
1060
1061    #[test]
1062    fn test_redis_status_to_health_degraded() {
1063        let status = RedisHealthStatus {
1064            healthy: true,
1065            primary_pool: PoolStatus {
1066                connected: true,
1067                available: 8,
1068                max_size: 16,
1069                error: None,
1070            },
1071            reader_pool: PoolStatus {
1072                connected: false, // Reader down
1073                available: 0,
1074                max_size: 16,
1075                error: Some("Connection refused".to_string()),
1076            },
1077            error: None,
1078        };
1079
1080        let health = redis_status_to_health(status);
1081        assert_eq!(health.status, ComponentStatus::Degraded);
1082    }
1083
1084    #[test]
1085    fn test_redis_status_to_health_unhealthy() {
1086        let status = RedisHealthStatus {
1087            healthy: false,
1088            primary_pool: PoolStatus {
1089                connected: false,
1090                available: 0,
1091                max_size: 16,
1092                error: Some("PING timeout".to_string()),
1093            },
1094            reader_pool: PoolStatus {
1095                connected: false,
1096                available: 0,
1097                max_size: 16,
1098                error: Some("PING timeout".to_string()),
1099            },
1100            error: Some("Primary pool failed".to_string()),
1101        };
1102
1103        let health = redis_status_to_health(status);
1104        assert_eq!(health.status, ComponentStatus::Unhealthy);
1105    }
1106
1107    #[test]
1108    fn test_create_unavailable_health() {
1109        let (redis, queue) = create_unavailable_health();
1110
1111        assert_eq!(redis.status, ComponentStatus::Unhealthy);
1112        assert!(!redis.primary_pool.connected);
1113        assert!(!redis.reader_pool.connected);
1114        assert!(redis.error.is_some());
1115
1116        assert_eq!(queue.status, ComponentStatus::Unhealthy);
1117        assert!(queue.error.is_some());
1118    }
1119
1120    // -------------------------------------------------------------------------
1121    // Serialization Tests
1122    // -------------------------------------------------------------------------
1123
1124    #[test]
1125    fn test_pool_status_serialization_without_error() {
1126        let status = PoolStatus {
1127            connected: true,
1128            available: 8,
1129            max_size: 16,
1130            error: None,
1131        };
1132
1133        let json = serde_json::to_string(&status).unwrap();
1134        assert!(json.contains("\"connected\":true"));
1135        assert!(json.contains("\"available\":8"));
1136        assert!(json.contains("\"max_size\":16"));
1137        // error should be omitted when None (skip_serializing_if)
1138        assert!(!json.contains("error"));
1139    }
1140
1141    #[test]
1142    fn test_pool_status_serialization_with_error() {
1143        let status = PoolStatus {
1144            connected: false,
1145            available: 0,
1146            max_size: 16,
1147            error: Some("Connection refused".to_string()),
1148        };
1149
1150        let json = serde_json::to_string(&status).unwrap();
1151        assert!(json.contains("\"connected\":false"));
1152        assert!(json.contains("\"error\":\"Connection refused\""));
1153    }
1154
1155    // -------------------------------------------------------------------------
1156    // System Health Integration Test
1157    // -------------------------------------------------------------------------
1158
1159    #[actix_web::test]
1160    async fn test_check_system_health_returns_valid_data() {
1161        let health = check_system_health();
1162
1163        // Should have positive fd_limit
1164        assert!(health.fd_limit > 0);
1165
1166        // fd_usage_percent should be reasonable (0-100 under normal conditions)
1167        assert!(health.fd_usage_percent <= 100);
1168
1169        // close_wait_count should be non-negative (always true for usize)
1170        // Status should be one of the valid enum values
1171        assert!(matches!(
1172            health.status,
1173            ComponentStatus::Healthy | ComponentStatus::Degraded | ComponentStatus::Unhealthy
1174        ));
1175    }
1176
1177    // -------------------------------------------------------------------------
1178    // Cache Tests
1179    // -------------------------------------------------------------------------
1180
1181    #[actix_web::test]
1182    async fn test_cache_operations() {
1183        // Clear any existing cache
1184        clear_cache().await;
1185
1186        // Should be empty initially
1187        let cached = get_cached_response().await;
1188        assert!(cached.is_none());
1189
1190        // Cache a response
1191        let response = build_response(
1192            create_healthy_system(),
1193            create_healthy_redis(),
1194            create_healthy_queue(),
1195            None,
1196        );
1197        cache_response(&response).await;
1198
1199        // Should now be cached
1200        let cached = get_cached_response().await;
1201        assert!(cached.is_some());
1202        assert_eq!(cached.unwrap().ready, response.ready);
1203
1204        // Clean up
1205        clear_cache().await;
1206    }
1207}