use std::sync::Arc;
use std::time::{Duration, Instant};

use deadpool_redis::Pool;
use tokio::sync::RwLock;

use crate::models::health::{
    ComponentStatus, Components, PluginHealth, PoolStatus, QueueHealth, ReadinessResponse,
    RedisHealth, RedisHealthStatus, SystemHealth,
};
use crate::models::{
    NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState,
    TransactionRepoModel,
};
use crate::repositories::{
    ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository, Repository,
    TransactionCounterTrait, TransactionRepository,
};
use crate::services::plugins::get_pool_manager;
use crate::utils::RedisConnections;
use crate::{jobs::JobProducerTrait, queues::QueueBackend};

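/// Maximum time to wait for a Redis PING before treating the pool as unreachable.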
const PING_TIMEOUT: Duration = Duration::from_millis(3000);

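/// File descriptor usage ratio above which the system is reported as degraded.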
const WARNING_FD_RATIO: f64 = 0.7;

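/// File descriptor usage ratio above which the system is reported as unhealthy.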
const MAX_FD_RATIO: f64 = 0.8;

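/// CLOSE_WAIT socket count above which the system is reported as degraded.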
const WARNING_CLOSE_WAIT: usize = 200;

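/// CLOSE_WAIT socket count above which the system is reported as unhealthy.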
const MAX_CLOSE_WAIT: usize = 500;

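/// How long a computed readiness response is served from the in-process cache.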
const HEALTH_CACHE_TTL: Duration = Duration::from_secs(10);

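/// A readiness response paired with the instant it was computed.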
struct CachedHealth {
    response: ReadinessResponse,
    checked_at: Instant,
}

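/// Process-wide cache holding the most recently computed readiness response.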
static HEALTH_CACHE: std::sync::OnceLock<RwLock<Option<CachedHealth>>> = std::sync::OnceLock::new();

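/// Returns the lazily initialized readiness cache.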
fn get_cache() -> &'static RwLock<Option<CachedHealth>> {
    HEALTH_CACHE.get_or_init(|| RwLock::new(None))
}

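/// Returns the cached readiness response if it is younger than
/// `HEALTH_CACHE_TTL`, otherwise `None`.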
async fn get_cached_response() -> Option<ReadinessResponse> {
    let cache = get_cache().read().await;
    if let Some(ref cached) = *cache {
        if cached.checked_at.elapsed() < HEALTH_CACHE_TTL {
            return Some(cached.response.clone());
        }
    }
    None
}

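/// Stores a freshly computed readiness response with the current timestamp.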
async fn cache_response(response: &ReadinessResponse) {
    let mut cache = get_cache().write().await;
    *cache = Some(CachedHealth {
        response: response.clone(),
        checked_at: Instant::now(),
    });
}

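/// Clears the cached readiness response so tests can start from a known state.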
#[cfg(test)]
pub async fn clear_cache() {
    let mut cache = get_cache().write().await;
    *cache = None;
}

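/// Sends a `PING` to the given Redis pool, bounded by `PING_TIMEOUT`, and
/// reports connectivity together with the pool's current size statistics.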
async fn ping_pool(pool: &Arc<Pool>, name: &str) -> PoolStatus {
    let status = pool.status();

    let result = tokio::time::timeout(PING_TIMEOUT, async {
        let mut conn = pool.get().await?;
        redis::cmd("PING")
            .query_async::<String>(&mut conn)
            .await
            .map_err(deadpool_redis::PoolError::Backend)
    })
    .await;

    match result {
        Ok(Ok(_)) => PoolStatus {
            connected: true,
            available: status.available,
            max_size: status.max_size,
            error: None,
        },
        Ok(Err(e)) => {
            tracing::warn!(pool = %name, error = %e, "Redis pool PING failed");
            PoolStatus {
                connected: false,
                available: status.available,
                max_size: status.max_size,
                error: Some(e.to_string()),
            }
        }
        Err(_) => {
            tracing::warn!(pool = %name, "Redis pool PING timed out");
            PoolStatus {
                connected: false,
                available: status.available,
                max_size: status.max_size,
                error: Some("PING timed out".to_string()),
            }
        }
    }
}

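/// Pings the primary and reader pools concurrently. Overall health follows the
/// primary pool only; a reader failure is surfaced via its `PoolStatus`.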
async fn check_redis_health(connections: &Arc<RedisConnections>) -> RedisHealthStatus {
    let (primary_status, reader_status) = tokio::join!(
        ping_pool(connections.primary(), "primary"),
        ping_pool(connections.reader(), "reader")
    );

    let healthy = primary_status.connected;

    let error = if !primary_status.connected {
        Some(format!(
            "Redis primary pool: {}",
            primary_status
                .error
                .as_deref()
                .unwrap_or("connection failed")
        ))
    } else {
        None
    };

    RedisHealthStatus {
        healthy,
        primary_pool: primary_status,
        reader_pool: reader_status,
        error,
    }
}

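/// Maps a `RedisHealthStatus` onto a component status: healthy when both pools
/// are up, degraded when only the reader is down, unhealthy otherwise.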
fn redis_status_to_health(status: RedisHealthStatus) -> RedisHealth {
    let component_status = if status.healthy {
        if status.reader_pool.connected {
            ComponentStatus::Healthy
        } else {
            ComponentStatus::Degraded
        }
    } else {
        ComponentStatus::Unhealthy
    };

    RedisHealth {
        status: component_status,
        primary_pool: status.primary_pool,
        reader_pool: status.reader_pool,
        error: status.error,
    }
}

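/// Builds a neutral, healthy `RedisHealth` for configurations without Redis so
/// the readiness response always carries a Redis component.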
fn create_not_applicable_redis_health() -> RedisHealth {
    let neutral_pool = PoolStatus {
        connected: true,
        available: 0,
        max_size: 0,
        error: None,
    };

    RedisHealth {
        status: ComponentStatus::Healthy,
        primary_pool: neutral_pool.clone(),
        reader_pool: neutral_pool,
        error: None,
    }
}

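/// Builds unhealthy Redis and queue components for the case where the queue
/// backend cannot be reached at all.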
fn create_unavailable_health() -> (RedisHealth, QueueHealth) {
    let error_msg = "Queue unavailable - cannot check Redis or Queue health";
    let unhealthy_pool = PoolStatus {
        connected: false,
        available: 0,
        max_size: 0,
        error: Some(error_msg.to_string()),
    };

    let redis = RedisHealth {
        status: ComponentStatus::Unhealthy,
        primary_pool: unhealthy_pool.clone(),
        reader_pool: unhealthy_pool,
        error: Some(error_msg.to_string()),
    };

    let queue = QueueHealth {
        status: ComponentStatus::Unhealthy,
        error: Some("Failed to get queue from job producer".to_string()),
    };

    (redis, queue)
}

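/// Runs the backend's health check across all queues; any failing queue makes
/// the component unhealthy, with the failing queues listed in the error.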
async fn check_queue_backend_health(
    queue_backend: Option<Arc<crate::queues::QueueBackendStorage>>,
) -> QueueHealth {
    match queue_backend {
        Some(backend) => match backend.health_check().await {
            Ok(backend_healths) => {
                let all_healthy = backend_healths.iter().all(|h| h.is_healthy);
                let error = if all_healthy {
                    None
                } else {
                    let unhealthy_queues: Vec<_> = backend_healths
                        .iter()
                        .filter(|h| !h.is_healthy)
                        .map(|h| h.queue_type.to_string())
                        .collect();
                    Some(format!("Unhealthy queues: {}", unhealthy_queues.join(", ")))
                };
                QueueHealth {
                    status: if all_healthy {
                        ComponentStatus::Healthy
                    } else {
                        ComponentStatus::Unhealthy
                    },
                    error,
                }
            }
            Err(_) => {
                let (_, unavailable_queue) = create_unavailable_health();
                unavailable_queue
            }
        },
        None => {
            let (_, unavailable_queue) = create_unavailable_health();
            unavailable_queue
        }
    }
}

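/// Counts file descriptors open in this process: reads `/proc/<pid>/fd` on
/// Linux, shells out to `lsof` on macOS, and returns 0 elsewhere.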
fn get_fd_count() -> Result<usize, std::io::Error> {
    let pid = std::process::id();

    #[cfg(target_os = "linux")]
    {
        let fd_dir = format!("/proc/{pid}/fd");
        std::fs::read_dir(fd_dir).map(|entries| entries.count())
    }

    #[cfg(target_os = "macos")]
    {
        use std::process::Command;
        let output = Command::new("lsof")
            .args(["-p", &pid.to_string()])
            .output()?;
        // Subtract one for the lsof header line.
        let count = String::from_utf8_lossy(&output.stdout)
            .lines()
            .count()
            .saturating_sub(1);
        Ok(count)
    }

    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        Ok(0)
    }
}

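/// Returns the soft file descriptor limit reported by `ulimit -n`, defaulting
/// to 1024 when it cannot be determined.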
fn get_fd_limit() -> Result<usize, std::io::Error> {
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    {
        use std::process::Command;
        let output = Command::new("sh").args(["-c", "ulimit -n"]).output()?;
        let limit = String::from_utf8_lossy(&output.stdout)
            .trim()
            .parse()
            .unwrap_or(1024);
        Ok(limit)
    }

    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        Ok(1024)
    }
}

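/// Counts sockets in the CLOSE_WAIT state as reported by `netstat`, returning
/// 0 on unsupported platforms or unparsable output.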
fn get_close_wait_count() -> Result<usize, std::io::Error> {
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    {
        use std::process::Command;
        let output = Command::new("sh")
            .args(["-c", "netstat -an | grep CLOSE_WAIT | wc -l"])
            .output()?;
        let count = String::from_utf8_lossy(&output.stdout)
            .trim()
            .parse()
            .unwrap_or(0);
        Ok(count)
    }

    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        Ok(0)
    }
}

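/// Evaluates fd usage and CLOSE_WAIT counts against the warning and critical
/// thresholds: crossing a critical threshold yields `Unhealthy` plus an error
/// message, crossing only a warning threshold yields `Degraded`.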
fn evaluate_system_metrics(
    fd_ratio: f64,
    fd_count: usize,
    fd_limit: usize,
    close_wait_count: usize,
) -> (ComponentStatus, Option<String>) {
    let mut errors: Vec<String> = Vec::new();
    let mut is_degraded = false;

    if fd_ratio > MAX_FD_RATIO {
        let fd_percent = fd_ratio * 100.0;
        let max_percent = MAX_FD_RATIO * 100.0;
        errors.push(format!(
            "File descriptor limit critical: {fd_count}/{fd_limit} ({fd_percent:.1}% > {max_percent:.1}%)"
        ));
    } else if fd_ratio > WARNING_FD_RATIO {
        is_degraded = true;
    }

    if close_wait_count > MAX_CLOSE_WAIT {
        errors.push(format!(
            "Too many CLOSE_WAIT sockets: {close_wait_count} > {MAX_CLOSE_WAIT}"
        ));
    } else if close_wait_count > WARNING_CLOSE_WAIT {
        is_degraded = true;
    }

    let status = if !errors.is_empty() {
        ComponentStatus::Unhealthy
    } else if is_degraded {
        ComponentStatus::Degraded
    } else {
        ComponentStatus::Healthy
    };

    let error = if errors.is_empty() {
        None
    } else {
        Some(errors.join("; "))
    };

    (status, error)
}

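/// Collects process-level metrics (file descriptors, CLOSE_WAIT sockets) and
/// evaluates them into a `SystemHealth` component.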
pub fn check_system_health() -> SystemHealth {
    let fd_count = get_fd_count().unwrap_or(0);
    let fd_limit = get_fd_limit().unwrap_or(1024);
    let close_wait_count = get_close_wait_count().unwrap_or(0);

    let fd_ratio = if fd_limit > 0 {
        fd_count as f64 / fd_limit as f64
    } else {
        0.0
    };
    let fd_usage_percent = (fd_ratio * 100.0) as u32;

    let (status, error) = evaluate_system_metrics(fd_ratio, fd_count, fd_limit, close_wait_count);

    SystemHealth {
        status,
        fd_count,
        fd_limit,
        fd_usage_percent,
        close_wait_count,
        error,
    }
}

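/// Maps plugin health and circuit breaker state to a component status. A
/// healthy plugin with an open or half-open circuit is degraded; an unhealthy
/// plugin is degraded rather than unhealthy; unknown states count as healthy.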
fn determine_plugin_status(healthy: bool, circuit_state: Option<&str>) -> ComponentStatus {
    if healthy {
        match circuit_state {
            Some("closed") | None => ComponentStatus::Healthy,
            Some("half_open") | Some("open") => ComponentStatus::Degraded,
            _ => ComponentStatus::Healthy,
        }
    } else {
        ComponentStatus::Degraded
    }
}

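/// Reports plugin pool health, or `None` when the plugin pool manager is not
/// initialized. Check failures are reported as degraded, never unhealthy, so
/// plugin problems alone cannot fail readiness.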
pub async fn check_plugin_health() -> Option<PluginHealth> {
    let pool_manager = get_pool_manager();

    if !pool_manager.is_initialized().await {
        return None;
    }

    match pool_manager.health_check().await {
        Ok(plugin_status) => {
            let status = determine_plugin_status(
                plugin_status.healthy,
                plugin_status.circuit_state.as_deref(),
            );

            Some(PluginHealth {
                status,
                enabled: true,
                circuit_state: plugin_status.circuit_state,
                error: if plugin_status.healthy {
                    None
                } else {
                    Some(plugin_status.status)
                },
                uptime_ms: plugin_status.uptime_ms,
                memory: plugin_status.memory,
                pool_completed: plugin_status.pool_completed,
                pool_queued: plugin_status.pool_queued,
                success_rate: plugin_status.success_rate,
                avg_response_time_ms: plugin_status.avg_response_time_ms,
                recovering: plugin_status.recovering,
                recovery_percent: plugin_status.recovery_percent,
                shared_socket_available_slots: plugin_status.shared_socket_available_slots,
                shared_socket_active_connections: plugin_status.shared_socket_active_connections,
                shared_socket_registered_executions: plugin_status
                    .shared_socket_registered_executions,
                connection_pool_available_slots: plugin_status.connection_pool_available_slots,
                connection_pool_active_connections: plugin_status
                    .connection_pool_active_connections,
            })
        }
        Err(e) => Some(PluginHealth {
            status: ComponentStatus::Degraded,
            enabled: true,
            circuit_state: None,
            error: Some(e.to_string()),
            uptime_ms: None,
            memory: None,
            pool_completed: None,
            pool_queued: None,
            success_rate: None,
            avg_response_time_ms: None,
            recovering: None,
            recovery_percent: None,
            shared_socket_available_slots: None,
            shared_socket_active_connections: None,
            shared_socket_registered_executions: None,
            connection_pool_available_slots: None,
            connection_pool_active_connections: None,
        }),
    }
}

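/// Folds component statuses into an overall status: any unhealthy component
/// makes the result unhealthy and contributes its error as a reason, while a
/// degraded component only downgrades an otherwise healthy result.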
fn aggregate_health(
    system: &SystemHealth,
    redis: &RedisHealth,
    queue: &QueueHealth,
    plugins: &Option<PluginHealth>,
) -> (ComponentStatus, Option<String>) {
    let mut reasons: Vec<String> = Vec::new();
    let mut overall_status = ComponentStatus::Healthy;

    if system.status == ComponentStatus::Unhealthy {
        overall_status = ComponentStatus::Unhealthy;
        if let Some(ref err) = system.error {
            reasons.push(err.clone());
        }
    } else if system.status == ComponentStatus::Degraded
        && overall_status == ComponentStatus::Healthy
    {
        overall_status = ComponentStatus::Degraded;
    }

    if redis.status == ComponentStatus::Unhealthy {
        overall_status = ComponentStatus::Unhealthy;
        if let Some(ref err) = redis.error {
            reasons.push(err.clone());
        }
    } else if redis.status == ComponentStatus::Degraded
        && overall_status == ComponentStatus::Healthy
    {
        overall_status = ComponentStatus::Degraded;
    }

    if queue.status == ComponentStatus::Unhealthy {
        overall_status = ComponentStatus::Unhealthy;
        if let Some(ref err) = queue.error {
            reasons.push(err.clone());
        }
    } else if queue.status == ComponentStatus::Degraded
        && overall_status == ComponentStatus::Healthy
    {
        overall_status = ComponentStatus::Degraded;
    }

    if let Some(ref plugin_health) = plugins {
        if plugin_health.status == ComponentStatus::Degraded
            && overall_status == ComponentStatus::Healthy
        {
            overall_status = ComponentStatus::Degraded;
        }
    }

    let reason = if reasons.is_empty() {
        None
    } else {
        Some(reasons.join("; "))
    };

    (overall_status, reason)
}

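/// Assembles the final `ReadinessResponse`. The service counts as ready unless
/// the aggregate status is unhealthy, so a degraded service still serves traffic.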
fn build_response(
    system: SystemHealth,
    redis: RedisHealth,
    queue: QueueHealth,
    plugins: Option<PluginHealth>,
) -> ReadinessResponse {
    let (overall_status, reason) = aggregate_health(&system, &redis, &queue, &plugins);
    let ready = overall_status != ComponentStatus::Unhealthy;

    ReadinessResponse {
        ready,
        status: overall_status,
        reason,
        components: Components {
            system,
            redis,
            queue,
            plugins,
        },
        timestamp: chrono::Utc::now().to_rfc3339(),
    }
}

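/// Computes the readiness response, serving a cached copy when one is fresher
/// than `HEALTH_CACHE_TTL`. Redis is checked only when the transaction
/// repository exposes Redis connection info; otherwise a neutral Redis
/// component is reported.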
pub async fn get_readiness<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
    data: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
) -> ReadinessResponse
where
    J: JobProducerTrait + Send + Sync + 'static,
    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
    TCR: TransactionCounterTrait + Send + Sync + 'static,
    PR: PluginRepositoryTrait + Send + Sync + 'static,
    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
{
    // Serve a cached response when one is still fresh.
    if let Some(cached) = get_cached_response().await {
        return cached;
    }

    let system = check_system_health();
    let plugins = check_plugin_health().await;

    // Redis health applies only when the transaction repository is Redis-backed.
    let redis = if let Some((redis_connections, _key_prefix)) =
        data.transaction_repository.connection_info()
    {
        let redis_status = check_redis_health(&redis_connections).await;
        redis_status_to_health(redis_status)
    } else {
        create_not_applicable_redis_health()
    };

    let queue_health = check_queue_backend_health(data.job_producer.get_queue_backend()).await;

    let response = build_response(system, redis, queue_health, plugins);

    cache_response(&response).await;

    response
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_constants() {
        assert_eq!(PING_TIMEOUT, Duration::from_millis(3000));
        assert_eq!(WARNING_FD_RATIO, 0.7);
        assert_eq!(MAX_FD_RATIO, 0.8);
        assert_eq!(WARNING_CLOSE_WAIT, 200);
        assert_eq!(MAX_CLOSE_WAIT, 500);
        assert_eq!(HEALTH_CACHE_TTL, Duration::from_secs(10));
    }

    #[test]
    fn test_evaluate_system_metrics_healthy() {
        let (status, error) = evaluate_system_metrics(0.5, 500, 1000, 20);
        assert_eq!(status, ComponentStatus::Healthy);
        assert!(error.is_none());
    }

    #[test]
    fn test_evaluate_system_metrics_degraded_fd() {
        let (status, error) = evaluate_system_metrics(0.75, 750, 1000, 20);
        assert_eq!(status, ComponentStatus::Degraded);
        assert!(error.is_none());
    }

    #[test]
    fn test_evaluate_system_metrics_degraded_close_wait() {
        let (status, error) = evaluate_system_metrics(0.5, 500, 1000, 300);
        assert_eq!(status, ComponentStatus::Degraded);
        assert!(error.is_none());
    }

    #[test]
    fn test_evaluate_system_metrics_unhealthy_fd() {
        let (status, error) = evaluate_system_metrics(0.85, 850, 1000, 20);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(error.is_some());
        assert!(error.unwrap().contains("File descriptor limit critical"));
    }

    #[test]
    fn test_evaluate_system_metrics_unhealthy_close_wait() {
        let (status, error) = evaluate_system_metrics(0.5, 500, 1000, 600);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(error.is_some());
        assert!(error.unwrap().contains("CLOSE_WAIT"));
    }

    #[test]
    fn test_evaluate_system_metrics_both_unhealthy() {
        let (status, error) = evaluate_system_metrics(0.9, 900, 1000, 600);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(error.is_some());
        let err = error.unwrap();
        assert!(err.contains("File descriptor"));
        assert!(err.contains("CLOSE_WAIT"));
    }

    #[test]
    fn test_evaluate_system_metrics_at_warning_threshold() {
        // Thresholds are exclusive: exactly at the warning values is still healthy.
        let (status, _) = evaluate_system_metrics(0.7, 700, 1000, 200);
        assert_eq!(status, ComponentStatus::Healthy);
    }

    #[test]
    fn test_evaluate_system_metrics_just_above_warning() {
        let (status, _) = evaluate_system_metrics(0.71, 710, 1000, 201);
        assert_eq!(status, ComponentStatus::Degraded);
    }

    #[test]
    fn test_evaluate_system_metrics_at_max_threshold() {
        // Exactly at the critical values is degraded, not yet unhealthy.
        let (status, _) = evaluate_system_metrics(0.8, 800, 1000, 500);
        assert_eq!(status, ComponentStatus::Degraded);
    }

    #[test]
    fn test_evaluate_system_metrics_zero_limit() {
        let (status, _) = evaluate_system_metrics(0.0, 0, 0, 0);
        assert_eq!(status, ComponentStatus::Healthy);
    }

    #[test]
    fn test_determine_plugin_status_healthy_closed() {
        assert_eq!(
            determine_plugin_status(true, Some("closed")),
            ComponentStatus::Healthy
        );
    }

    #[test]
    fn test_determine_plugin_status_healthy_none() {
        assert_eq!(
            determine_plugin_status(true, None),
            ComponentStatus::Healthy
        );
    }

    #[test]
    fn test_determine_plugin_status_healthy_half_open() {
        assert_eq!(
            determine_plugin_status(true, Some("half_open")),
            ComponentStatus::Degraded
        );
    }

    #[test]
    fn test_determine_plugin_status_healthy_open() {
        assert_eq!(
            determine_plugin_status(true, Some("open")),
            ComponentStatus::Degraded
        );
    }

    #[test]
    fn test_determine_plugin_status_unhealthy() {
        assert_eq!(
            determine_plugin_status(false, Some("closed")),
            ComponentStatus::Degraded
        );
    }

    #[test]
    fn test_determine_plugin_status_unknown_state() {
        assert_eq!(
            determine_plugin_status(true, Some("unknown")),
            ComponentStatus::Healthy
        );
    }

    fn create_healthy_system() -> SystemHealth {
        SystemHealth {
            status: ComponentStatus::Healthy,
            fd_count: 100,
            fd_limit: 1000,
            fd_usage_percent: 10,
            close_wait_count: 5,
            error: None,
        }
    }

    fn create_healthy_redis() -> RedisHealth {
        RedisHealth {
            status: ComponentStatus::Healthy,
            primary_pool: PoolStatus {
                connected: true,
                available: 8,
                max_size: 16,
                error: None,
            },
            reader_pool: PoolStatus {
                connected: true,
                available: 8,
                max_size: 16,
                error: None,
            },
            error: None,
        }
    }

    fn create_healthy_queue() -> QueueHealth {
        QueueHealth {
            status: ComponentStatus::Healthy,
            error: None,
        }
    }

    #[test]
    fn test_aggregate_health_all_healthy() {
        let system = create_healthy_system();
        let redis = create_healthy_redis();
        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Healthy);
        assert!(reason.is_none());
    }

    #[test]
    fn test_aggregate_health_system_degraded() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Degraded;
        let redis = create_healthy_redis();
        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Degraded);
        assert!(reason.is_none());
    }

    #[test]
    fn test_aggregate_health_system_unhealthy() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Unhealthy;
        system.error = Some("FD limit exceeded".to_string());
        let redis = create_healthy_redis();
        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(reason.is_some());
        assert!(reason.unwrap().contains("FD limit"));
    }

    #[test]
    fn test_aggregate_health_redis_unhealthy() {
        let system = create_healthy_system();
        let mut redis = create_healthy_redis();
        redis.status = ComponentStatus::Unhealthy;
        redis.error = Some("Primary pool down".to_string());
        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(reason.unwrap().contains("Primary pool"));
    }

    #[test]
    fn test_aggregate_health_queue_unhealthy() {
        let system = create_healthy_system();
        let redis = create_healthy_redis();
        let mut queue = create_healthy_queue();
        queue.status = ComponentStatus::Unhealthy;
        queue.error = Some("Queue timeout".to_string());

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
        assert!(reason.unwrap().contains("Queue timeout"));
    }

    #[test]
    fn test_aggregate_health_multiple_unhealthy() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Unhealthy;
        system.error = Some("FD limit".to_string());

        let mut redis = create_healthy_redis();
        redis.status = ComponentStatus::Unhealthy;
        redis.error = Some("Redis down".to_string());

        let queue = create_healthy_queue();

        let (status, reason) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
        let reason_str = reason.unwrap();
        assert!(reason_str.contains("FD limit"));
        assert!(reason_str.contains("Redis down"));
    }

    #[test]
    fn test_aggregate_health_plugin_degraded_only() {
        let system = create_healthy_system();
        let redis = create_healthy_redis();
        let queue = create_healthy_queue();
        let plugins = Some(PluginHealth {
            status: ComponentStatus::Degraded,
            enabled: true,
            circuit_state: Some("open".to_string()),
            error: Some("Circuit open".to_string()),
            uptime_ms: None,
            memory: None,
            pool_completed: None,
            pool_queued: None,
            success_rate: None,
            avg_response_time_ms: None,
            recovering: None,
            recovery_percent: None,
            shared_socket_available_slots: None,
            shared_socket_active_connections: None,
            shared_socket_registered_executions: None,
            connection_pool_available_slots: None,
            connection_pool_active_connections: None,
        });

        let (status, reason) = aggregate_health(&system, &redis, &queue, &plugins);
        assert_eq!(status, ComponentStatus::Degraded);
        // A degraded plugin downgrades the overall status but contributes no reason.
        assert!(reason.is_none());
    }

    #[test]
    fn test_aggregate_health_unhealthy_overrides_degraded() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Degraded;

        let mut redis = create_healthy_redis();
        redis.status = ComponentStatus::Unhealthy;
        redis.error = Some("Redis down".to_string());

        let queue = create_healthy_queue();

        let (status, _) = aggregate_health(&system, &redis, &queue, &None);
        assert_eq!(status, ComponentStatus::Unhealthy);
    }

    #[test]
    fn test_build_response_ready() {
        let response = build_response(
            create_healthy_system(),
            create_healthy_redis(),
            create_healthy_queue(),
            None,
        );

        assert!(response.ready);
        assert_eq!(response.status, ComponentStatus::Healthy);
        assert!(response.reason.is_none());
        assert!(!response.timestamp.is_empty());
    }

    #[test]
    fn test_build_response_not_ready() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Unhealthy;
        system.error = Some("Critical error".to_string());

        let response =
            build_response(system, create_healthy_redis(), create_healthy_queue(), None);

        assert!(!response.ready);
        assert_eq!(response.status, ComponentStatus::Unhealthy);
        assert!(response.reason.is_some());
    }

    #[test]
    fn test_build_response_degraded_is_ready() {
        let mut system = create_healthy_system();
        system.status = ComponentStatus::Degraded;

        let response =
            build_response(system, create_healthy_redis(), create_healthy_queue(), None);

        // A degraded service is still ready; only unhealthy blocks readiness.
        assert!(response.ready);
        assert_eq!(response.status, ComponentStatus::Degraded);
    }

    #[test]
    fn test_redis_status_to_health_healthy() {
        let status = RedisHealthStatus {
            healthy: true,
            primary_pool: PoolStatus {
                connected: true,
                available: 8,
                max_size: 16,
                error: None,
            },
            reader_pool: PoolStatus {
                connected: true,
                available: 8,
                max_size: 16,
                error: None,
            },
            error: None,
        };

        let health = redis_status_to_health(status);
        assert_eq!(health.status, ComponentStatus::Healthy);
    }

    #[test]
    fn test_redis_status_to_health_degraded() {
        let status = RedisHealthStatus {
            healthy: true,
            primary_pool: PoolStatus {
                connected: true,
                available: 8,
                max_size: 16,
                error: None,
            },
            reader_pool: PoolStatus {
                connected: false,
                available: 0,
                max_size: 16,
                error: Some("Connection refused".to_string()),
            },
            error: None,
        };

        let health = redis_status_to_health(status);
        assert_eq!(health.status, ComponentStatus::Degraded);
    }

    #[test]
    fn test_redis_status_to_health_unhealthy() {
        let status = RedisHealthStatus {
            healthy: false,
            primary_pool: PoolStatus {
                connected: false,
                available: 0,
                max_size: 16,
                error: Some("PING timeout".to_string()),
            },
            reader_pool: PoolStatus {
                connected: false,
                available: 0,
                max_size: 16,
                error: Some("PING timeout".to_string()),
            },
            error: Some("Primary pool failed".to_string()),
        };

        let health = redis_status_to_health(status);
        assert_eq!(health.status, ComponentStatus::Unhealthy);
    }

    #[test]
    fn test_create_unavailable_health() {
        let (redis, queue) = create_unavailable_health();

        assert_eq!(redis.status, ComponentStatus::Unhealthy);
        assert!(!redis.primary_pool.connected);
        assert!(!redis.reader_pool.connected);
        assert!(redis.error.is_some());

        assert_eq!(queue.status, ComponentStatus::Unhealthy);
        assert!(queue.error.is_some());
    }

    #[test]
    fn test_pool_status_serialization_without_error() {
        let status = PoolStatus {
            connected: true,
            available: 8,
            max_size: 16,
            error: None,
        };

        let json = serde_json::to_string(&status).unwrap();
        assert!(json.contains("\"connected\":true"));
        assert!(json.contains("\"available\":8"));
        assert!(json.contains("\"max_size\":16"));
        // A `None` error field should be omitted from the JSON entirely.
        assert!(!json.contains("error"));
    }

    #[test]
    fn test_pool_status_serialization_with_error() {
        let status = PoolStatus {
            connected: false,
            available: 0,
            max_size: 16,
            error: Some("Connection refused".to_string()),
        };

        let json = serde_json::to_string(&status).unwrap();
        assert!(json.contains("\"connected\":false"));
        assert!(json.contains("\"error\":\"Connection refused\""));
    }

    #[actix_web::test]
    async fn test_check_system_health_returns_valid_data() {
        let health = check_system_health();

        // The process should always report a positive fd limit.
        assert!(health.fd_limit > 0);

        assert!(health.fd_usage_percent <= 100);

        // Status must be one of the three known variants.
        assert!(matches!(
            health.status,
            ComponentStatus::Healthy | ComponentStatus::Degraded | ComponentStatus::Unhealthy
        ));
    }

    #[actix_web::test]
    async fn test_cache_operations() {
        // Start from an empty cache.
        clear_cache().await;

        let cached = get_cached_response().await;
        assert!(cached.is_none());

        let response = build_response(
            create_healthy_system(),
            create_healthy_redis(),
            create_healthy_queue(),
            None,
        );
        cache_response(&response).await;

        let cached = get_cached_response().await;
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().ready, response.ready);

        // Leave the cache clean for other tests.
        clear_cache().await;
    }
}