1use std::collections::HashSet;
13use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
14use std::sync::Arc;
15
16use eyre::Result;
17use parking_lot::RwLock;
18use rand::distr::weighted::WeightedIndex;
19use rand::prelude::*;
20use serde::Serialize;
21use thiserror::Error;
22use tracing::info;
23
24use crate::models::RpcConfig;
25use crate::services::provider::rpc_health_store::RpcHealthStore;
26
27#[derive(Error, Debug, Serialize)]
28pub enum RpcSelectorError {
29 #[error("No providers available")]
30 NoProviders,
31 #[error("Client initialization failed: {0}")]
32 ClientInitializationError(String),
33 #[error("Weighted index error: {0}")]
34 WeightedIndexError(String),
35 #[error("All available providers have failed")]
36 AllProvidersFailed,
37}
38
39#[derive(Debug)]
41pub struct RpcSelector {
42 configs: Arc<RwLock<Vec<RpcConfig>>>,
44 weights_dist: Option<Arc<WeightedIndex<u8>>>,
46 next_index: Arc<AtomicUsize>,
48 current_index: Arc<AtomicUsize>,
50 has_current: Arc<AtomicBool>,
52 failure_threshold: u32,
54 pause_duration_secs: u64,
56 failure_expiration_secs: u64,
58}
59
60impl RpcSelector {
61 pub fn new(
80 configs: Vec<RpcConfig>,
81 failure_threshold: u32,
82 pause_duration_secs: u64,
83 failure_expiration_secs: u64,
84 ) -> Result<Self, RpcSelectorError> {
85 if configs.is_empty() {
86 return Err(RpcSelectorError::NoProviders);
87 }
88
89 let weights_dist = Self::create_weights_distribution(&configs, &HashSet::new());
91
92 let selector = Self {
93 configs: Arc::new(RwLock::new(configs)),
94 weights_dist,
95 next_index: Arc::new(AtomicUsize::new(0)),
96 current_index: Arc::new(AtomicUsize::new(0)),
97 has_current: Arc::new(AtomicBool::new(false)), failure_threshold,
99 pause_duration_secs,
100 failure_expiration_secs,
101 };
102
103 let mut rng = rand::rng();
105 selector.next_index.store(
106 rng.random_range(0..selector.configs.read().len()),
107 Ordering::Relaxed,
108 );
109
110 Ok(selector)
111 }
112
113 pub fn new_with_defaults(configs: Vec<RpcConfig>) -> Result<Self, RpcSelectorError> {
124 Self::new(
125 configs,
126 crate::config::ServerConfig::get_provider_failure_threshold(),
127 crate::config::ServerConfig::get_provider_pause_duration_secs(),
128 crate::config::ServerConfig::get_provider_failure_expiration_secs(),
129 )
130 }
131
132 pub fn provider_count(&self) -> usize {
137 self.configs.read().len()
138 }
139
140 pub fn available_provider_count(&self) -> usize {
145 let health_store = RpcHealthStore::instance();
146 let expiration = chrono::Duration::seconds(self.failure_expiration_secs as i64);
147 self.configs
148 .read()
149 .iter()
150 .filter(|c| !health_store.is_paused(&c.url, self.failure_threshold, expiration))
151 .count()
152 }
153
154 pub fn get_configs(&self) -> Vec<RpcConfig> {
159 self.configs.read().clone()
160 }
161
162 pub fn mark_current_as_failed(&self) {
167 info!("Marking current provider as failed");
168 if self.has_current.load(Ordering::Relaxed) {
170 let current = self.current_index.load(Ordering::Relaxed);
171 let configs = self.configs.read();
172 let config = &configs[current];
173
174 let health_store = RpcHealthStore::instance();
176 use chrono::Duration;
177 health_store.mark_failed(
178 &config.url,
179 self.failure_threshold,
180 Duration::seconds(self.pause_duration_secs as i64),
181 Duration::seconds(self.failure_expiration_secs as i64),
182 );
183
184 self.has_current.store(false, Ordering::Relaxed);
186
187 if configs.len() > 1 {
189 self.next_index.fetch_add(1, Ordering::Relaxed);
190 }
191 }
192 }
193
194 fn create_weights_distribution(
203 configs: &[RpcConfig],
204 excluded_indices: &HashSet<usize>,
205 ) -> Option<Arc<WeightedIndex<u8>>> {
206 let weights: Vec<u8> = configs
208 .iter()
209 .enumerate()
210 .map(|(idx, config)| {
211 if excluded_indices.contains(&idx) {
212 0
213 } else {
214 config.get_weight()
215 }
216 })
217 .collect();
218
219 let available_count = weights.iter().filter(|&&w| w > 0).count();
221 if available_count == 0 {
222 return None;
223 }
224
225 let first_non_zero_weight = weights.iter().find(|&&w| w > 0).copied();
226 if let Some(first_weight) = first_non_zero_weight {
227 let all_equal = weights
229 .iter()
230 .filter(|&&w| w > 0)
231 .all(|&w| w == first_weight);
232
233 if all_equal {
234 return None;
235 }
236 }
237
238 match WeightedIndex::new(&weights) {
240 Ok(dist) => Some(Arc::new(dist)),
241 Err(_) => None,
242 }
243 }
244
245 fn try_weighted_selection(
257 &self,
258 configs: &[RpcConfig],
259 excluded_urls: &std::collections::HashSet<String>,
260 allow_paused: bool,
261 health_store: &RpcHealthStore,
262 expiration: chrono::Duration,
263 ) -> Option<(usize, String)> {
264 let dist = self.weights_dist.as_ref()?;
265 let mut rng = rand::rng();
266
267 const MAX_ATTEMPTS: usize = 10;
268 for _ in 0..MAX_ATTEMPTS {
269 let index = dist.sample(&mut rng);
270 if configs[index].get_weight() == 0 {
272 continue;
273 }
274 if excluded_urls.contains(&configs[index].url) {
276 continue;
277 }
278 if !allow_paused
280 && health_store.is_paused(&configs[index].url, self.failure_threshold, expiration)
281 {
282 continue;
283 }
284 self.current_index.store(index, Ordering::Relaxed);
286 self.has_current.store(true, Ordering::Relaxed);
287 return Some((index, configs[index].url.clone()));
288 }
289 None
290 }
291
292 fn try_round_robin_selection(
305 &self,
306 configs: &[RpcConfig],
307 excluded_urls: &std::collections::HashSet<String>,
308 allow_paused: bool,
309 health_store: &RpcHealthStore,
310 expiration: chrono::Duration,
311 start_index: usize,
312 ) -> Option<(usize, String)> {
313 let len = configs.len();
314 for i in 0..len {
315 let index = (start_index + i) % len;
316 if configs[index].get_weight() == 0 {
318 continue;
319 }
320 if excluded_urls.contains(&configs[index].url) {
322 continue;
323 }
324 if !allow_paused
326 && health_store.is_paused(&configs[index].url, self.failure_threshold, expiration)
327 {
328 continue;
329 }
330 self.next_index.store((index + 1) % len, Ordering::Relaxed);
333 self.current_index.store(index, Ordering::Relaxed);
334 self.has_current.store(true, Ordering::Relaxed);
335 return Some((index, configs[index].url.clone()));
336 }
337 None
338 }
339
340 fn select_url_internal(
349 &self,
350 excluded_urls: &std::collections::HashSet<String>,
351 ) -> Result<String, RpcSelectorError> {
352 let configs = self.configs.read();
353 if configs.is_empty() {
354 return Err(RpcSelectorError::NoProviders);
355 }
356
357 let health_store = RpcHealthStore::instance();
358 let expiration = chrono::Duration::seconds(self.failure_expiration_secs as i64);
359
360 if configs.len() == 1 {
362 if configs[0].get_weight() == 0 {
364 return Err(RpcSelectorError::AllProvidersFailed);
365 }
366 if excluded_urls.contains(&configs[0].url) {
368 return Err(RpcSelectorError::AllProvidersFailed);
369 }
370 self.current_index.store(0, Ordering::Relaxed);
372 self.has_current.store(true, Ordering::Relaxed);
373 return Ok(configs[0].url.clone());
374 }
375
376 if let Some((_, url)) = self.try_weighted_selection(
379 &configs,
380 excluded_urls,
381 false, health_store,
383 expiration,
384 ) {
385 return Ok(url);
386 }
387
388 let start_index = self.next_index.load(Ordering::Relaxed) % configs.len();
390 if let Some((_, url)) = self.try_round_robin_selection(
391 &configs,
392 excluded_urls,
393 false, health_store,
395 expiration,
396 start_index,
397 ) {
398 return Ok(url);
399 }
400
401 tracing::warn!(
404 "No non-paused providers available, falling back to paused providers as last resort"
405 );
406
407 if let Some((_, url)) = self.try_weighted_selection(
409 &configs,
410 excluded_urls,
411 true, health_store,
413 expiration,
414 ) {
415 return Ok(url);
416 }
417
418 if let Some((_, url)) = self.try_round_robin_selection(
420 &configs,
421 excluded_urls,
422 true, health_store,
424 expiration,
425 start_index,
426 ) {
427 return Ok(url);
428 }
429
430 Err(RpcSelectorError::AllProvidersFailed)
432 }
433
434 pub fn get_current_url(&self) -> Result<String, RpcSelectorError> {
439 self.select_url_internal(&std::collections::HashSet::new())
440 }
441
442 pub fn get_next_url(
450 &self,
451 excluded_urls: &std::collections::HashSet<String>,
452 ) -> Result<String, RpcSelectorError> {
453 self.select_url_internal(excluded_urls)
454 }
455
456 #[cfg(test)]
459 pub fn select_url(&self) -> Result<String, RpcSelectorError> {
460 self.select_url_internal(&std::collections::HashSet::new())
461 }
462
463 pub fn get_client<T>(
472 &self,
473 initializer: impl Fn(&str) -> Result<T>,
474 excluded_urls: &std::collections::HashSet<String>,
475 ) -> Result<T, RpcSelectorError> {
476 let url = self.select_url_internal(excluded_urls)?;
477
478 initializer(&url).map_err(|e| RpcSelectorError::ClientInitializationError(e.to_string()))
479 }
480}
481
482impl Clone for RpcSelector {
484 fn clone(&self) -> Self {
485 Self {
486 configs: Arc::new(RwLock::new(self.configs.read().clone())),
487 weights_dist: self.weights_dist.clone(),
488 next_index: Arc::clone(&self.next_index),
489 current_index: Arc::clone(&self.current_index),
490 has_current: Arc::clone(&self.has_current),
491 failure_threshold: self.failure_threshold,
492 pause_duration_secs: self.pause_duration_secs,
493 failure_expiration_secs: self.failure_expiration_secs,
494 }
495 }
496}
497
498#[cfg(test)]
499mod tests {
500 use super::*;
501 use crate::services::provider::rpc_health_store::RpcHealthStore;
502 use serial_test::serial;
503 use std::sync::Arc;
504 use std::thread;
505
506 #[test]
507 fn test_create_weights_distribution_single_config() {
508 let configs = vec![RpcConfig {
509 url: "https://example.com/rpc".to_string(),
510 weight: 1,
511 ..Default::default()
512 }];
513
514 let excluded = HashSet::new();
515 let result = RpcSelector::create_weights_distribution(&configs, &excluded);
516 assert!(result.is_none());
517 }
518
519 #[test]
520 fn test_create_weights_distribution_equal_weights() {
521 let configs = vec![
522 RpcConfig {
523 url: "https://example1.com/rpc".to_string(),
524 weight: 5,
525 ..Default::default()
526 },
527 RpcConfig {
528 url: "https://example2.com/rpc".to_string(),
529 weight: 5,
530 ..Default::default()
531 },
532 RpcConfig {
533 url: "https://example3.com/rpc".to_string(),
534 weight: 5,
535 ..Default::default()
536 },
537 ];
538
539 let excluded = HashSet::new();
540 let result = RpcSelector::create_weights_distribution(&configs, &excluded);
541 assert!(result.is_none());
542 }
543
544 #[test]
545 fn test_create_weights_distribution_different_weights() {
546 let configs = vec![
547 RpcConfig {
548 url: "https://example1.com/rpc".to_string(),
549 weight: 1,
550 ..Default::default()
551 },
552 RpcConfig {
553 url: "https://example2.com/rpc".to_string(),
554 weight: 2,
555 ..Default::default()
556 },
557 RpcConfig {
558 url: "https://example3.com/rpc".to_string(),
559 weight: 3,
560 ..Default::default()
561 },
562 ];
563
564 let excluded = HashSet::new();
565 let result = RpcSelector::create_weights_distribution(&configs, &excluded);
566 assert!(result.is_some());
567 }
568
569 #[test]
570 fn test_create_weights_distribution_with_excluded() {
571 let configs = vec![
572 RpcConfig {
573 url: "https://example1.com/rpc".to_string(),
574 weight: 1,
575 ..Default::default()
576 },
577 RpcConfig {
578 url: "https://example2.com/rpc".to_string(),
579 weight: 2,
580 ..Default::default()
581 },
582 RpcConfig {
583 url: "https://example3.com/rpc".to_string(),
584 weight: 3,
585 ..Default::default()
586 },
587 ];
588
589 let mut excluded = HashSet::new();
591 excluded.insert(0);
592
593 let result = RpcSelector::create_weights_distribution(&configs, &excluded);
594 assert!(result.is_some());
595
596 excluded.insert(1);
598 let result = RpcSelector::create_weights_distribution(&configs, &excluded);
599 assert!(result.is_none());
600 }
601
602 #[test]
603 fn test_rpc_selector_new_empty_configs() {
604 let configs: Vec<RpcConfig> = vec![];
605 let result = RpcSelector::new_with_defaults(configs);
606 assert!(result.is_err());
607 assert!(matches!(result.unwrap_err(), RpcSelectorError::NoProviders));
608 }
609
610 #[test]
611 fn test_rpc_selector_new_single_config() {
612 let configs = vec![RpcConfig {
613 url: "https://example.com/rpc".to_string(),
614 weight: 1,
615 ..Default::default()
616 }];
617
618 let result = RpcSelector::new_with_defaults(configs);
619 assert!(result.is_ok());
620 let selector = result.unwrap();
621 assert!(selector.weights_dist.is_none());
622 }
623
624 #[test]
625 fn test_rpc_selector_new_multiple_equal_weights() {
626 let configs = vec![
627 RpcConfig {
628 url: "https://example1.com/rpc".to_string(),
629 weight: 5,
630 ..Default::default()
631 },
632 RpcConfig {
633 url: "https://example2.com/rpc".to_string(),
634 weight: 5,
635 ..Default::default()
636 },
637 ];
638
639 let result = RpcSelector::new_with_defaults(configs);
640 assert!(result.is_ok());
641 let selector = result.unwrap();
642 assert!(selector.weights_dist.is_none());
643 }
644
645 #[test]
646 fn test_rpc_selector_new_multiple_different_weights() {
647 let configs = vec![
648 RpcConfig {
649 url: "https://example1.com/rpc".to_string(),
650 weight: 1,
651 ..Default::default()
652 },
653 RpcConfig {
654 url: "https://example2.com/rpc".to_string(),
655 weight: 3,
656 ..Default::default()
657 },
658 ];
659
660 let result = RpcSelector::new_with_defaults(configs);
661 assert!(result.is_ok());
662 let selector = result.unwrap();
663 assert!(selector.weights_dist.is_some());
664 }
665
666 #[test]
667 fn test_rpc_selector_select_url_single_provider() {
668 let configs = vec![RpcConfig {
669 url: "https://example.com/rpc".to_string(),
670 weight: 1,
671 ..Default::default()
672 }];
673
674 let selector = RpcSelector::new_with_defaults(configs).unwrap();
675 let result = selector.select_url();
676 assert!(result.is_ok());
677 assert_eq!(result.unwrap(), "https://example.com/rpc");
678 assert!(selector.has_current.load(Ordering::Relaxed));
679 }
680
681 #[test]
682 fn test_rpc_selector_select_url_round_robin() {
683 let configs = vec![
684 RpcConfig {
685 url: "https://example1.com/rpc".to_string(),
686 weight: 1,
687 ..Default::default()
688 },
689 RpcConfig {
690 url: "https://example2.com/rpc".to_string(),
691 weight: 1,
692 ..Default::default()
693 },
694 ];
695
696 let selector = RpcSelector::new_with_defaults(configs).unwrap();
697
698 let first_url = selector.select_url().unwrap();
700 let second_url = selector.select_url().unwrap();
702 let third_url = selector.select_url().unwrap();
704
705 assert_ne!(first_url, second_url);
707 assert_eq!(first_url, third_url);
708 }
709
710 #[test]
711 fn test_rpc_selector_get_client_success() {
712 let configs = vec![RpcConfig {
713 url: "https://example.com/rpc".to_string(),
714 weight: 1,
715 ..Default::default()
716 }];
717
718 let selector = RpcSelector::new_with_defaults(configs).unwrap();
719
720 let initializer = |url: &str| -> Result<String> { Ok(url.to_string()) };
722
723 let result = selector.get_client(initializer, &std::collections::HashSet::new());
724 assert!(result.is_ok());
725 assert_eq!(result.unwrap(), "https://example.com/rpc");
726 }
727
728 #[test]
729 fn test_rpc_selector_get_client_failure() {
730 let configs = vec![RpcConfig {
731 url: "https://example.com/rpc".to_string(),
732 weight: 1,
733 ..Default::default()
734 }];
735
736 let selector = RpcSelector::new_with_defaults(configs).unwrap();
737
738 let initializer =
740 |_url: &str| -> Result<String> { Err(eyre::eyre!("Initialization error")) };
741
742 let result = selector.get_client(initializer, &std::collections::HashSet::new());
743 assert!(result.is_err());
744 assert!(matches!(
745 result.unwrap_err(),
746 RpcSelectorError::ClientInitializationError(_)
747 ));
748 }
749
750 #[test]
751 fn test_rpc_selector_clone() {
752 let configs = vec![
753 RpcConfig {
754 url: "https://example1.com/rpc".to_string(),
755 weight: 1,
756 ..Default::default()
757 },
758 RpcConfig {
759 url: "https://example2.com/rpc".to_string(),
760 weight: 3,
761 ..Default::default()
762 },
763 ];
764
765 let selector = RpcSelector::new_with_defaults(configs).unwrap();
766 let cloned = selector.clone();
767
768 assert_eq!(selector.configs.read().len(), cloned.configs.read().len());
770 assert_eq!(selector.configs.read()[0].url, cloned.configs.read()[0].url);
771 assert_eq!(selector.configs.read()[1].url, cloned.configs.read()[1].url);
772
773 assert_eq!(
775 selector.weights_dist.is_some(),
776 cloned.weights_dist.is_some()
777 );
778 }
779
780 #[test]
781 #[serial]
782 fn test_mark_current_as_failed_single_provider() {
783 RpcHealthStore::instance().clear_all();
785
786 let configs = vec![RpcConfig {
789 url: "https://test-single-provider.example.com/rpc".to_string(),
790 weight: 1,
791 ..Default::default()
792 }];
793
794 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
796 let _initial_url = selector.select_url().unwrap();
797
798 selector.mark_current_as_failed();
800
801 assert_eq!(selector.available_provider_count(), 0);
803
804 let next_url = selector.select_url();
806 assert!(next_url.is_ok());
807 assert_eq!(
808 next_url.unwrap(),
809 "https://test-single-provider.example.com/rpc"
810 );
811 }
812
813 #[test]
814 #[serial]
815 fn test_mark_current_as_failed_multiple_providers() {
816 RpcHealthStore::instance().clear_all();
818
819 let configs = vec![
822 RpcConfig {
823 url: "https://test-multi1.example.com/rpc".to_string(),
824 weight: 5,
825 ..Default::default()
826 },
827 RpcConfig {
828 url: "https://test-multi2.example.com/rpc".to_string(),
829 weight: 5,
830 ..Default::default()
831 },
832 RpcConfig {
833 url: "https://test-multi3.example.com/rpc".to_string(),
834 weight: 5,
835 ..Default::default()
836 },
837 ];
838
839 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
841
842 let url1 = selector.select_url().unwrap().to_string();
844
845 selector.mark_current_as_failed();
847 assert_eq!(selector.available_provider_count(), 2);
849
850 let url2 = selector.select_url().unwrap().to_string();
852 assert_ne!(url1, url2);
854
855 selector.mark_current_as_failed();
857 assert_eq!(selector.available_provider_count(), 1);
858
859 let url3 = selector.select_url().unwrap().to_string();
860 assert_ne!(url1, url3);
862 assert_ne!(url2, url3);
863
864 selector.mark_current_as_failed();
866 assert_eq!(selector.available_provider_count(), 0);
867
868 let url4 = selector.select_url();
870 assert!(url4.is_ok());
871 let url4_str = url4.unwrap();
873 assert!(
874 url4_str == "https://test-multi1.example.com/rpc"
875 || url4_str == "https://test-multi2.example.com/rpc"
876 || url4_str == "https://test-multi3.example.com/rpc"
877 );
878 }
879
880 #[test]
881 #[serial]
882 fn test_mark_current_as_failed_weighted() {
883 RpcHealthStore::instance().clear_all();
885
886 let configs = vec![
888 RpcConfig {
889 url: "https://test-weighted1.example.com/rpc".to_string(),
890 weight: 1, ..Default::default()
892 },
893 RpcConfig {
894 url: "https://test-weighted2.example.com/rpc".to_string(),
895 weight: 10, ..Default::default()
897 },
898 ];
899
900 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
902 assert!(selector.weights_dist.is_some()); let url1 = selector.select_url().unwrap().to_string();
906
907 selector.mark_current_as_failed();
909 assert_eq!(selector.available_provider_count(), 1);
910
911 let url2 = selector.select_url().unwrap().to_string();
913 assert_ne!(url1, url2);
914
915 selector.mark_current_as_failed();
917 assert_eq!(selector.available_provider_count(), 0);
918
919 let url3 = selector.select_url();
921 assert!(url3.is_ok());
922 let url3_str = url3.unwrap();
923 assert!(
924 url3_str == "https://test-weighted1.example.com/rpc"
925 || url3_str == "https://test-weighted2.example.com/rpc"
926 );
927 }
928
929 #[test]
930 fn test_provider_count() {
931 let configs: Vec<RpcConfig> = vec![];
933 let result = RpcSelector::new_with_defaults(configs);
934 assert!(result.is_err());
935
936 let configs = vec![RpcConfig {
938 url: "https://example.com/rpc".to_string(),
939 weight: 1,
940 ..Default::default()
941 }];
942 let selector = RpcSelector::new_with_defaults(configs).unwrap();
943 assert_eq!(selector.provider_count(), 1);
944
945 let configs = vec![
947 RpcConfig {
948 url: "https://example1.com/rpc".to_string(),
949 weight: 1,
950 ..Default::default()
951 },
952 RpcConfig {
953 url: "https://example2.com/rpc".to_string(),
954 weight: 2,
955 ..Default::default()
956 },
957 RpcConfig {
958 url: "https://example3.com/rpc".to_string(),
959 weight: 3,
960 ..Default::default()
961 },
962 ];
963 let selector = RpcSelector::new_with_defaults(configs).unwrap();
964 assert_eq!(selector.provider_count(), 3);
965 }
966
967 #[test]
968 #[serial]
969 fn test_available_provider_count() {
970 RpcHealthStore::instance().clear_all();
972
973 let configs = vec![
974 RpcConfig {
975 url: "https://test-available1.example.com/rpc".to_string(),
976 weight: 1,
977 ..Default::default()
978 },
979 RpcConfig {
980 url: "https://test-available2.example.com/rpc".to_string(),
981 weight: 2,
982 ..Default::default()
983 },
984 RpcConfig {
985 url: "https://test-available3.example.com/rpc".to_string(),
986 weight: 3,
987 ..Default::default()
988 },
989 ];
990
991 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
993 assert_eq!(selector.provider_count(), 3);
994 assert_eq!(selector.available_provider_count(), 3);
995
996 selector.select_url().unwrap(); selector.mark_current_as_failed();
999 assert_eq!(selector.available_provider_count(), 2);
1001
1002 selector.select_url().unwrap(); selector.mark_current_as_failed();
1005 assert_eq!(selector.available_provider_count(), 1);
1006 }
1007
1008 #[test]
1009 fn test_get_current_url() {
1010 let configs = vec![
1011 RpcConfig::new("https://example1.com/rpc".to_string()),
1012 RpcConfig::new("https://example2.com/rpc".to_string()),
1013 ];
1014
1015 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1016
1017 let url = selector.get_current_url();
1019 assert!(url.is_ok());
1020 let url_str = url.unwrap();
1021 assert!(
1022 url_str == "https://example1.com/rpc" || url_str == "https://example2.com/rpc",
1023 "Unexpected URL: {url_str}"
1024 );
1025 }
1026
1027 #[test]
1028 #[serial]
1029 fn test_concurrent_usage() {
1030 RpcHealthStore::instance().clear_all();
1032
1033 let configs = vec![
1035 RpcConfig::new("https://test-concurrent1.example.com/rpc".to_string()),
1036 RpcConfig::new("https://test-concurrent2.example.com/rpc".to_string()),
1037 RpcConfig::new("https://test-concurrent3.example.com/rpc".to_string()),
1038 ];
1039
1040 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1042 let selector_arc = Arc::new(selector);
1043
1044 let mut handles = Vec::with_capacity(10);
1045
1046 for _ in 0..10 {
1048 let selector_clone = Arc::clone(&selector_arc);
1049 let handle = thread::spawn(move || {
1050 let url = selector_clone.select_url().unwrap().to_string();
1051 if url.contains("test-concurrent1") {
1052 selector_clone.mark_current_as_failed();
1054 }
1055 url
1056 });
1057 handles.push(handle);
1058 }
1059
1060 let mut urls = Vec::new();
1062 for handle in handles {
1063 urls.push(handle.join().unwrap());
1064 }
1065
1066 let unique_urls: std::collections::HashSet<String> = urls.into_iter().collect();
1068 assert!(unique_urls.len() > 1, "Expected multiple unique URLs");
1069
1070 let mut found_non_example1 = false;
1073 for _ in 0..10 {
1074 let url = selector_arc.select_url().unwrap().to_string();
1075 if !url.contains("test-concurrent1") {
1076 found_non_example1 = true;
1077 }
1078 }
1079
1080 assert!(found_non_example1, "Should prefer non-paused providers");
1081 }
1082
1083 #[test]
1084 fn test_consecutive_mark_as_failed() {
1085 let configs = vec![
1086 RpcConfig::new("https://example1.com/rpc".to_string()),
1087 RpcConfig::new("https://example2.com/rpc".to_string()),
1088 ];
1089
1090 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1091
1092 selector.select_url().unwrap();
1094
1095 selector.mark_current_as_failed();
1097 selector.mark_current_as_failed(); let result = selector.select_url();
1101 assert!(result.is_ok());
1102 }
1103
1104 #[test]
1105 #[serial]
1106 fn test_weighted_to_round_robin_fallback() {
1107 RpcHealthStore::instance().clear_all();
1109
1110 let configs = vec![
1111 RpcConfig {
1112 url: "https://test-wrr1.example.com/rpc".to_string(),
1113 weight: 10, ..Default::default()
1115 },
1116 RpcConfig {
1117 url: "https://test-wrr2.example.com/rpc".to_string(),
1118 weight: 1, ..Default::default()
1120 },
1121 RpcConfig {
1122 url: "https://test-wrr3.example.com/rpc".to_string(),
1123 weight: 1, ..Default::default()
1125 },
1126 ];
1127
1128 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1130 assert!(selector.weights_dist.is_some()); let mut selected_first = false;
1135
1136 for _ in 0..10 {
1138 let url = selector.select_url().unwrap();
1139 if url.contains("test-wrr1") {
1140 selected_first = true;
1141 selector.mark_current_as_failed();
1143 break;
1144 }
1145 }
1146
1147 assert!(
1148 selected_first,
1149 "High-weight provider should have been selected"
1150 );
1151
1152 let mut seen_urls = HashSet::new();
1154 for _ in 0..10 {
1155 let url = selector.select_url().unwrap().to_string();
1156 seen_urls.insert(url);
1157 }
1158
1159 assert!(seen_urls.len() >= 2);
1161 assert!(
1162 !seen_urls.iter().any(|url| url.contains("test-wrr1")),
1163 "Paused provider should not be selected (prefer non-paused)"
1164 );
1165 }
1166
1167 #[test]
1168 fn test_zero_weight_providers() {
1169 let configs = vec![
1170 RpcConfig {
1171 url: "https://example1.com/rpc".to_string(),
1172 weight: 0, ..Default::default()
1174 },
1175 RpcConfig {
1176 url: "https://example2.com/rpc".to_string(),
1177 weight: 5, ..Default::default()
1179 },
1180 ];
1181
1182 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1183
1184 let mut seen_urls = HashSet::new();
1186 for _ in 0..10 {
1187 let url = selector.select_url().unwrap().to_string();
1188 seen_urls.insert(url);
1189 }
1190
1191 assert_eq!(seen_urls.len(), 1);
1192 assert!(
1193 seen_urls.iter().next().unwrap().contains("example2"),
1194 "Only the non-zero weight provider should be selected"
1195 );
1196 }
1197
1198 #[test]
1199 #[serial]
1200 fn test_extreme_weight_differences() {
1201 let configs = vec![
1202 RpcConfig {
1203 url: "https://example1.com/rpc".to_string(),
1204 weight: 100, ..Default::default()
1206 },
1207 RpcConfig {
1208 url: "https://example2.com/rpc".to_string(),
1209 weight: 1, ..Default::default()
1211 },
1212 ];
1213
1214 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1215
1216 let mut count_high = 0;
1218
1219 for _ in 0..100 {
1220 let url = selector.select_url().unwrap().to_string();
1221 if url.contains("example1") {
1222 count_high += 1;
1223 }
1224
1225 selector.has_current.store(false, Ordering::Relaxed);
1227 }
1228
1229 assert!(
1231 count_high > 90,
1232 "High-weight provider selected only {}/{} times",
1233 count_high,
1234 100
1235 );
1236 }
1237
1238 #[test]
1239 fn test_mark_unselected_as_failed() {
1240 let configs = vec![
1241 RpcConfig::new("https://example1.com/rpc".to_string()),
1242 RpcConfig::new("https://example2.com/rpc".to_string()),
1243 ];
1244
1245 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1246
1247 selector.mark_current_as_failed();
1249
1250 let mut seen_urls = HashSet::new();
1252 for _ in 0..10 {
1253 let url = selector.select_url().unwrap().to_string();
1254 seen_urls.insert(url);
1255
1256 selector.has_current.store(false, Ordering::Relaxed);
1258 }
1259
1260 assert_eq!(
1261 seen_urls.len(),
1262 2,
1263 "Both providers should still be available"
1264 );
1265 }
1266
1267 #[test]
1268 fn test_rpc_selector_error_serialization() {
1269 let error = RpcSelectorError::NoProviders;
1270 let json = serde_json::to_string(&error).unwrap();
1271 assert!(json.contains("NoProviders"));
1272
1273 let error = RpcSelectorError::ClientInitializationError("test error".to_string());
1274 let json = serde_json::to_string(&error).unwrap();
1275 assert!(json.contains("ClientInitializationError"));
1276 assert!(json.contains("test error"));
1277
1278 let error = RpcSelectorError::WeightedIndexError("index error".to_string());
1279 let json = serde_json::to_string(&error).unwrap();
1280 assert!(json.contains("WeightedIndexError"));
1281 assert!(json.contains("index error"));
1282
1283 let error = RpcSelectorError::AllProvidersFailed;
1284 let json = serde_json::to_string(&error).unwrap();
1285 assert!(json.contains("AllProvidersFailed"));
1286 }
1287
1288 #[cfg(test)]
1289 mod rate_limiting_tests {
1290 use super::*;
1291 use crate::services::provider::rpc_health_store::RpcHealthStore;
1292
1293 #[test]
1301 #[serial]
1302 fn test_rpc_selector_switches_on_rate_limit() {
1303 RpcHealthStore::instance().clear_all();
1304 let configs = vec![
1305 RpcConfig {
1306 url: "https://test-rate-limit1.example.com".to_string(),
1307 weight: 100,
1308 ..Default::default()
1309 },
1310 RpcConfig {
1311 url: "https://test-rate-limit2.example.com".to_string(),
1312 weight: 100,
1313 ..Default::default()
1314 },
1315 ];
1316
1317 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1319
1320 assert_eq!(selector.available_provider_count(), 2);
1322
1323 let first_url = selector.select_url().unwrap();
1325
1326 assert!(
1328 first_url == "https://test-rate-limit1.example.com"
1329 || first_url == "https://test-rate-limit2.example.com"
1330 );
1331
1332 selector.mark_current_as_failed();
1335
1336 assert_eq!(selector.available_provider_count(), 1);
1338
1339 let second_url = selector.select_url().unwrap();
1341
1342 assert_ne!(first_url, second_url);
1344
1345 let third_url = selector.select_url().unwrap();
1347 assert_eq!(second_url, third_url); let mut selected_urls = std::collections::HashSet::new();
1351 for _ in 0..10 {
1352 let url = selector.select_url().unwrap();
1353 selected_urls.insert(url.to_string());
1354 }
1355
1356 assert_eq!(selected_urls.len(), 1);
1358 assert!(!selected_urls.contains(&first_url.to_string()));
1359 assert!(selected_urls.contains(&second_url.to_string()));
1360 }
1361
1362 #[test]
1367 #[serial]
1368 fn test_rpc_selector_rate_limit_with_weighted_selection() {
1369 RpcHealthStore::instance().clear_all();
1370 let configs = vec![
1371 RpcConfig {
1372 url: "https://test-weighted-rl1.example.com".to_string(),
1373 weight: 80, ..Default::default()
1375 },
1376 RpcConfig {
1377 url: "https://test-weighted-rl2.example.com".to_string(),
1378 weight: 20, ..Default::default()
1380 },
1381 ];
1382
1383 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1385
1386 let mut rpc1_count = 0;
1388 let mut rpc2_count = 0;
1389
1390 for _ in 0..20 {
1391 let url = selector.select_url().unwrap();
1392 if url == "https://test-weighted-rl1.example.com" {
1393 rpc1_count += 1;
1394 } else {
1395 rpc2_count += 1;
1396 }
1397 }
1398
1399 assert!(rpc1_count > rpc2_count);
1401
1402 let mut selected_rpc1 = false;
1405 for _ in 0..10 {
1406 let url = selector.select_url().unwrap();
1407 if url == "https://test-weighted-rl1.example.com" {
1408 selector.mark_current_as_failed();
1409 selected_rpc1 = true;
1410 break;
1411 }
1412 }
1413 assert!(selected_rpc1, "Should have selected rpc1 at least once");
1414
1415 for _ in 0..20 {
1417 let url = selector.select_url().unwrap();
1418 assert_eq!(url, "https://test-weighted-rl2.example.com");
1419 }
1420
1421 assert_eq!(selector.available_provider_count(), 1);
1423 }
1424
1425 #[test]
1430 #[serial]
1431 fn test_rpc_selector_rate_limit_recovery() {
1432 RpcHealthStore::instance().clear_all();
1433 let configs = vec![
1434 RpcConfig {
1435 url: "https://test-recovery1.example.com".to_string(),
1436 weight: 100,
1437 ..Default::default()
1438 },
1439 RpcConfig {
1440 url: "https://test-recovery2.example.com".to_string(),
1441 weight: 100,
1442 ..Default::default()
1443 },
1444 ];
1445
1446 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1448
1449 let first_url = selector.select_url().unwrap();
1451
1452 selector.mark_current_as_failed();
1454 assert_eq!(selector.available_provider_count(), 1);
1455
1456 let second_url = selector.select_url().unwrap();
1458 assert_ne!(first_url, second_url);
1459
1460 for _ in 0..10 {
1462 let url = selector.select_url().unwrap();
1463 assert_eq!(url, second_url);
1464 }
1465
1466 assert_eq!(selector.available_provider_count(), 1);
1468 }
1469
1470 #[test]
1472 #[serial]
1473 fn test_rpc_selector_both_providers_rate_limited() {
1474 RpcHealthStore::instance().clear_all();
1475 let configs = vec![
1476 RpcConfig {
1477 url: "https://test-both-rl1.example.com".to_string(),
1478 weight: 100,
1479 ..Default::default()
1480 },
1481 RpcConfig {
1482 url: "https://test-both-rl2.example.com".to_string(),
1483 weight: 100,
1484 ..Default::default()
1485 },
1486 ];
1487
1488 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1490
1491 selector.select_url().unwrap();
1493 selector.mark_current_as_failed();
1494 assert_eq!(selector.available_provider_count(), 1);
1495
1496 selector.select_url().unwrap();
1498 selector.mark_current_as_failed();
1499 assert_eq!(selector.available_provider_count(), 0);
1500
1501 let result = selector.select_url();
1503 assert!(result.is_ok());
1504 let url = result.unwrap();
1505 assert!(
1506 url == "https://test-both-rl1.example.com"
1507 || url == "https://test-both-rl2.example.com"
1508 );
1509 }
1510
1511 #[test]
1516 #[serial]
1517 fn test_rpc_selector_rate_limit_round_robin_fallback() {
1518 RpcHealthStore::instance().clear_all();
1519 let configs = vec![
1520 RpcConfig {
1521 url: "https://test-rr-fallback1.example.com".to_string(),
1522 weight: 100,
1523 ..Default::default()
1524 },
1525 RpcConfig {
1526 url: "https://test-rr-fallback2.example.com".to_string(),
1527 weight: 100,
1528 ..Default::default()
1529 },
1530 RpcConfig {
1531 url: "https://test-rr-fallback3.example.com".to_string(),
1532 weight: 100,
1533 ..Default::default()
1534 },
1535 ];
1536
1537 let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1539
1540 selector.select_url().unwrap();
1542 let first_url = selector.get_current_url().unwrap();
1543
1544 if first_url == "https://test-rr-fallback1.example.com" {
1546 selector.mark_current_as_failed();
1547 } else {
1548 loop {
1550 let url = selector.select_url().unwrap();
1551 if url == "https://test-rr-fallback1.example.com" {
1552 selector.mark_current_as_failed();
1553 break;
1554 }
1555 }
1556 }
1557
1558 let mut selected_urls = std::collections::HashSet::new();
1560 for _ in 0..20 {
1561 let url = selector.select_url().unwrap();
1562 selected_urls.insert(url.to_string());
1563 assert_ne!(url, "https://test-rr-fallback1.example.com");
1565 }
1566
1567 assert!(selected_urls.contains("https://test-rr-fallback2.example.com"));
1569 assert!(selected_urls.contains("https://test-rr-fallback3.example.com"));
1570 assert_eq!(selected_urls.len(), 2);
1571 }
1572
1573 #[test]
1574 #[serial]
1575 fn test_select_url_excludes_tried_providers() {
1576 RpcHealthStore::instance().clear_all();
1577 let configs = vec![
1578 RpcConfig {
1579 url: "https://provider1.com".to_string(),
1580 weight: 1,
1581 ..Default::default()
1582 },
1583 RpcConfig {
1584 url: "https://provider2.com".to_string(),
1585 weight: 1,
1586 ..Default::default()
1587 },
1588 RpcConfig {
1589 url: "https://provider3.com".to_string(),
1590 weight: 1,
1591 ..Default::default()
1592 },
1593 ];
1594
1595 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1596
1597 let mut excluded = std::collections::HashSet::new();
1599 excluded.insert("https://provider1.com".to_string());
1600
1601 for _ in 0..10 {
1603 let url = selector.get_next_url(&excluded).unwrap();
1604 assert_ne!(url, "https://provider1.com");
1605 }
1606 }
1607
1608 #[test]
1609 #[serial]
1610 fn test_select_url_fallback_to_paused_providers() {
1611 RpcHealthStore::instance().clear_all();
1612 let configs = vec![
1613 RpcConfig {
1614 url: "https://provider1.com".to_string(),
1615 weight: 1,
1616 ..Default::default()
1617 },
1618 RpcConfig {
1619 url: "https://provider2.com".to_string(),
1620 weight: 1,
1621 ..Default::default()
1622 },
1623 ];
1624
1625 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1626 let health_store = RpcHealthStore::instance();
1627 let expiration = chrono::Duration::seconds(60);
1628
1629 health_store.mark_failed(
1631 "https://provider1.com",
1632 3,
1633 chrono::Duration::seconds(60),
1634 expiration,
1635 );
1636 health_store.mark_failed(
1637 "https://provider1.com",
1638 3,
1639 chrono::Duration::seconds(60),
1640 expiration,
1641 );
1642 health_store.mark_failed(
1643 "https://provider1.com",
1644 3,
1645 chrono::Duration::seconds(60),
1646 expiration,
1647 );
1648
1649 health_store.mark_failed(
1650 "https://provider2.com",
1651 3,
1652 chrono::Duration::seconds(60),
1653 expiration,
1654 );
1655 health_store.mark_failed(
1656 "https://provider2.com",
1657 3,
1658 chrono::Duration::seconds(60),
1659 expiration,
1660 );
1661 health_store.mark_failed(
1662 "https://provider2.com",
1663 3,
1664 chrono::Duration::seconds(60),
1665 expiration,
1666 );
1667
1668 assert!(health_store.is_paused("https://provider1.com", 3, expiration));
1670 assert!(health_store.is_paused("https://provider2.com", 3, expiration));
1671
1672 let url = selector
1674 .get_next_url(&std::collections::HashSet::new())
1675 .unwrap();
1676 assert!(url == "https://provider1.com" || url == "https://provider2.com");
1677 }
1678
1679 #[test]
1680 #[serial]
1681 fn test_select_url_single_provider_excluded() {
1682 RpcHealthStore::instance().clear_all();
1683 let configs = vec![RpcConfig {
1684 url: "https://single-provider.com".to_string(),
1685 weight: 1,
1686 ..Default::default()
1687 }];
1688
1689 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1690
1691 let mut excluded = std::collections::HashSet::new();
1693 excluded.insert("https://single-provider.com".to_string());
1694
1695 let result = selector.get_next_url(&excluded);
1697 assert!(result.is_err());
1698 assert!(matches!(
1699 result.unwrap_err(),
1700 RpcSelectorError::AllProvidersFailed
1701 ));
1702 }
1703
1704 #[test]
1705 #[serial]
1706 fn test_select_url_all_providers_excluded() {
1707 RpcHealthStore::instance().clear_all();
1708 let configs = vec![
1709 RpcConfig {
1710 url: "https://provider1.com".to_string(),
1711 weight: 1,
1712 ..Default::default()
1713 },
1714 RpcConfig {
1715 url: "https://provider2.com".to_string(),
1716 weight: 1,
1717 ..Default::default()
1718 },
1719 ];
1720
1721 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1722
1723 let mut excluded = std::collections::HashSet::new();
1725 excluded.insert("https://provider1.com".to_string());
1726 excluded.insert("https://provider2.com".to_string());
1727
1728 let result = selector.get_next_url(&excluded);
1730 assert!(result.is_err());
1731 assert!(matches!(
1732 result.unwrap_err(),
1733 RpcSelectorError::AllProvidersFailed
1734 ));
1735 }
1736
1737 #[test]
1738 #[serial]
1739 fn test_select_url_excluded_providers_with_weighted_selection() {
1740 RpcHealthStore::instance().clear_all();
1741 let configs = vec![
1742 RpcConfig {
1743 url: "https://provider1.com".to_string(),
1744 weight: 10,
1745 ..Default::default()
1746 },
1747 RpcConfig {
1748 url: "https://provider2.com".to_string(),
1749 weight: 1,
1750 ..Default::default()
1751 },
1752 RpcConfig {
1753 url: "https://provider3.com".to_string(),
1754 weight: 1,
1755 ..Default::default()
1756 },
1757 ];
1758
1759 let selector = RpcSelector::new_with_defaults(configs).unwrap();
1760
1761 let mut excluded = std::collections::HashSet::new();
1763 excluded.insert("https://provider1.com".to_string());
1764
1765 for _ in 0..20 {
1767 let url = selector.get_next_url(&excluded).unwrap();
1768 assert_ne!(url, "https://provider1.com");
1769 }
1770 }
1771 }
1772}