1use std::collections::HashMap;
7use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
8
9use chrono::{DateTime, Utc};
10use once_cell::sync::Lazy;
11use tracing::{debug, warn};
12
13#[derive(Clone, Debug, Default, PartialEq, Eq)]
15pub struct RpcConfigMetadata {
16 pub failure_timestamps: Vec<DateTime<Utc>>,
19 pub paused_until: Option<DateTime<Utc>>,
22}
23
24pub struct RpcHealthStore {
29 metadata: Arc<RwLock<HashMap<String, RpcConfigMetadata>>>,
30}
31
32static HEALTH_STORE: Lazy<RpcHealthStore> = Lazy::new(|| RpcHealthStore {
33 metadata: Arc::new(RwLock::new(HashMap::new())),
34});
35
36impl RpcHealthStore {
37 pub fn instance() -> &'static RpcHealthStore {
39 &HEALTH_STORE
40 }
41
42 fn acquire_read_lock(&self) -> RwLockReadGuard<'_, HashMap<String, RpcConfigMetadata>> {
47 match self.metadata.read() {
48 Ok(guard) => guard,
49 Err(poisoned) => {
50 warn!("RpcHealthStore read lock was poisoned, recovering");
51 poisoned.into_inner()
52 }
53 }
54 }
55
56 fn acquire_write_lock(&self) -> RwLockWriteGuard<'_, HashMap<String, RpcConfigMetadata>> {
61 match self.metadata.write() {
62 Ok(guard) => guard,
63 Err(poisoned) => {
64 warn!("RpcHealthStore write lock was poisoned, recovering");
65 poisoned.into_inner()
66 }
67 }
68 }
69
70 pub fn get_metadata(&self, url: &str) -> RpcConfigMetadata {
80 let store = self.acquire_read_lock();
81 store.get(url).cloned().unwrap_or_default()
82 }
83
84 pub fn update_metadata(&self, url: &str, metadata: RpcConfigMetadata) {
90 let mut store = self.acquire_write_lock();
91 store.insert(url.to_string(), metadata);
92 }
93
94 pub fn mark_failed(
104 &self,
105 url: &str,
106 threshold: u32,
107 pause_duration: chrono::Duration,
108 failure_expiration: chrono::Duration,
109 ) {
110 let mut store = self.acquire_write_lock();
111 let mut metadata = store.get(url).cloned().unwrap_or_default();
112
113 let now = Utc::now();
114
115 metadata
117 .failure_timestamps
118 .retain(|&ts| now - ts <= failure_expiration);
119
120 metadata.failure_timestamps.push(now);
122
123 let max_size = (threshold * 2) as usize;
125 if metadata.failure_timestamps.len() > max_size {
126 let remove_count = metadata.failure_timestamps.len() - max_size;
129 metadata.failure_timestamps.drain(0..remove_count);
130 }
131
132 let recent_failures = metadata.failure_timestamps.len() as u32;
134 let was_paused = metadata.paused_until.is_some();
135
136 if recent_failures >= threshold {
137 let paused_until = now + pause_duration;
138 metadata.paused_until = Some(paused_until);
139
140 if !was_paused {
141 debug!(
143 provider_url = %url,
144 failure_count = %recent_failures,
145 threshold = %threshold,
146 paused_until = %paused_until,
147 pause_duration_secs = %pause_duration.num_seconds(),
148 "RPC provider paused due to failures"
149 );
150 } else {
151 debug!(
153 provider_url = %url,
154 failure_count = %recent_failures,
155 threshold = %threshold,
156 paused_until = %paused_until,
157 pause_duration_secs = %pause_duration.num_seconds(),
158 "RPC provider pause extended due to additional failures"
159 );
160 }
161 }
162
163 store.insert(url.to_string(), metadata);
164 }
165
166 pub fn reset_failures(&self, url: &str) {
171 let mut store = self.acquire_write_lock();
172 store.remove(url);
173 }
174
175 #[must_use]
183 pub fn reset_failures_if_exists(&self, url: &str) -> bool {
184 let mut store = self.acquire_write_lock();
185 store.remove(url).is_some()
186 }
187
188 pub fn is_paused(
209 &self,
210 url: &str,
211 threshold: u32,
212 failure_expiration: chrono::Duration,
213 ) -> bool {
214 let now = Utc::now();
215
216 let needs_write = {
218 let store = self.acquire_read_lock();
219 match store.get(url) {
220 None => return false, Some(meta) => {
222 let has_stale_failures = meta
224 .failure_timestamps
225 .iter()
226 .any(|&ts| now - ts > failure_expiration);
227 let pause_expired = meta
228 .paused_until
229 .is_some_and(|paused_until| now >= paused_until);
230 let needs_cleanup =
231 meta.failure_timestamps.is_empty() && meta.paused_until.is_none();
232
233 if has_stale_failures || pause_expired || needs_cleanup {
234 true
236 } else {
237 let recent_failures = meta.failure_timestamps.len() as u32;
239 if recent_failures >= threshold {
240 if let Some(paused_until) = meta.paused_until {
241 return now < paused_until;
242 }
243 }
244 return false;
245 }
246 }
247 }
248 };
249
250 if needs_write {
252 self.is_paused_with_cleanup(url, threshold, failure_expiration, now)
253 } else {
254 false
255 }
256 }
257
258 fn is_paused_with_cleanup(
262 &self,
263 url: &str,
264 threshold: u32,
265 failure_expiration: chrono::Duration,
266 now: DateTime<Utc>,
267 ) -> bool {
268 let mut store = self.acquire_write_lock();
269
270 let Some(meta) = store.get_mut(url) else {
272 return false;
273 };
274
275 meta.failure_timestamps
277 .retain(|&ts| now - ts <= failure_expiration);
278
279 if let Some(paused_until) = meta.paused_until {
281 if now >= paused_until {
282 debug!(
284 provider_url = %url,
285 paused_until = %paused_until,
286 current_time = %now,
287 remaining_failures = %meta.failure_timestamps.len(),
288 "RPC provider pause expired, provider available again"
289 );
290 meta.paused_until = None;
291 if meta.failure_timestamps.is_empty() {
293 store.remove(url);
294 }
295 return false;
296 }
297 }
298
299 let recent_failures = meta.failure_timestamps.len() as u32;
301 if recent_failures >= threshold {
302 if let Some(paused_until) = meta.paused_until {
303 return now < paused_until;
304 }
305 return false;
307 }
308
309 if meta.failure_timestamps.is_empty() && meta.paused_until.is_none() {
311 store.remove(url);
312 }
313
314 false
315 }
316
317 #[cfg(test)]
320 pub fn clear_all(&self) {
321 let mut store = self.acquire_write_lock();
322 store.clear();
323 }
324}
325
326#[cfg(test)]
327mod tests {
328 use super::*;
329
330 #[test]
331 fn test_get_metadata_returns_default_when_not_found() {
332 let store = RpcHealthStore::instance();
333
334 let url = "https://test-get-metadata.example.com";
336 let metadata = store.get_metadata(url);
337 assert_eq!(metadata, RpcConfigMetadata::default());
338 assert_eq!(metadata.failure_timestamps.len(), 0);
339 assert_eq!(metadata.paused_until, None);
340 }
341
342 #[test]
343 fn test_update_and_get_metadata() {
344 let store = RpcHealthStore::instance();
345
346 let url = "https://test-update-metadata.example.com";
347 let mut metadata = RpcConfigMetadata::default();
348 metadata.failure_timestamps.push(Utc::now());
349 metadata.failure_timestamps.push(Utc::now());
350 metadata.failure_timestamps.push(Utc::now());
351
352 store.update_metadata(url, metadata.clone());
353
354 let retrieved = store.get_metadata(url);
355 assert_eq!(
356 retrieved.failure_timestamps.len(),
357 metadata.failure_timestamps.len()
358 );
359 }
360
361 #[test]
362 fn test_mark_failed_increments_count() {
363 let store = RpcHealthStore::instance();
364
365 let url = "https://test-increment-count.example.com";
367 let expiration = chrono::Duration::seconds(60);
368 let threshold = 3;
369
370 store.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
372 let metadata = store.get_metadata(url);
373 assert_eq!(
374 metadata.failure_timestamps.len(),
375 1,
376 "Should have 1 failure after first mark"
377 );
378 assert!(metadata.paused_until.is_none(), "Should not be paused yet");
379 assert!(
381 !store.is_paused(url, threshold, expiration),
382 "Should not be paused with 1 failure"
383 );
384 let metadata_after = store.get_metadata(url);
386 assert_eq!(
387 metadata_after.failure_timestamps.len(),
388 1,
389 "Should still have 1 failure"
390 );
391
392 store.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
394 let metadata = store.get_metadata(url);
395 assert_eq!(
396 metadata.failure_timestamps.len(),
397 2,
398 "Should have 2 failures after second mark"
399 );
400 assert!(metadata.paused_until.is_none(), "Should not be paused yet");
401 assert!(
402 !store.is_paused(url, threshold, expiration),
403 "Should not be paused with 2 failures"
404 );
405 let metadata_after = store.get_metadata(url);
406 assert_eq!(
407 metadata_after.failure_timestamps.len(),
408 2,
409 "Should still have 2 failures"
410 );
411
412 store.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
414 let metadata = store.get_metadata(url);
415 assert_eq!(
416 metadata.failure_timestamps.len(),
417 3,
418 "Should have 3 failures after third mark"
419 );
420 assert!(
421 metadata.paused_until.is_some(),
422 "Should be paused after reaching threshold"
423 );
424 assert!(
425 store.is_paused(url, threshold, expiration),
426 "Should be paused"
427 );
428 let metadata_after = store.get_metadata(url);
429 assert_eq!(
430 metadata_after.failure_timestamps.len(),
431 3,
432 "Should still have 3 failures"
433 );
434 assert!(
435 metadata_after.paused_until.is_some(),
436 "Should still be paused"
437 );
438 }
439
440 #[test]
441 fn test_reset_failures() {
442 let store = RpcHealthStore::instance();
443
444 let url = "https://test-reset-failures.example.com";
445 let expiration = chrono::Duration::seconds(60);
446 let threshold = 3;
447
448 store.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
450 store.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
451 store.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
452 assert!(store.is_paused(url, threshold, expiration));
453
454 store.reset_failures(url);
455 assert!(!store.is_paused(url, threshold, expiration));
456 let metadata = store.get_metadata(url);
457 assert_eq!(metadata, RpcConfigMetadata::default());
458 }
459
460 #[test]
461 fn test_is_paused_with_failure_count_below_threshold() {
462 let store = RpcHealthStore::instance();
463
464 let url = "https://test-below-threshold.example.com";
465 let expiration = chrono::Duration::seconds(60);
466 let mut metadata = RpcConfigMetadata::default();
467 metadata.failure_timestamps.push(Utc::now());
468 store.update_metadata(url, metadata);
469
470 assert!(!store.is_paused(url, 3, expiration));
472 }
473
474 #[test]
475 fn test_is_paused_with_time_based_pause() {
476 let store = RpcHealthStore::instance();
477
478 let url = "https://test-time-based-pause.example.com";
479 let expiration = chrono::Duration::seconds(60);
480 let mut metadata = RpcConfigMetadata::default();
481 metadata.failure_timestamps.push(Utc::now());
483 metadata.failure_timestamps.push(Utc::now());
484 metadata.failure_timestamps.push(Utc::now());
485 metadata.paused_until = Some(Utc::now() + chrono::Duration::seconds(60));
486 store.update_metadata(url, metadata);
487
488 assert!(store.is_paused(url, 3, expiration));
489 }
490
491 #[test]
492 fn test_is_paused_expires_after_time() {
493 let store = RpcHealthStore::instance();
494
495 let url = "https://test-expires-after-time.example.com";
496 let expiration = chrono::Duration::seconds(60);
497 let mut metadata = RpcConfigMetadata::default();
498 metadata.paused_until = Some(Utc::now() - chrono::Duration::seconds(60));
500 store.update_metadata(url, metadata);
501
502 assert!(!store.is_paused(url, 3, expiration));
504 }
505
506 #[test]
507 fn test_shared_state_across_instances() {
508 let store1 = RpcHealthStore::instance();
509 let store2 = RpcHealthStore::instance();
510
511 let url = "https://test-shared-state.example.com";
512 let expiration = chrono::Duration::seconds(60);
513 let threshold = 3;
514
515 store1.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
517 store1.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
518 store1.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
519
520 assert!(store1.is_paused(url, threshold, expiration));
522 assert!(store2.is_paused(url, threshold, expiration));
523
524 let metadata1 = store1.get_metadata(url);
525 let metadata2 = store2.get_metadata(url);
526 assert_eq!(
527 metadata1.failure_timestamps.len(),
528 metadata2.failure_timestamps.len()
529 );
530 }
531
532 #[test]
533 fn test_stale_failures_are_expired() {
534 let store = RpcHealthStore::instance();
535
536 let url = "https://test-stale-failures.example.com";
537 let expiration = chrono::Duration::seconds(60);
538
539 let mut metadata = RpcConfigMetadata::default();
541 metadata
542 .failure_timestamps
543 .push(Utc::now() - chrono::Duration::seconds(120)); metadata
545 .failure_timestamps
546 .push(Utc::now() - chrono::Duration::seconds(90)); store.update_metadata(url, metadata);
548
549 assert!(!store.is_paused(url, 3, expiration));
551
552 let metadata = store.get_metadata(url);
554 assert_eq!(metadata.failure_timestamps.len(), 0);
555 }
556
557 #[test]
558 fn test_failure_timestamps_size_limit() {
559 let store = RpcHealthStore::instance();
560
561 let url = "https://test-size-limit.example.com";
562 let expiration = chrono::Duration::seconds(60);
563 let threshold = 3;
564
565 for _ in 0..10 {
567 store.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
568 }
569
570 let metadata = store.get_metadata(url);
571 assert!(metadata.failure_timestamps.len() <= (threshold * 2) as usize);
573 }
574
575 #[test]
576 fn test_mixed_stale_and_recent_failures() {
577 let store = RpcHealthStore::instance();
578
579 let url = "https://test-mixed-failures.example.com";
580 let expiration = chrono::Duration::seconds(60);
581 let threshold = 3;
582
583 let mut metadata = RpcConfigMetadata::default();
585 metadata
586 .failure_timestamps
587 .push(Utc::now() - chrono::Duration::seconds(120)); metadata
589 .failure_timestamps
590 .push(Utc::now() - chrono::Duration::seconds(90)); store.update_metadata(url, metadata);
592
593 store.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
595 store.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
596
597 let metadata = store.get_metadata(url);
598 assert_eq!(metadata.failure_timestamps.len(), 2);
600 assert!(!store.is_paused(url, threshold, expiration)); }
602
603 #[test]
604 fn test_pause_extension_when_already_paused() {
605 let store = RpcHealthStore::instance();
606
607 let url = "https://test-pause-extension.example.com";
609 let expiration = chrono::Duration::seconds(60);
610 let pause_duration = chrono::Duration::seconds(60);
611 let threshold = 3;
612
613 store.mark_failed(url, threshold, pause_duration, expiration);
615 store.mark_failed(url, threshold, pause_duration, expiration);
616 store.mark_failed(url, threshold, pause_duration, expiration);
617
618 let metadata1 = store.get_metadata(url);
620 assert_eq!(
621 metadata1.failure_timestamps.len(),
622 3,
623 "Should have 3 failures"
624 );
625 assert!(
626 metadata1.paused_until.is_some(),
627 "Should be paused after 3 failures"
628 );
629 let initial_paused_until = metadata1.paused_until.unwrap();
630
631 assert!(
633 store.is_paused(url, threshold, expiration),
634 "Should be paused"
635 );
636 let metadata1_after = store.get_metadata(url);
638 assert_eq!(
639 metadata1_after.failure_timestamps.len(),
640 3,
641 "Should still have 3 failures"
642 );
643
644 std::thread::sleep(std::time::Duration::from_millis(10));
646
647 store.mark_failed(url, threshold, pause_duration, expiration);
649
650 let metadata2 = store.get_metadata(url);
651 assert_eq!(
652 metadata2.failure_timestamps.len(),
653 4,
654 "Should have 4 failures now"
655 );
656 assert!(
657 metadata2.paused_until.is_some(),
658 "Should still be paused after 4th failure"
659 );
660 let new_paused_until = metadata2.paused_until.unwrap();
661
662 assert!(
664 new_paused_until > initial_paused_until,
665 "Pause should be extended"
666 );
667 assert!(
668 store.is_paused(url, threshold, expiration),
669 "Should still be paused"
670 );
671 }
672
673 #[test]
674 fn test_stale_failures_removed_during_mark_failed() {
675 let store = RpcHealthStore::instance();
676
677 let url = "https://test-stale-removed.example.com";
678 let expiration = chrono::Duration::seconds(60);
679
680 let mut metadata = RpcConfigMetadata::default();
682 metadata
683 .failure_timestamps
684 .push(Utc::now() - chrono::Duration::seconds(120)); metadata
686 .failure_timestamps
687 .push(Utc::now() - chrono::Duration::seconds(90)); store.update_metadata(url, metadata);
689
690 store.mark_failed(url, 3, chrono::Duration::seconds(60), expiration);
692
693 let metadata = store.get_metadata(url);
694 assert_eq!(metadata.failure_timestamps.len(), 1);
696 let remaining_failure = metadata.failure_timestamps[0];
698 let age = Utc::now() - remaining_failure;
699 assert!(age.num_seconds() < 5); }
701
702 #[test]
703 fn test_pause_expiration_cleans_up_metadata() {
704 let store = RpcHealthStore::instance();
705
706 let url = "https://test-pause-expiration-cleanup.example.com";
707 let expiration = chrono::Duration::seconds(60);
708
709 let mut metadata = RpcConfigMetadata::default();
711 metadata.paused_until = Some(Utc::now() - chrono::Duration::seconds(10)); store.update_metadata(url, metadata);
713
714 assert!(!store.is_paused(url, 3, expiration));
716
717 let metadata_after = store.get_metadata(url);
719 assert_eq!(metadata_after, RpcConfigMetadata::default());
720 }
721
722 #[test]
723 fn test_pause_expiration_keeps_recent_failures() {
724 let store = RpcHealthStore::instance();
725
726 let url = "https://test-pause-expiration.example.com";
728 let expiration = chrono::Duration::seconds(60);
729 let threshold = 3;
730
731 let mut metadata = RpcConfigMetadata::default();
734 metadata
736 .failure_timestamps
737 .push(Utc::now() - chrono::Duration::seconds(30)); metadata
739 .failure_timestamps
740 .push(Utc::now() - chrono::Duration::seconds(25)); metadata
742 .failure_timestamps
743 .push(Utc::now() - chrono::Duration::seconds(20)); metadata.paused_until = Some(Utc::now() - chrono::Duration::seconds(10)); store.update_metadata(url, metadata);
746
747 assert!(
751 !store.is_paused(url, threshold, expiration),
752 "Should not be paused when pause expired"
753 );
754
755 let metadata_after = store.get_metadata(url);
757 assert_eq!(
758 metadata_after.failure_timestamps.len(),
759 3,
760 "Should keep all recent failures"
761 );
762 assert!(
763 metadata_after.paused_until.is_none(),
764 "Pause should be cleared"
765 );
766 }
767
768 #[test]
769 fn test_reset_failures_if_exists_returns_true_when_entry_exists() {
770 let store = RpcHealthStore::instance();
771
772 let url = "https://test-reset-if-exists-true.example.com";
773 let expiration = chrono::Duration::seconds(60);
774 let threshold = 3;
775
776 store.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
778 store.mark_failed(url, threshold, chrono::Duration::seconds(60), expiration);
779
780 let result = store.reset_failures_if_exists(url);
782 assert!(result, "Should return true when entry existed");
783
784 let metadata = store.get_metadata(url);
786 assert_eq!(metadata, RpcConfigMetadata::default());
787 }
788
789 #[test]
790 fn test_reset_failures_if_exists_returns_false_when_no_entry() {
791 let store = RpcHealthStore::instance();
792
793 let url = "https://test-reset-if-exists-false.example.com";
795
796 let result = store.reset_failures_if_exists(url);
798 assert!(!result, "Should return false when entry doesn't exist");
799 }
800
801 #[test]
802 fn test_is_paused_fast_path_no_cleanup_needed() {
803 let store = RpcHealthStore::instance();
804
805 let url = "https://test-fast-path.example.com";
808 let expiration = chrono::Duration::seconds(60);
809 let threshold = 3;
810
811 let mut metadata = RpcConfigMetadata::default();
813 metadata.failure_timestamps.push(Utc::now());
814 metadata.failure_timestamps.push(Utc::now());
815 metadata.failure_timestamps.push(Utc::now());
816 metadata.paused_until = Some(Utc::now() + chrono::Duration::seconds(60));
817 store.update_metadata(url, metadata);
818
819 assert!(
821 store.is_paused(url, threshold, expiration),
822 "Should be paused via fast path"
823 );
824
825 let metadata_after = store.get_metadata(url);
827 assert_eq!(
828 metadata_after.failure_timestamps.len(),
829 3,
830 "Should have 3 failures unchanged"
831 );
832 assert!(
833 metadata_after.paused_until.is_some(),
834 "Pause should be unchanged"
835 );
836 }
837
838 #[test]
839 fn test_is_paused_fast_path_below_threshold_returns_false() {
840 let store = RpcHealthStore::instance();
841
842 let url = "https://test-fast-path-below-threshold.example.com";
844 let expiration = chrono::Duration::seconds(60);
845 let threshold = 3;
846
847 let mut metadata = RpcConfigMetadata::default();
849 metadata.failure_timestamps.push(Utc::now());
850 metadata.failure_timestamps.push(Utc::now());
851 store.update_metadata(url, metadata);
853
854 assert!(
856 !store.is_paused(url, threshold, expiration),
857 "Should not be paused - below threshold"
858 );
859 }
860
861 #[test]
862 fn test_is_paused_threshold_reached_but_no_pause_until() {
863 let store = RpcHealthStore::instance();
864
865 let url = "https://test-threshold-no-pause.example.com";
868 let expiration = chrono::Duration::seconds(60);
869 let threshold = 3;
870
871 let mut metadata = RpcConfigMetadata::default();
873 metadata.failure_timestamps.push(Utc::now());
874 metadata.failure_timestamps.push(Utc::now());
875 metadata.failure_timestamps.push(Utc::now());
876 store.update_metadata(url, metadata);
878
879 assert!(
882 !store.is_paused(url, threshold, expiration),
883 "Should not be paused - no paused_until set despite threshold reached"
884 );
885 }
886
887 #[test]
888 fn test_is_paused_cleans_up_empty_entry() {
889 let store = RpcHealthStore::instance();
890
891 let url = "https://test-cleanup-empty.example.com";
893 let expiration = chrono::Duration::seconds(60);
894
895 let metadata = RpcConfigMetadata::default();
897 store.update_metadata(url, metadata);
898
899 assert!(!store.is_paused(url, 3, expiration));
901
902 }
905
906 #[test]
907 fn test_mark_failed_logs_new_pause_vs_extended_pause() {
908 let store = RpcHealthStore::instance();
909
910 let url = "https://test-pause-logging.example.com";
914 let expiration = chrono::Duration::seconds(60);
915 let pause_duration = chrono::Duration::seconds(60);
916 let threshold = 3;
917
918 store.mark_failed(url, threshold, pause_duration, expiration);
920 store.mark_failed(url, threshold, pause_duration, expiration);
921 store.mark_failed(url, threshold, pause_duration, expiration);
922
923 let metadata1 = store.get_metadata(url);
924 assert!(metadata1.paused_until.is_some(), "Should be paused");
925
926 store.mark_failed(url, threshold, pause_duration, expiration);
928
929 let metadata2 = store.get_metadata(url);
930 assert!(
931 metadata2.paused_until.is_some(),
932 "Should still be paused after extension"
933 );
934 }
935}