openzeppelin_relayer/services/provider/
rpc_selector.rs

1//! # RPC Provider Selector
2//!
3//! This module provides functionality for dynamically selecting RPC endpoints based on configured priorities,
4//! health status, and selection strategies.
5//!
6//! ## Features
7//!
8//! - **Weighted selection**: Providers can be assigned different weights to control selection probability
9//! - **Round-robin fallback**: If weighted selection fails or weights are equal, round-robin is used
10//! - **Health tracking**: Failed providers are temporarily excluded from selection
11//! - **Automatic recovery**: Failed providers are automatically recovered after a configurable period
12use std::collections::HashSet;
13use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
14use std::sync::Arc;
15
16use eyre::Result;
17use parking_lot::RwLock;
18use rand::distr::weighted::WeightedIndex;
19use rand::prelude::*;
20use serde::Serialize;
21use thiserror::Error;
22use tracing::info;
23
24use crate::models::RpcConfig;
25use crate::services::provider::rpc_health_store::RpcHealthStore;
26
27#[derive(Error, Debug, Serialize)]
28pub enum RpcSelectorError {
29    #[error("No providers available")]
30    NoProviders,
31    #[error("Client initialization failed: {0}")]
32    ClientInitializationError(String),
33    #[error("Weighted index error: {0}")]
34    WeightedIndexError(String),
35    #[error("All available providers have failed")]
36    AllProvidersFailed,
37}
38
/// Manages selection of RPC endpoints based on configuration.
///
/// Selection combines an optional pre-computed weighted distribution with a round-robin
/// cursor. Provider health (paused / not paused) is looked up in `RpcHealthStore` using the
/// thresholds stored on this struct.
#[derive(Debug)]
pub struct RpcSelector {
    /// RPC configurations. A provider's position in this list is the index referred to by
    /// `next_index`, `current_index`, and samples drawn from `weights_dist`.
    configs: Arc<RwLock<Vec<RpcConfig>>>,
    /// Pre-computed weighted distribution for faster provider selection.
    /// `None` means weighted selection is skipped and round-robin is used instead.
    weights_dist: Option<Arc<WeightedIndex<u8>>>,
    /// Counter for round-robin selection as a fallback or for equal weights.
    /// Readers reduce it modulo the number of providers before use.
    next_index: Arc<AtomicUsize>,
    /// Index of the most recently selected provider; only meaningful while `has_current` is true.
    current_index: Arc<AtomicUsize>,
    /// Flag indicating whether `current_index` refers to a valid current provider.
    has_current: Arc<AtomicBool>,
    /// Number of consecutive failures before pausing a provider
    failure_threshold: u32,
    /// Duration in seconds to pause a provider after reaching failure threshold
    pause_duration_secs: u64,
    /// Duration in seconds after which failures are considered stale and reset
    failure_expiration_secs: u64,
}
59
60impl RpcSelector {
61    /// Creates a new RpcSelector instance.
62    ///
63    /// # Arguments
64    /// * `configs` - RPC configurations
65    /// * `failure_threshold` - Number of consecutive failures before pausing a provider.
66    ///   Defaults to [`DEFAULT_PROVIDER_FAILURE_THRESHOLD`] if not provided via env var `PROVIDER_FAILURE_THRESHOLD`.
67    /// * `pause_duration_secs` - Duration in seconds to pause a provider after reaching failure threshold.
68    ///   Defaults to [`DEFAULT_PROVIDER_PAUSE_DURATION_SECS`] if not provided via env var `PROVIDER_PAUSE_DURATION_SECS`.
69    /// * `failure_expiration_secs` - Duration in seconds after which failures are considered stale and reset.
70    ///   Defaults to [`DEFAULT_PROVIDER_FAILURE_EXPIRATION_SECS`] (60 seconds).
71    ///
72    /// # Returns
73    /// * `Result<Self>` - A new selector instance or an error
74    ///
75    /// # Note
76    /// These values are typically loaded from `ServerConfig::from_env()` which reads from environment variables:
77    /// - `PROVIDER_FAILURE_THRESHOLD` (default: 3, legacy: `RPC_FAILURE_THRESHOLD`)
78    /// - `PROVIDER_PAUSE_DURATION_SECS` (default: 60, legacy: `RPC_PAUSE_DURATION_SECS`)
79    pub fn new(
80        configs: Vec<RpcConfig>,
81        failure_threshold: u32,
82        pause_duration_secs: u64,
83        failure_expiration_secs: u64,
84    ) -> Result<Self, RpcSelectorError> {
85        if configs.is_empty() {
86            return Err(RpcSelectorError::NoProviders);
87        }
88
89        // Create the weights distribution based on provided weights
90        let weights_dist = Self::create_weights_distribution(&configs, &HashSet::new());
91
92        let selector = Self {
93            configs: Arc::new(RwLock::new(configs)),
94            weights_dist,
95            next_index: Arc::new(AtomicUsize::new(0)),
96            current_index: Arc::new(AtomicUsize::new(0)),
97            has_current: Arc::new(AtomicBool::new(false)), // Initially no current provider
98            failure_threshold,
99            pause_duration_secs,
100            failure_expiration_secs,
101        };
102
103        // Randomize the starting index to avoid always starting with the same provider
104        let mut rng = rand::rng();
105        selector.next_index.store(
106            rng.random_range(0..selector.configs.read().len()),
107            Ordering::Relaxed,
108        );
109
110        Ok(selector)
111    }
112
113    /// Creates a new RpcSelector instance with default failure threshold and pause duration.
114    ///
115    /// This is a convenience method primarily for testing. In production code, use `new()` with
116    /// values from `ServerConfig::from_env()`.
117    ///
118    /// # Arguments
119    /// * `configs` - RPC configurations
120    ///
121    /// # Returns
122    /// * `Result<Self>` - A new selector instance or an error
123    pub fn new_with_defaults(configs: Vec<RpcConfig>) -> Result<Self, RpcSelectorError> {
124        Self::new(
125            configs,
126            crate::config::ServerConfig::get_provider_failure_threshold(),
127            crate::config::ServerConfig::get_provider_pause_duration_secs(),
128            crate::config::ServerConfig::get_provider_failure_expiration_secs(),
129        )
130    }
131
132    /// Gets the number of available providers
133    ///
134    /// # Returns
135    /// * `usize` - The number of providers in the selector
136    pub fn provider_count(&self) -> usize {
137        self.configs.read().len()
138    }
139
140    /// Gets the number of available (non-paused) providers
141    ///
142    /// # Returns
143    /// * `usize` - The number of non-paused providers
144    pub fn available_provider_count(&self) -> usize {
145        let health_store = RpcHealthStore::instance();
146        let expiration = chrono::Duration::seconds(self.failure_expiration_secs as i64);
147        self.configs
148            .read()
149            .iter()
150            .filter(|c| !health_store.is_paused(&c.url, self.failure_threshold, expiration))
151            .count()
152    }
153
154    /// Gets the current RPC configurations.
155    ///
156    /// # Returns
157    /// * `Vec<RpcConfig>` - The current configurations
158    pub fn get_configs(&self) -> Vec<RpcConfig> {
159        self.configs.read().clone()
160    }
161
162    /// Marks the current endpoint as failed and forces selection of a different endpoint.
163    ///
164    /// This method is used when a provider consistently fails, and we want to try a different one.
165    /// It adds the current provider to the failed providers set and will avoid selecting it again.
166    pub fn mark_current_as_failed(&self) {
167        info!("Marking current provider as failed");
168        // Only proceed if we have a current provider
169        if self.has_current.load(Ordering::Relaxed) {
170            let current = self.current_index.load(Ordering::Relaxed);
171            let configs = self.configs.read();
172            let config = &configs[current];
173
174            // Mark this provider as failed in the health store
175            let health_store = RpcHealthStore::instance();
176            use chrono::Duration;
177            health_store.mark_failed(
178                &config.url,
179                self.failure_threshold,
180                Duration::seconds(self.pause_duration_secs as i64),
181                Duration::seconds(self.failure_expiration_secs as i64),
182            );
183
184            // Clear the current provider
185            self.has_current.store(false, Ordering::Relaxed);
186
187            // Move round-robin index forward to avoid selecting the same provider again
188            if configs.len() > 1 {
189                self.next_index.fetch_add(1, Ordering::Relaxed);
190            }
191        }
192    }
193
194    /// Creates a weighted distribution for selecting RPC endpoints based on their weights.
195    ///
196    /// # Arguments
197    /// * `configs` - A slice of RPC configurations with weights
198    /// * `excluded_indices` - A set of indices to exclude from the distribution
199    ///
200    /// # Returns
201    /// * `Option<Arc<WeightedIndex<u8>>>` - A weighted distribution if configs have different weights, None otherwise
202    fn create_weights_distribution(
203        configs: &[RpcConfig],
204        excluded_indices: &HashSet<usize>,
205    ) -> Option<Arc<WeightedIndex<u8>>> {
206        // Collect weights, using 0 for excluded providers
207        let weights: Vec<u8> = configs
208            .iter()
209            .enumerate()
210            .map(|(idx, config)| {
211                if excluded_indices.contains(&idx) {
212                    0
213                } else {
214                    config.get_weight()
215                }
216            })
217            .collect();
218
219        // Count available providers with non-zero weight
220        let available_count = weights.iter().filter(|&&w| w > 0).count();
221        if available_count == 0 {
222            return None;
223        }
224
225        let first_non_zero_weight = weights.iter().find(|&&w| w > 0).copied();
226        if let Some(first_weight) = first_non_zero_weight {
227            // First check for the original equal weights case
228            let all_equal = weights
229                .iter()
230                .filter(|&&w| w > 0)
231                .all(|&w| w == first_weight);
232
233            if all_equal {
234                return None;
235            }
236        }
237
238        // Create weighted distribution
239        match WeightedIndex::new(&weights) {
240            Ok(dist) => Some(Arc::new(dist)),
241            Err(_) => None,
242        }
243    }
244
245    /// Attempts weighted selection of a provider.
246    ///
247    /// # Arguments
248    /// * `configs` - RPC configurations
249    /// * `excluded_urls` - URLs of providers that have already been tried
250    /// * `allow_paused` - If true, allows selection of paused providers
251    /// * `health_store` - Health store instance
252    /// * `expiration` - Duration after which failures expire
253    ///
254    /// # Returns
255    /// * `Option<(usize, String)>` - Some(index, url) if a provider was selected, None otherwise
256    fn try_weighted_selection(
257        &self,
258        configs: &[RpcConfig],
259        excluded_urls: &std::collections::HashSet<String>,
260        allow_paused: bool,
261        health_store: &RpcHealthStore,
262        expiration: chrono::Duration,
263    ) -> Option<(usize, String)> {
264        let dist = self.weights_dist.as_ref()?;
265        let mut rng = rand::rng();
266
267        const MAX_ATTEMPTS: usize = 10;
268        for _ in 0..MAX_ATTEMPTS {
269            let index = dist.sample(&mut rng);
270            // Skip providers with zero weight
271            if configs[index].get_weight() == 0 {
272                continue;
273            }
274            // Skip providers already tried in this failover cycle
275            if excluded_urls.contains(&configs[index].url) {
276                continue;
277            }
278            // Check health status unless paused providers are allowed
279            if !allow_paused
280                && health_store.is_paused(&configs[index].url, self.failure_threshold, expiration)
281            {
282                continue;
283            }
284            // Found a suitable provider
285            self.current_index.store(index, Ordering::Relaxed);
286            self.has_current.store(true, Ordering::Relaxed);
287            return Some((index, configs[index].url.clone()));
288        }
289        None
290    }
291
292    /// Attempts round-robin selection of a provider.
293    ///
294    /// # Arguments
295    /// * `configs` - RPC configurations
296    /// * `excluded_urls` - URLs of providers that have already been tried
297    /// * `allow_paused` - If true, allows selection of paused providers
298    /// * `health_store` - Health store instance
299    /// * `expiration` - Duration after which failures expire
300    /// * `start_index` - Starting index for round-robin iteration
301    ///
302    /// # Returns
303    /// * `Option<(usize, String)>` - Some(index, url) if a provider was selected, None otherwise
304    fn try_round_robin_selection(
305        &self,
306        configs: &[RpcConfig],
307        excluded_urls: &std::collections::HashSet<String>,
308        allow_paused: bool,
309        health_store: &RpcHealthStore,
310        expiration: chrono::Duration,
311        start_index: usize,
312    ) -> Option<(usize, String)> {
313        let len = configs.len();
314        for i in 0..len {
315            let index = (start_index + i) % len;
316            // Skip providers with zero weight
317            if configs[index].get_weight() == 0 {
318                continue;
319            }
320            // Skip providers already tried in this failover cycle
321            if excluded_urls.contains(&configs[index].url) {
322                continue;
323            }
324            // Check health status unless paused providers are allowed
325            if !allow_paused
326                && health_store.is_paused(&configs[index].url, self.failure_threshold, expiration)
327            {
328                continue;
329            }
330            // Found a suitable provider
331            // Update the next_index atomically to point after this provider
332            self.next_index.store((index + 1) % len, Ordering::Relaxed);
333            self.current_index.store(index, Ordering::Relaxed);
334            self.has_current.store(true, Ordering::Relaxed);
335            return Some((index, configs[index].url.clone()));
336        }
337        None
338    }
339
340    /// Gets the URL of the next RPC endpoint based on the selection strategy.
341    ///
342    /// This method first tries to select non-paused providers. If no non-paused providers
343    /// are available, it falls back to paused providers as a last resort, since they might
344    /// have recovered.
345    ///
346    /// # Arguments
347    /// * `excluded_urls` - URLs of providers that have already been tried in the current failover cycle
348    fn select_url_internal(
349        &self,
350        excluded_urls: &std::collections::HashSet<String>,
351    ) -> Result<String, RpcSelectorError> {
352        let configs = self.configs.read();
353        if configs.is_empty() {
354            return Err(RpcSelectorError::NoProviders);
355        }
356
357        let health_store = RpcHealthStore::instance();
358        let expiration = chrono::Duration::seconds(self.failure_expiration_secs as i64);
359
360        // For a single provider, handle special case
361        if configs.len() == 1 {
362            // Skip providers with zero weight
363            if configs[0].get_weight() == 0 {
364                return Err(RpcSelectorError::AllProvidersFailed);
365            }
366            // Skip if already tried
367            if excluded_urls.contains(&configs[0].url) {
368                return Err(RpcSelectorError::AllProvidersFailed);
369            }
370            // Even if paused, try it as last resort
371            self.current_index.store(0, Ordering::Relaxed);
372            self.has_current.store(true, Ordering::Relaxed);
373            return Ok(configs[0].url.clone());
374        }
375
376        // First, try to find a non-paused provider
377        // Try weighted selection first if available
378        if let Some((_, url)) = self.try_weighted_selection(
379            &configs,
380            excluded_urls,
381            false, // allow_paused = false
382            health_store,
383            expiration,
384        ) {
385            return Ok(url);
386        }
387
388        // Fall back to round-robin selection for non-paused providers
389        let start_index = self.next_index.load(Ordering::Relaxed) % configs.len();
390        if let Some((_, url)) = self.try_round_robin_selection(
391            &configs,
392            excluded_urls,
393            false, // allow_paused = false
394            health_store,
395            expiration,
396            start_index,
397        ) {
398            return Ok(url);
399        }
400
401        // If we get here, no non-paused providers are available
402        // Fall back to paused providers as a last resort
403        tracing::warn!(
404            "No non-paused providers available, falling back to paused providers as last resort"
405        );
406
407        // Try weighted selection for paused providers
408        if let Some((_, url)) = self.try_weighted_selection(
409            &configs,
410            excluded_urls,
411            true, // allow_paused = true
412            health_store,
413            expiration,
414        ) {
415            return Ok(url);
416        }
417
418        // Fall back to round-robin for paused providers
419        if let Some((_, url)) = self.try_round_robin_selection(
420            &configs,
421            excluded_urls,
422            true, // allow_paused = true
423            health_store,
424            expiration,
425            start_index,
426        ) {
427            return Ok(url);
428        }
429
430        // If we get here, all providers have zero weight (shouldn't happen in practice)
431        Err(RpcSelectorError::AllProvidersFailed)
432    }
433
434    /// Gets the URL of the currently selected RPC endpoint.
435    ///
436    /// # Returns
437    /// * `Result<String, RpcSelectorError>` - The URL of the current provider, or an error
438    pub fn get_current_url(&self) -> Result<String, RpcSelectorError> {
439        self.select_url_internal(&std::collections::HashSet::new())
440    }
441
442    /// Gets the URL of the next RPC endpoint, excluding providers that have already been tried.
443    ///
444    /// # Arguments
445    /// * `excluded_urls` - URLs of providers that have already been tried in the current failover cycle
446    ///
447    /// # Returns
448    /// * `Result<String, RpcSelectorError>` - The URL of the next provider, or an error
449    pub fn get_next_url(
450        &self,
451        excluded_urls: &std::collections::HashSet<String>,
452    ) -> Result<String, RpcSelectorError> {
453        self.select_url_internal(excluded_urls)
454    }
455
456    /// Gets the URL of the next RPC endpoint (for backward compatibility in tests).
457    /// This method doesn't exclude any providers - use `get_next_url()` with excluded URLs in production code.
458    #[cfg(test)]
459    pub fn select_url(&self) -> Result<String, RpcSelectorError> {
460        self.select_url_internal(&std::collections::HashSet::new())
461    }
462
463    /// Gets a client for the selected RPC endpoint.
464    ///
465    /// # Arguments
466    /// * `initializer` - A function that takes a URL string and returns a `Result<T>`
467    /// * `excluded_urls` - URLs of providers that have already been tried in the current failover cycle
468    ///
469    /// # Returns
470    /// * `Result<T>` - The client instance or an error
471    pub fn get_client<T>(
472        &self,
473        initializer: impl Fn(&str) -> Result<T>,
474        excluded_urls: &std::collections::HashSet<String>,
475    ) -> Result<T, RpcSelectorError> {
476        let url = self.select_url_internal(excluded_urls)?;
477
478        initializer(&url).map_err(|e| RpcSelectorError::ClientInitializationError(e.to_string()))
479    }
480}
481
482// Implement Clone for RpcSelector manually since the generic T doesn't require Clone
483impl Clone for RpcSelector {
484    fn clone(&self) -> Self {
485        Self {
486            configs: Arc::new(RwLock::new(self.configs.read().clone())),
487            weights_dist: self.weights_dist.clone(),
488            next_index: Arc::clone(&self.next_index),
489            current_index: Arc::clone(&self.current_index),
490            has_current: Arc::clone(&self.has_current),
491            failure_threshold: self.failure_threshold,
492            pause_duration_secs: self.pause_duration_secs,
493            failure_expiration_secs: self.failure_expiration_secs,
494        }
495    }
496}
497
498#[cfg(test)]
499mod tests {
500    use super::*;
501    use crate::services::provider::rpc_health_store::RpcHealthStore;
502    use serial_test::serial;
503    use std::sync::Arc;
504    use std::thread;
505
506    #[test]
507    fn test_create_weights_distribution_single_config() {
508        let configs = vec![RpcConfig {
509            url: "https://example.com/rpc".to_string(),
510            weight: 1,
511            ..Default::default()
512        }];
513
514        let excluded = HashSet::new();
515        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
516        assert!(result.is_none());
517    }
518
519    #[test]
520    fn test_create_weights_distribution_equal_weights() {
521        let configs = vec![
522            RpcConfig {
523                url: "https://example1.com/rpc".to_string(),
524                weight: 5,
525                ..Default::default()
526            },
527            RpcConfig {
528                url: "https://example2.com/rpc".to_string(),
529                weight: 5,
530                ..Default::default()
531            },
532            RpcConfig {
533                url: "https://example3.com/rpc".to_string(),
534                weight: 5,
535                ..Default::default()
536            },
537        ];
538
539        let excluded = HashSet::new();
540        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
541        assert!(result.is_none());
542    }
543
544    #[test]
545    fn test_create_weights_distribution_different_weights() {
546        let configs = vec![
547            RpcConfig {
548                url: "https://example1.com/rpc".to_string(),
549                weight: 1,
550                ..Default::default()
551            },
552            RpcConfig {
553                url: "https://example2.com/rpc".to_string(),
554                weight: 2,
555                ..Default::default()
556            },
557            RpcConfig {
558                url: "https://example3.com/rpc".to_string(),
559                weight: 3,
560                ..Default::default()
561            },
562        ];
563
564        let excluded = HashSet::new();
565        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
566        assert!(result.is_some());
567    }
568
569    #[test]
570    fn test_create_weights_distribution_with_excluded() {
571        let configs = vec![
572            RpcConfig {
573                url: "https://example1.com/rpc".to_string(),
574                weight: 1,
575                ..Default::default()
576            },
577            RpcConfig {
578                url: "https://example2.com/rpc".to_string(),
579                weight: 2,
580                ..Default::default()
581            },
582            RpcConfig {
583                url: "https://example3.com/rpc".to_string(),
584                weight: 3,
585                ..Default::default()
586            },
587        ];
588
589        // Exclude the first provider
590        let mut excluded = HashSet::new();
591        excluded.insert(0);
592
593        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
594        assert!(result.is_some());
595
596        // Exclude two providers (with only one remaining, should return None)
597        excluded.insert(1);
598        let result = RpcSelector::create_weights_distribution(&configs, &excluded);
599        assert!(result.is_none());
600    }
601
602    #[test]
603    fn test_rpc_selector_new_empty_configs() {
604        let configs: Vec<RpcConfig> = vec![];
605        let result = RpcSelector::new_with_defaults(configs);
606        assert!(result.is_err());
607        assert!(matches!(result.unwrap_err(), RpcSelectorError::NoProviders));
608    }
609
610    #[test]
611    fn test_rpc_selector_new_single_config() {
612        let configs = vec![RpcConfig {
613            url: "https://example.com/rpc".to_string(),
614            weight: 1,
615            ..Default::default()
616        }];
617
618        let result = RpcSelector::new_with_defaults(configs);
619        assert!(result.is_ok());
620        let selector = result.unwrap();
621        assert!(selector.weights_dist.is_none());
622    }
623
624    #[test]
625    fn test_rpc_selector_new_multiple_equal_weights() {
626        let configs = vec![
627            RpcConfig {
628                url: "https://example1.com/rpc".to_string(),
629                weight: 5,
630                ..Default::default()
631            },
632            RpcConfig {
633                url: "https://example2.com/rpc".to_string(),
634                weight: 5,
635                ..Default::default()
636            },
637        ];
638
639        let result = RpcSelector::new_with_defaults(configs);
640        assert!(result.is_ok());
641        let selector = result.unwrap();
642        assert!(selector.weights_dist.is_none());
643    }
644
645    #[test]
646    fn test_rpc_selector_new_multiple_different_weights() {
647        let configs = vec![
648            RpcConfig {
649                url: "https://example1.com/rpc".to_string(),
650                weight: 1,
651                ..Default::default()
652            },
653            RpcConfig {
654                url: "https://example2.com/rpc".to_string(),
655                weight: 3,
656                ..Default::default()
657            },
658        ];
659
660        let result = RpcSelector::new_with_defaults(configs);
661        assert!(result.is_ok());
662        let selector = result.unwrap();
663        assert!(selector.weights_dist.is_some());
664    }
665
666    #[test]
667    fn test_rpc_selector_select_url_single_provider() {
668        let configs = vec![RpcConfig {
669            url: "https://example.com/rpc".to_string(),
670            weight: 1,
671            ..Default::default()
672        }];
673
674        let selector = RpcSelector::new_with_defaults(configs).unwrap();
675        let result = selector.select_url();
676        assert!(result.is_ok());
677        assert_eq!(result.unwrap(), "https://example.com/rpc");
678        assert!(selector.has_current.load(Ordering::Relaxed));
679    }
680
681    #[test]
682    fn test_rpc_selector_select_url_round_robin() {
683        let configs = vec![
684            RpcConfig {
685                url: "https://example1.com/rpc".to_string(),
686                weight: 1,
687                ..Default::default()
688            },
689            RpcConfig {
690                url: "https://example2.com/rpc".to_string(),
691                weight: 1,
692                ..Default::default()
693            },
694        ];
695
696        let selector = RpcSelector::new_with_defaults(configs).unwrap();
697
698        // First call should return the first URL
699        let first_url = selector.select_url().unwrap();
700        // Second call should return the second URL due to round-robin
701        let second_url = selector.select_url().unwrap();
702        // Third call should return the first URL again
703        let third_url = selector.select_url().unwrap();
704
705        // We don't know which URL comes first, but the sequence should alternate
706        assert_ne!(first_url, second_url);
707        assert_eq!(first_url, third_url);
708    }
709
710    #[test]
711    fn test_rpc_selector_get_client_success() {
712        let configs = vec![RpcConfig {
713            url: "https://example.com/rpc".to_string(),
714            weight: 1,
715            ..Default::default()
716        }];
717
718        let selector = RpcSelector::new_with_defaults(configs).unwrap();
719
720        // Create a simple initializer function that returns the URL as a string
721        let initializer = |url: &str| -> Result<String> { Ok(url.to_string()) };
722
723        let result = selector.get_client(initializer, &std::collections::HashSet::new());
724        assert!(result.is_ok());
725        assert_eq!(result.unwrap(), "https://example.com/rpc");
726    }
727
728    #[test]
729    fn test_rpc_selector_get_client_failure() {
730        let configs = vec![RpcConfig {
731            url: "https://example.com/rpc".to_string(),
732            weight: 1,
733            ..Default::default()
734        }];
735
736        let selector = RpcSelector::new_with_defaults(configs).unwrap();
737
738        // Create a failing initializer function
739        let initializer =
740            |_url: &str| -> Result<String> { Err(eyre::eyre!("Initialization error")) };
741
742        let result = selector.get_client(initializer, &std::collections::HashSet::new());
743        assert!(result.is_err());
744        assert!(matches!(
745            result.unwrap_err(),
746            RpcSelectorError::ClientInitializationError(_)
747        ));
748    }
749
750    #[test]
751    fn test_rpc_selector_clone() {
752        let configs = vec![
753            RpcConfig {
754                url: "https://example1.com/rpc".to_string(),
755                weight: 1,
756                ..Default::default()
757            },
758            RpcConfig {
759                url: "https://example2.com/rpc".to_string(),
760                weight: 3,
761                ..Default::default()
762            },
763        ];
764
765        let selector = RpcSelector::new_with_defaults(configs).unwrap();
766        let cloned = selector.clone();
767
768        // Check that the cloned selector has the same configuration
769        assert_eq!(selector.configs.read().len(), cloned.configs.read().len());
770        assert_eq!(selector.configs.read()[0].url, cloned.configs.read()[0].url);
771        assert_eq!(selector.configs.read()[1].url, cloned.configs.read()[1].url);
772
773        // Check that weights distribution is also cloned
774        assert_eq!(
775            selector.weights_dist.is_some(),
776            cloned.weights_dist.is_some()
777        );
778    }
779
780    #[test]
781    #[serial]
782    fn test_mark_current_as_failed_single_provider() {
783        // Clear health store to ensure clean state
784        RpcHealthStore::instance().clear_all();
785
786        // With a single provider, marking as failed multiple times (to reach threshold) will pause it,
787        // but it can still be selected as a last resort
788        let configs = vec![RpcConfig {
789            url: "https://test-single-provider.example.com/rpc".to_string(),
790            weight: 1,
791            ..Default::default()
792        }];
793
794        // Create selector with threshold=1 for testing
795        let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
796        let _initial_url = selector.select_url().unwrap();
797
798        // Mark as failed once (with threshold=1, this will pause it)
799        selector.mark_current_as_failed();
800
801        // With threshold=1, available_provider_count should be 0
802        assert_eq!(selector.available_provider_count(), 0);
803
804        // But select_url should still work (selecting paused provider as last resort)
805        let next_url = selector.select_url();
806        assert!(next_url.is_ok());
807        assert_eq!(
808            next_url.unwrap(),
809            "https://test-single-provider.example.com/rpc"
810        );
811    }
812
813    #[test]
814    #[serial]
815    fn test_mark_current_as_failed_multiple_providers() {
816        // Clear health store to ensure clean state
817        RpcHealthStore::instance().clear_all();
818
819        // With multiple providers, marking as failed (with threshold=1) will pause them,
820        // but they can still be selected as a last resort
821        let configs = vec![
822            RpcConfig {
823                url: "https://test-multi1.example.com/rpc".to_string(),
824                weight: 5,
825                ..Default::default()
826            },
827            RpcConfig {
828                url: "https://test-multi2.example.com/rpc".to_string(),
829                weight: 5,
830                ..Default::default()
831            },
832            RpcConfig {
833                url: "https://test-multi3.example.com/rpc".to_string(),
834                weight: 5,
835                ..Default::default()
836            },
837        ];
838
839        // Create selector with threshold=1 for testing
840        let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
841
842        // Get the first URL
843        let url1 = selector.select_url().unwrap().to_string();
844
845        // Mark as failed (with threshold=1, this pauses it)
846        selector.mark_current_as_failed();
847        // Available count should decrease
848        assert_eq!(selector.available_provider_count(), 2);
849
850        // Next selection should prefer non-paused providers
851        let url2 = selector.select_url().unwrap().to_string();
852        // Should be different from the paused one
853        assert_ne!(url1, url2);
854
855        // Mark the second URL as failed too
856        selector.mark_current_as_failed();
857        assert_eq!(selector.available_provider_count(), 1);
858
859        let url3 = selector.select_url().unwrap().to_string();
860        // Should get the third URL (non-paused)
861        assert_ne!(url1, url3);
862        assert_ne!(url2, url3);
863
864        // Mark the third URL as failed too
865        selector.mark_current_as_failed();
866        assert_eq!(selector.available_provider_count(), 0);
867
868        // Now all URLs are paused, but select_url should still work (selecting paused providers as last resort)
869        let url4 = selector.select_url();
870        assert!(url4.is_ok());
871        // Should return one of the paused providers
872        let url4_str = url4.unwrap();
873        assert!(
874            url4_str == "https://test-multi1.example.com/rpc"
875                || url4_str == "https://test-multi2.example.com/rpc"
876                || url4_str == "https://test-multi3.example.com/rpc"
877        );
878    }
879
880    #[test]
881    #[serial]
882    fn test_mark_current_as_failed_weighted() {
883        // Clear health store to ensure clean state
884        RpcHealthStore::instance().clear_all();
885
886        // Test with weighted selection
887        let configs = vec![
888            RpcConfig {
889                url: "https://test-weighted1.example.com/rpc".to_string(),
890                weight: 1, // Low weight
891                ..Default::default()
892            },
893            RpcConfig {
894                url: "https://test-weighted2.example.com/rpc".to_string(),
895                weight: 10, // High weight
896                ..Default::default()
897            },
898        ];
899
900        // Create selector with threshold=1 for testing
901        let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
902        assert!(selector.weights_dist.is_some()); // Confirm we're using weighted selection
903
904        // Get a URL
905        let url1 = selector.select_url().unwrap().to_string();
906
907        // Mark it as failed (with threshold=1, this pauses it)
908        selector.mark_current_as_failed();
909        assert_eq!(selector.available_provider_count(), 1);
910
911        // Get another URL, it should prefer the non-paused one
912        let url2 = selector.select_url().unwrap().to_string();
913        assert_ne!(url1, url2);
914
915        // Mark this one as failed too
916        selector.mark_current_as_failed();
917        assert_eq!(selector.available_provider_count(), 0);
918
919        // With all providers paused, select_url should still work (selecting paused providers as last resort)
920        let url3 = selector.select_url();
921        assert!(url3.is_ok());
922        let url3_str = url3.unwrap();
923        assert!(
924            url3_str == "https://test-weighted1.example.com/rpc"
925                || url3_str == "https://test-weighted2.example.com/rpc"
926        );
927    }
928
929    #[test]
930    fn test_provider_count() {
931        // Test with no providers
932        let configs: Vec<RpcConfig> = vec![];
933        let result = RpcSelector::new_with_defaults(configs);
934        assert!(result.is_err());
935
936        // Test with a single provider
937        let configs = vec![RpcConfig {
938            url: "https://example.com/rpc".to_string(),
939            weight: 1,
940            ..Default::default()
941        }];
942        let selector = RpcSelector::new_with_defaults(configs).unwrap();
943        assert_eq!(selector.provider_count(), 1);
944
945        // Test with multiple providers
946        let configs = vec![
947            RpcConfig {
948                url: "https://example1.com/rpc".to_string(),
949                weight: 1,
950                ..Default::default()
951            },
952            RpcConfig {
953                url: "https://example2.com/rpc".to_string(),
954                weight: 2,
955                ..Default::default()
956            },
957            RpcConfig {
958                url: "https://example3.com/rpc".to_string(),
959                weight: 3,
960                ..Default::default()
961            },
962        ];
963        let selector = RpcSelector::new_with_defaults(configs).unwrap();
964        assert_eq!(selector.provider_count(), 3);
965    }
966
967    #[test]
968    #[serial]
969    fn test_available_provider_count() {
970        // Clear health store to ensure clean state
971        RpcHealthStore::instance().clear_all();
972
973        let configs = vec![
974            RpcConfig {
975                url: "https://test-available1.example.com/rpc".to_string(),
976                weight: 1,
977                ..Default::default()
978            },
979            RpcConfig {
980                url: "https://test-available2.example.com/rpc".to_string(),
981                weight: 2,
982                ..Default::default()
983            },
984            RpcConfig {
985                url: "https://test-available3.example.com/rpc".to_string(),
986                weight: 3,
987                ..Default::default()
988            },
989        ];
990
991        // Create selector with threshold=1 for testing
992        let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
993        assert_eq!(selector.provider_count(), 3);
994        assert_eq!(selector.available_provider_count(), 3);
995
996        // Mark one provider as failed (with threshold=1, this pauses it)
997        selector.select_url().unwrap(); // Select a provider first
998        selector.mark_current_as_failed();
999        // Available count should decrease (only non-paused providers)
1000        assert_eq!(selector.available_provider_count(), 2);
1001
1002        // Mark another provider as failed
1003        selector.select_url().unwrap(); // Select another provider
1004        selector.mark_current_as_failed();
1005        assert_eq!(selector.available_provider_count(), 1);
1006    }
1007
1008    #[test]
1009    fn test_get_current_url() {
1010        let configs = vec![
1011            RpcConfig::new("https://example1.com/rpc".to_string()),
1012            RpcConfig::new("https://example2.com/rpc".to_string()),
1013        ];
1014
1015        let selector = RpcSelector::new_with_defaults(configs).unwrap();
1016
1017        // Should return a valid URL
1018        let url = selector.get_current_url();
1019        assert!(url.is_ok());
1020        let url_str = url.unwrap();
1021        assert!(
1022            url_str == "https://example1.com/rpc" || url_str == "https://example2.com/rpc",
1023            "Unexpected URL: {url_str}"
1024        );
1025    }
1026
1027    #[test]
1028    #[serial]
1029    fn test_concurrent_usage() {
1030        // Clear health store to ensure clean state
1031        RpcHealthStore::instance().clear_all();
1032
1033        // Test RpcSelector with concurrent access from multiple threads
1034        let configs = vec![
1035            RpcConfig::new("https://test-concurrent1.example.com/rpc".to_string()),
1036            RpcConfig::new("https://test-concurrent2.example.com/rpc".to_string()),
1037            RpcConfig::new("https://test-concurrent3.example.com/rpc".to_string()),
1038        ];
1039
1040        // Create selector with threshold=1 for testing
1041        let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1042        let selector_arc = Arc::new(selector);
1043
1044        let mut handles = Vec::with_capacity(10);
1045
1046        // Launch 10 threads that select and mark providers
1047        for _ in 0..10 {
1048            let selector_clone = Arc::clone(&selector_arc);
1049            let handle = thread::spawn(move || {
1050                let url = selector_clone.select_url().unwrap().to_string();
1051                if url.contains("test-concurrent1") {
1052                    // Only mark example1 as failed
1053                    selector_clone.mark_current_as_failed();
1054                }
1055                url
1056            });
1057            handles.push(handle);
1058        }
1059
1060        // Collect results
1061        let mut urls = Vec::new();
1062        for handle in handles {
1063            urls.push(handle.join().unwrap());
1064        }
1065
1066        // Check that at least some threads got different URLs
1067        let unique_urls: std::collections::HashSet<String> = urls.into_iter().collect();
1068        assert!(unique_urls.len() > 1, "Expected multiple unique URLs");
1069
1070        // After all threads, example1 should be marked as failed (paused)
1071        // Selections should prefer non-paused providers
1072        let mut found_non_example1 = false;
1073        for _ in 0..10 {
1074            let url = selector_arc.select_url().unwrap().to_string();
1075            if !url.contains("test-concurrent1") {
1076                found_non_example1 = true;
1077            }
1078        }
1079
1080        assert!(found_non_example1, "Should prefer non-paused providers");
1081    }
1082
1083    #[test]
1084    fn test_consecutive_mark_as_failed() {
1085        let configs = vec![
1086            RpcConfig::new("https://example1.com/rpc".to_string()),
1087            RpcConfig::new("https://example2.com/rpc".to_string()),
1088        ];
1089
1090        let selector = RpcSelector::new_with_defaults(configs).unwrap();
1091
1092        // First call to select a provider
1093        selector.select_url().unwrap();
1094
1095        // Mark as failed twice consecutively without selecting in between
1096        selector.mark_current_as_failed();
1097        selector.mark_current_as_failed(); // This should be a no-op since has_current is now 0
1098
1099        // We should still be able to select a provider (since only one was marked failed)
1100        let result = selector.select_url();
1101        assert!(result.is_ok());
1102    }
1103
1104    #[test]
1105    #[serial]
1106    fn test_weighted_to_round_robin_fallback() {
1107        // Clear health store to ensure clean state
1108        RpcHealthStore::instance().clear_all();
1109
1110        let configs = vec![
1111            RpcConfig {
1112                url: "https://test-wrr1.example.com/rpc".to_string(),
1113                weight: 10, // High weight
1114                ..Default::default()
1115            },
1116            RpcConfig {
1117                url: "https://test-wrr2.example.com/rpc".to_string(),
1118                weight: 1, // Low weight
1119                ..Default::default()
1120            },
1121            RpcConfig {
1122                url: "https://test-wrr3.example.com/rpc".to_string(),
1123                weight: 1, // Low weight
1124                ..Default::default()
1125            },
1126        ];
1127
1128        // Create selector with threshold=1 for testing
1129        let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1130        assert!(selector.weights_dist.is_some()); // Using weighted selection
1131
1132        // Mock a situation where weighted selection would fail multiple times
1133        // by marking the high-weight provider as failed
1134        let mut selected_first = false;
1135
1136        // Try multiple times - the first provider should be selected more often due to weight
1137        for _ in 0..10 {
1138            let url = selector.select_url().unwrap();
1139            if url.contains("test-wrr1") {
1140                selected_first = true;
1141                // Mark the high-weight provider as failed (pauses it)
1142                selector.mark_current_as_failed();
1143                break;
1144            }
1145        }
1146
1147        assert!(
1148            selected_first,
1149            "High-weight provider should have been selected"
1150        );
1151
1152        // After marking it failed (paused), selections should prefer the other providers (non-paused)
1153        let mut seen_urls = HashSet::new();
1154        for _ in 0..10 {
1155            let url = selector.select_url().unwrap().to_string();
1156            seen_urls.insert(url);
1157        }
1158
1159        // Should have seen at least example2 and example3 (non-paused providers)
1160        assert!(seen_urls.len() >= 2);
1161        assert!(
1162            !seen_urls.iter().any(|url| url.contains("test-wrr1")),
1163            "Paused provider should not be selected (prefer non-paused)"
1164        );
1165    }
1166
1167    #[test]
1168    fn test_zero_weight_providers() {
1169        let configs = vec![
1170            RpcConfig {
1171                url: "https://example1.com/rpc".to_string(),
1172                weight: 0, // Zero weight
1173                ..Default::default()
1174            },
1175            RpcConfig {
1176                url: "https://example2.com/rpc".to_string(),
1177                weight: 5, // Normal weight
1178                ..Default::default()
1179            },
1180        ];
1181
1182        let selector = RpcSelector::new_with_defaults(configs).unwrap();
1183
1184        // With weighted selection, should never select the zero-weight provider
1185        let mut seen_urls = HashSet::new();
1186        for _ in 0..10 {
1187            let url = selector.select_url().unwrap().to_string();
1188            seen_urls.insert(url);
1189        }
1190
1191        assert_eq!(seen_urls.len(), 1);
1192        assert!(
1193            seen_urls.iter().next().unwrap().contains("example2"),
1194            "Only the non-zero weight provider should be selected"
1195        );
1196    }
1197
1198    #[test]
1199    #[serial]
1200    fn test_extreme_weight_differences() {
1201        let configs = vec![
1202            RpcConfig {
1203                url: "https://example1.com/rpc".to_string(),
1204                weight: 100, // Very high weight
1205                ..Default::default()
1206            },
1207            RpcConfig {
1208                url: "https://example2.com/rpc".to_string(),
1209                weight: 1, // Very low weight
1210                ..Default::default()
1211            },
1212        ];
1213
1214        let selector = RpcSelector::new_with_defaults(configs).unwrap();
1215
1216        // High weight provider should be selected much more frequently
1217        let mut count_high = 0;
1218
1219        for _ in 0..100 {
1220            let url = selector.select_url().unwrap().to_string();
1221            if url.contains("example1") {
1222                count_high += 1;
1223            }
1224
1225            // Reset to clear current selection
1226            selector.has_current.store(false, Ordering::Relaxed);
1227        }
1228
1229        // High-weight provider should be selected at least 90% of the time
1230        assert!(
1231            count_high > 90,
1232            "High-weight provider selected only {}/{} times",
1233            count_high,
1234            100
1235        );
1236    }
1237
1238    #[test]
1239    fn test_mark_unselected_as_failed() {
1240        let configs = vec![
1241            RpcConfig::new("https://example1.com/rpc".to_string()),
1242            RpcConfig::new("https://example2.com/rpc".to_string()),
1243        ];
1244
1245        let selector = RpcSelector::new_with_defaults(configs).unwrap();
1246
1247        // Without selecting, mark as failed (should be a no-op)
1248        selector.mark_current_as_failed();
1249
1250        // Should still be able to select both providers
1251        let mut seen_urls = HashSet::new();
1252        for _ in 0..10 {
1253            let url = selector.select_url().unwrap().to_string();
1254            seen_urls.insert(url);
1255
1256            // Reset for next iteration
1257            selector.has_current.store(false, Ordering::Relaxed);
1258        }
1259
1260        assert_eq!(
1261            seen_urls.len(),
1262            2,
1263            "Both providers should still be available"
1264        );
1265    }
1266
1267    #[test]
1268    fn test_rpc_selector_error_serialization() {
1269        let error = RpcSelectorError::NoProviders;
1270        let json = serde_json::to_string(&error).unwrap();
1271        assert!(json.contains("NoProviders"));
1272
1273        let error = RpcSelectorError::ClientInitializationError("test error".to_string());
1274        let json = serde_json::to_string(&error).unwrap();
1275        assert!(json.contains("ClientInitializationError"));
1276        assert!(json.contains("test error"));
1277
1278        let error = RpcSelectorError::WeightedIndexError("index error".to_string());
1279        let json = serde_json::to_string(&error).unwrap();
1280        assert!(json.contains("WeightedIndexError"));
1281        assert!(json.contains("index error"));
1282
1283        let error = RpcSelectorError::AllProvidersFailed;
1284        let json = serde_json::to_string(&error).unwrap();
1285        assert!(json.contains("AllProvidersFailed"));
1286    }
1287
1288    #[cfg(test)]
1289    mod rate_limiting_tests {
1290        use super::*;
1291        use crate::services::provider::rpc_health_store::RpcHealthStore;
1292
1293        /// Test that RpcSelector switches to the second RPC when the first one is rate-limited.
1294        ///
1295        /// This test simulates a scenario where:
1296        /// 1. Two RPC configs are set up with equal weights
1297        /// 2. The first RPC starts returning rate limit errors (429)
1298        /// 3. The selector should switch to the second RPC
1299        /// 4. The first RPC should be marked as failed and excluded from selection
1300        #[test]
1301        #[serial]
1302        fn test_rpc_selector_switches_on_rate_limit() {
1303            RpcHealthStore::instance().clear_all();
1304            let configs = vec![
1305                RpcConfig {
1306                    url: "https://test-rate-limit1.example.com".to_string(),
1307                    weight: 100,
1308                    ..Default::default()
1309                },
1310                RpcConfig {
1311                    url: "https://test-rate-limit2.example.com".to_string(),
1312                    weight: 100,
1313                    ..Default::default()
1314                },
1315            ];
1316
1317            // Create selector with threshold=1 for testing
1318            let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1319
1320            // Initially, both providers should be available
1321            assert_eq!(selector.available_provider_count(), 2);
1322
1323            // Select the first provider
1324            let first_url = selector.select_url().unwrap();
1325
1326            // Verify we got a valid URL
1327            assert!(
1328                first_url == "https://test-rate-limit1.example.com"
1329                    || first_url == "https://test-rate-limit2.example.com"
1330            );
1331
1332            // Simulate rate limiting: mark the current provider as failed
1333            // This simulates what happens when a provider returns HTTP 429 after retries are exhausted
1334            selector.mark_current_as_failed();
1335
1336            // Now only one provider should be available (non-paused)
1337            assert_eq!(selector.available_provider_count(), 1);
1338
1339            // The next selection should prefer the non-paused provider
1340            let second_url = selector.select_url().unwrap();
1341
1342            // Verify we got a different URL (the non-paused one)
1343            assert_ne!(first_url, second_url);
1344
1345            // Verify the failed provider is not selected again (prefer non-paused)
1346            let third_url = selector.select_url().unwrap();
1347            assert_eq!(second_url, third_url); // Should keep using the working provider
1348
1349            // Verify the failed provider is excluded from preferred selection
1350            let mut selected_urls = std::collections::HashSet::new();
1351            for _ in 0..10 {
1352                let url = selector.select_url().unwrap();
1353                selected_urls.insert(url.to_string());
1354            }
1355
1356            // Should only select from the non-failed provider (preferred)
1357            assert_eq!(selected_urls.len(), 1);
1358            assert!(!selected_urls.contains(&first_url.to_string()));
1359            assert!(selected_urls.contains(&second_url.to_string()));
1360        }
1361
1362        /// Test that RpcSelector handles rate limiting with weighted selection.
1363        ///
1364        /// This test verifies that even with weighted selection, a rate-limited provider
1365        /// is excluded and the selector falls back to the other provider.
1366        #[test]
1367        #[serial]
1368        fn test_rpc_selector_rate_limit_with_weighted_selection() {
1369            RpcHealthStore::instance().clear_all();
1370            let configs = vec![
1371                RpcConfig {
1372                    url: "https://test-weighted-rl1.example.com".to_string(),
1373                    weight: 80, // Higher weight, should be preferred
1374                    ..Default::default()
1375                },
1376                RpcConfig {
1377                    url: "https://test-weighted-rl2.example.com".to_string(),
1378                    weight: 20, // Lower weight
1379                    ..Default::default()
1380                },
1381            ];
1382
1383            // Create selector with threshold=1 for testing
1384            let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1385
1386            // Select multiple times - with weighted selection, rpc1 should be selected more often
1387            let mut rpc1_count = 0;
1388            let mut rpc2_count = 0;
1389
1390            for _ in 0..20 {
1391                let url = selector.select_url().unwrap();
1392                if url == "https://test-weighted-rl1.example.com" {
1393                    rpc1_count += 1;
1394                } else {
1395                    rpc2_count += 1;
1396                }
1397            }
1398
1399            // With weighted selection, rpc1 should be selected more often
1400            assert!(rpc1_count > rpc2_count);
1401
1402            // Now simulate rate limiting on rpc1
1403            // First, select rpc1
1404            let mut selected_rpc1 = false;
1405            for _ in 0..10 {
1406                let url = selector.select_url().unwrap();
1407                if url == "https://test-weighted-rl1.example.com" {
1408                    selector.mark_current_as_failed();
1409                    selected_rpc1 = true;
1410                    break;
1411                }
1412            }
1413            assert!(selected_rpc1, "Should have selected rpc1 at least once");
1414
1415            // After marking rpc1 as failed (paused), selections should prefer rpc2 (non-paused)
1416            for _ in 0..20 {
1417                let url = selector.select_url().unwrap();
1418                assert_eq!(url, "https://test-weighted-rl2.example.com");
1419            }
1420
1421            // Verify only one provider is available (non-paused)
1422            assert_eq!(selector.available_provider_count(), 1);
1423        }
1424
1425        /// Test that a rate-limited provider stays failed.
1426        ///
1427        /// This test verifies that failed providers remain failed,
1428        /// which simulates persistence of health state.
1429        #[test]
1430        #[serial]
1431        fn test_rpc_selector_rate_limit_recovery() {
1432            RpcHealthStore::instance().clear_all();
1433            let configs = vec![
1434                RpcConfig {
1435                    url: "https://test-recovery1.example.com".to_string(),
1436                    weight: 100,
1437                    ..Default::default()
1438                },
1439                RpcConfig {
1440                    url: "https://test-recovery2.example.com".to_string(),
1441                    weight: 100,
1442                    ..Default::default()
1443                },
1444            ];
1445
1446            // Create selector with threshold=1 for testing
1447            let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1448
1449            // Select first provider
1450            let first_url = selector.select_url().unwrap();
1451
1452            // Mark it as failed (simulating rate limit, with threshold=1 this pauses it)
1453            selector.mark_current_as_failed();
1454            assert_eq!(selector.available_provider_count(), 1);
1455
1456            // Next selection should prefer the other provider (non-paused)
1457            let second_url = selector.select_url().unwrap();
1458            assert_ne!(first_url, second_url);
1459
1460            // Verify only the working provider is selected (prefer non-paused)
1461            for _ in 0..10 {
1462                let url = selector.select_url().unwrap();
1463                assert_eq!(url, second_url);
1464            }
1465
1466            // Since we persist health, the failed provider stays failed (paused)
1467            assert_eq!(selector.available_provider_count(), 1);
1468        }
1469
1470        /// Test that when both providers are rate-limited, the selector handles it gracefully.
1471        #[test]
1472        #[serial]
1473        fn test_rpc_selector_both_providers_rate_limited() {
1474            RpcHealthStore::instance().clear_all();
1475            let configs = vec![
1476                RpcConfig {
1477                    url: "https://test-both-rl1.example.com".to_string(),
1478                    weight: 100,
1479                    ..Default::default()
1480                },
1481                RpcConfig {
1482                    url: "https://test-both-rl2.example.com".to_string(),
1483                    weight: 100,
1484                    ..Default::default()
1485                },
1486            ];
1487
1488            // Create selector with threshold=1 for testing
1489            let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1490
1491            // Select and mark first provider as failed (pauses it)
1492            selector.select_url().unwrap();
1493            selector.mark_current_as_failed();
1494            assert_eq!(selector.available_provider_count(), 1);
1495
1496            // Select and mark second provider as failed (pauses it)
1497            selector.select_url().unwrap();
1498            selector.mark_current_as_failed();
1499            assert_eq!(selector.available_provider_count(), 0);
1500
1501            // Now all providers are paused, but select_url should still work (selecting paused providers as last resort)
1502            let result = selector.select_url();
1503            assert!(result.is_ok());
1504            let url = result.unwrap();
1505            assert!(
1506                url == "https://test-both-rl1.example.com"
1507                    || url == "https://test-both-rl2.example.com"
1508            );
1509        }
1510
1511        /// Test that rate limiting works correctly with round-robin fallback.
1512        ///
1513        /// This test verifies that when weighted selection fails due to rate limiting,
1514        /// the selector correctly falls back to round-robin selection.
1515        #[test]
1516        #[serial]
1517        fn test_rpc_selector_rate_limit_round_robin_fallback() {
1518            RpcHealthStore::instance().clear_all();
1519            let configs = vec![
1520                RpcConfig {
1521                    url: "https://test-rr-fallback1.example.com".to_string(),
1522                    weight: 100,
1523                    ..Default::default()
1524                },
1525                RpcConfig {
1526                    url: "https://test-rr-fallback2.example.com".to_string(),
1527                    weight: 100,
1528                    ..Default::default()
1529                },
1530                RpcConfig {
1531                    url: "https://test-rr-fallback3.example.com".to_string(),
1532                    weight: 100,
1533                    ..Default::default()
1534                },
1535            ];
1536
1537            // Create selector with threshold=1 for testing
1538            let selector = RpcSelector::new(configs, 1, 60, 60).unwrap();
1539
1540            // Mark rpc1 as failed (simulating rate limit)
1541            selector.select_url().unwrap();
1542            let first_url = selector.get_current_url().unwrap();
1543
1544            // If we got rpc1, mark it as failed
1545            if first_url == "https://test-rr-fallback1.example.com" {
1546                selector.mark_current_as_failed();
1547            } else {
1548                // Otherwise, select until we get rpc1, then mark it as failed
1549                loop {
1550                    let url = selector.select_url().unwrap();
1551                    if url == "https://test-rr-fallback1.example.com" {
1552                        selector.mark_current_as_failed();
1553                        break;
1554                    }
1555                }
1556            }
1557
1558            // Now rpc1 should be paused, and selections should prefer rpc2 and rpc3 (non-paused)
1559            let mut selected_urls = std::collections::HashSet::new();
1560            for _ in 0..20 {
1561                let url = selector.select_url().unwrap();
1562                selected_urls.insert(url.to_string());
1563                // rpc1 should not be selected (prefer non-paused)
1564                assert_ne!(url, "https://test-rr-fallback1.example.com");
1565            }
1566
1567            // Should have selected from both rpc2 and rpc3
1568            assert!(selected_urls.contains("https://test-rr-fallback2.example.com"));
1569            assert!(selected_urls.contains("https://test-rr-fallback3.example.com"));
1570            assert_eq!(selected_urls.len(), 2);
1571        }
1572
1573        #[test]
1574        #[serial]
1575        fn test_select_url_excludes_tried_providers() {
1576            RpcHealthStore::instance().clear_all();
1577            let configs = vec![
1578                RpcConfig {
1579                    url: "https://provider1.com".to_string(),
1580                    weight: 1,
1581                    ..Default::default()
1582                },
1583                RpcConfig {
1584                    url: "https://provider2.com".to_string(),
1585                    weight: 1,
1586                    ..Default::default()
1587                },
1588                RpcConfig {
1589                    url: "https://provider3.com".to_string(),
1590                    weight: 1,
1591                    ..Default::default()
1592                },
1593            ];
1594
1595            let selector = RpcSelector::new_with_defaults(configs).unwrap();
1596
1597            // Exclude provider1
1598            let mut excluded = std::collections::HashSet::new();
1599            excluded.insert("https://provider1.com".to_string());
1600
1601            // Should select provider2 or provider3, not provider1
1602            for _ in 0..10 {
1603                let url = selector.get_next_url(&excluded).unwrap();
1604                assert_ne!(url, "https://provider1.com");
1605            }
1606        }
1607
1608        #[test]
1609        #[serial]
1610        fn test_select_url_fallback_to_paused_providers() {
1611            RpcHealthStore::instance().clear_all();
1612            let configs = vec![
1613                RpcConfig {
1614                    url: "https://provider1.com".to_string(),
1615                    weight: 1,
1616                    ..Default::default()
1617                },
1618                RpcConfig {
1619                    url: "https://provider2.com".to_string(),
1620                    weight: 1,
1621                    ..Default::default()
1622                },
1623            ];
1624
1625            let selector = RpcSelector::new_with_defaults(configs).unwrap();
1626            let health_store = RpcHealthStore::instance();
1627            let expiration = chrono::Duration::seconds(60);
1628
1629            // Pause both providers
1630            health_store.mark_failed(
1631                "https://provider1.com",
1632                3,
1633                chrono::Duration::seconds(60),
1634                expiration,
1635            );
1636            health_store.mark_failed(
1637                "https://provider1.com",
1638                3,
1639                chrono::Duration::seconds(60),
1640                expiration,
1641            );
1642            health_store.mark_failed(
1643                "https://provider1.com",
1644                3,
1645                chrono::Duration::seconds(60),
1646                expiration,
1647            );
1648
1649            health_store.mark_failed(
1650                "https://provider2.com",
1651                3,
1652                chrono::Duration::seconds(60),
1653                expiration,
1654            );
1655            health_store.mark_failed(
1656                "https://provider2.com",
1657                3,
1658                chrono::Duration::seconds(60),
1659                expiration,
1660            );
1661            health_store.mark_failed(
1662                "https://provider2.com",
1663                3,
1664                chrono::Duration::seconds(60),
1665                expiration,
1666            );
1667
1668            // Both should be paused
1669            assert!(health_store.is_paused("https://provider1.com", 3, expiration));
1670            assert!(health_store.is_paused("https://provider2.com", 3, expiration));
1671
1672            // Should still be able to select (fallback to paused providers)
1673            let url = selector
1674                .get_next_url(&std::collections::HashSet::new())
1675                .unwrap();
1676            assert!(url == "https://provider1.com" || url == "https://provider2.com");
1677        }
1678
1679        #[test]
1680        #[serial]
1681        fn test_select_url_single_provider_excluded() {
1682            RpcHealthStore::instance().clear_all();
1683            let configs = vec![RpcConfig {
1684                url: "https://single-provider.com".to_string(),
1685                weight: 1,
1686                ..Default::default()
1687            }];
1688
1689            let selector = RpcSelector::new_with_defaults(configs).unwrap();
1690
1691            // Exclude the only provider
1692            let mut excluded = std::collections::HashSet::new();
1693            excluded.insert("https://single-provider.com".to_string());
1694
1695            // Should return error
1696            let result = selector.get_next_url(&excluded);
1697            assert!(result.is_err());
1698            assert!(matches!(
1699                result.unwrap_err(),
1700                RpcSelectorError::AllProvidersFailed
1701            ));
1702        }
1703
1704        #[test]
1705        #[serial]
1706        fn test_select_url_all_providers_excluded() {
1707            RpcHealthStore::instance().clear_all();
1708            let configs = vec![
1709                RpcConfig {
1710                    url: "https://provider1.com".to_string(),
1711                    weight: 1,
1712                    ..Default::default()
1713                },
1714                RpcConfig {
1715                    url: "https://provider2.com".to_string(),
1716                    weight: 1,
1717                    ..Default::default()
1718                },
1719            ];
1720
1721            let selector = RpcSelector::new_with_defaults(configs).unwrap();
1722
1723            // Exclude all providers
1724            let mut excluded = std::collections::HashSet::new();
1725            excluded.insert("https://provider1.com".to_string());
1726            excluded.insert("https://provider2.com".to_string());
1727
1728            // Should return error
1729            let result = selector.get_next_url(&excluded);
1730            assert!(result.is_err());
1731            assert!(matches!(
1732                result.unwrap_err(),
1733                RpcSelectorError::AllProvidersFailed
1734            ));
1735        }
1736
1737        #[test]
1738        #[serial]
1739        fn test_select_url_excluded_providers_with_weighted_selection() {
1740            RpcHealthStore::instance().clear_all();
1741            let configs = vec![
1742                RpcConfig {
1743                    url: "https://provider1.com".to_string(),
1744                    weight: 10,
1745                    ..Default::default()
1746                },
1747                RpcConfig {
1748                    url: "https://provider2.com".to_string(),
1749                    weight: 1,
1750                    ..Default::default()
1751                },
1752                RpcConfig {
1753                    url: "https://provider3.com".to_string(),
1754                    weight: 1,
1755                    ..Default::default()
1756                },
1757            ];
1758
1759            let selector = RpcSelector::new_with_defaults(configs).unwrap();
1760
1761            // Exclude provider1 (highest weight)
1762            let mut excluded = std::collections::HashSet::new();
1763            excluded.insert("https://provider1.com".to_string());
1764
1765            // Should select from provider2 or provider3, never provider1
1766            for _ in 0..20 {
1767                let url = selector.get_next_url(&excluded).unwrap();
1768                assert_ne!(url, "https://provider1.com");
1769            }
1770        }
1771    }
1772}