1use rand::Rng;
24use std::collections::HashSet;
25use std::future::Future;
26use std::time::Duration;
27
28use super::rpc_selector::RpcSelector;
29use crate::config::ServerConfig;
30use crate::constants::RETRY_JITTER_PERCENT;
31use crate::metrics::RPC_CALL_LATENCY;
32use std::time::Instant;
33
34pub fn calculate_retry_delay(attempt: u8, base_delay_ms: u64, max_delay_ms: u64) -> Duration {
44 if base_delay_ms == 0 || max_delay_ms == 0 {
45 return Duration::from_millis(0);
46 }
47
48 let exp_backoff = if attempt > 63 {
50 max_delay_ms
51 } else {
52 let multiplier = 1u64.checked_shl(attempt as u32).unwrap_or(u64::MAX);
54 base_delay_ms.saturating_mul(multiplier)
55 };
56
57 let delay_ms = exp_backoff.min(max_delay_ms);
58
59 apply_jitter(delay_ms)
60}
61
62fn apply_jitter(delay_ms: u64) -> Duration {
74 if delay_ms == 0 {
75 return Duration::from_millis(0);
76 }
77
78 let jitter_range = (delay_ms as f64 * RETRY_JITTER_PERCENT).floor() as u64;
80
81 if jitter_range == 0 {
82 return Duration::from_millis(delay_ms);
83 }
84
85 let mut rng = rand::rng();
86 let jitter_value = rng.random_range(0..=jitter_range);
87
88 let final_delay = if rng.random_bool(0.5) {
89 delay_ms.saturating_add(jitter_value)
90 } else {
91 delay_ms.saturating_sub(jitter_value)
92 };
93
94 Duration::from_millis(final_delay)
95}
96
/// Internal classification of why a single-provider attempt cycle failed,
/// used by `retry_rpc_call` to decide between returning immediately and
/// failing over to another provider.
#[derive(Debug)]
enum InternalRetryError<E> {
    /// The error was judged non-retriable; returned to the caller without
    /// trying another provider.
    NonRetriable(E),
    /// Every allowed retry on the current provider failed with a retriable
    /// error; the caller may fail over.
    RetriesExhausted(E),
}
103
/// Settings controlling per-provider retries and cross-provider failover.
#[derive(Debug, Clone)]
pub struct RetryConfig {
    // Maximum operation attempts against a single provider.
    pub max_retries: u8,
    // Maximum number of switches to a different provider.
    pub max_failovers: u8,
    // Backoff starting delay in milliseconds; 0 disables delays (must then
    // match max_delay_ms == 0 — see `RetryConfig::new`).
    pub base_delay_ms: u64,
    // Upper bound on the backoff delay in milliseconds.
    pub max_delay_ms: u64,
}
116
117impl RetryConfig {
118 pub fn new(max_retries: u8, max_failovers: u8, base_delay_ms: u64, max_delay_ms: u64) -> Self {
130 if (base_delay_ms == 0) != (max_delay_ms == 0) {
132 panic!(
133 "Delay values must be consistent: both zero (no delays) or both non-zero. Got base_delay_ms={base_delay_ms}, max_delay_ms={max_delay_ms}"
134 );
135 }
136
137 if base_delay_ms > 0 && max_delay_ms > 0 && max_delay_ms < base_delay_ms {
139 panic!(
140 "max_delay_ms ({max_delay_ms}) must be >= base_delay_ms ({base_delay_ms}) when both are non-zero"
141 );
142 }
143
144 Self {
145 max_retries,
146 max_failovers,
147 base_delay_ms,
148 max_delay_ms,
149 }
150 }
151
152 pub fn from_env() -> Self {
154 let config = ServerConfig::from_env();
155 Self::new(
156 config.provider_max_retries,
157 config.provider_max_failovers,
158 config.provider_retry_base_delay_ms,
159 config.provider_retry_max_delay_ms,
160 )
161 }
162}
163
164pub async fn retry_rpc_call<P, T, E, F, Fut, I>(
194 selector: &RpcSelector,
195 operation_name: &str,
196 is_retriable_error: impl Fn(&E) -> bool,
197 should_mark_provider_failed: impl Fn(&E) -> bool,
198 provider_initializer: I,
199 operation: F,
200 config: Option<RetryConfig>,
201) -> Result<T, E>
202where
203 P: Clone,
204 E: std::fmt::Display + From<String>,
205 F: Fn(P) -> Fut,
206 Fut: Future<Output = Result<T, E>>,
207 I: Fn(&str) -> Result<P, E>,
208{
209 let config = config.unwrap_or_else(RetryConfig::from_env);
210 let total_providers = selector.provider_count();
211 let max_failovers = std::cmp::min(config.max_failovers as usize, total_providers - 1);
212 let mut failover_count = 0;
213 let mut total_attempts = 0;
214 let mut last_error = None;
215 let mut tried_urls = HashSet::new();
217
218 tracing::debug!(
219 operation_name = %operation_name,
220 max_retries = %config.max_retries,
221 max_failovers = %max_failovers,
222 total_providers = %total_providers,
223 "starting rpc call"
224 );
225
226 while failover_count <= max_failovers && selector.provider_count() > 0 {
228 let (provider, provider_url) =
230 match get_provider(selector, operation_name, &provider_initializer, &tried_urls) {
231 Ok((provider, url)) => {
232 tried_urls.insert(url.clone());
234 (provider, url)
235 }
236 Err(e) => {
237 last_error = Some(e);
238 failover_count += 1;
239
240 if failover_count > max_failovers || selector.provider_count() == 0 {
242 break;
243 }
244
245 selector.mark_current_as_failed();
247 continue;
248 }
249 };
250
251 tracing::debug!(
252 provider_url = %provider_url,
253 operation_name = %operation_name,
254 tried_providers = %tried_urls.len(),
255 "selected provider"
256 );
257
258 match try_with_retries(
260 &provider,
261 &provider_url,
262 operation_name,
263 &operation,
264 &is_retriable_error,
265 &config,
266 &mut total_attempts,
267 )
268 .await
269 {
270 Ok(result) => {
271 tracing::debug!(
272 operation_name = %operation_name,
273 provider_url = %provider_url,
274 total_attempts = %total_attempts,
275 "rpc call succeeded"
276 );
277 return Ok(result);
278 }
279 Err(internal_err) => {
280 match internal_err {
281 InternalRetryError::NonRetriable(original_err) => {
282 if should_mark_provider_failed(&original_err) {
284 tracing::warn!(
285 error = %original_err,
286 provider_url = %provider_url,
287 operation_name = %operation_name,
288 "non-retriable error should mark provider as failed, marking as failed and switching to next provider"
289 );
290 selector.mark_current_as_failed();
291 }
292 return Err(original_err);
293 }
294 InternalRetryError::RetriesExhausted(original_err) => {
295 last_error = Some(original_err);
296
297 tracing::warn!(
299 max_retries = %config.max_retries,
300 provider_url = %provider_url,
301 operation_name = %operation_name,
302 error = %last_error.as_ref().unwrap(),
303 failover_count = %(failover_count + 1),
304 max_failovers = %max_failovers,
305 "all retry attempts failed, marking as failed and switching to next provider"
306 );
307 selector.mark_current_as_failed();
308 failover_count += 1;
309 }
310 }
311 }
312 }
313 }
314
315 match &last_error {
316 Some(e) => {
317 tracing::error!(
318 operation_name = %operation_name,
319 total_attempts = %total_attempts,
320 failover_count = %failover_count,
321 error = %e,
322 "rpc call failed after attempts across providers"
323 );
324 }
325 None => {
326 tracing::error!(
327 operation_name = %operation_name,
328 total_attempts = %total_attempts,
329 failover_count = %failover_count,
330 "rpc call failed after attempts across providers with no error details"
331 );
332 }
333 }
334
335 let error_message = match &last_error {
336 Some(e) => format!(
337 "RPC call '{operation_name}' failed after {total_attempts} total attempts across {failover_count} providers: {e}"
338 ),
339 None => format!(
340 "RPC call '{operation_name}' failed after {total_attempts} total attempts across {failover_count} providers with no error details"
341 )
342 };
343
344 Err(last_error.unwrap_or_else(|| E::from(error_message)))
346}
347
348fn get_provider<P, E, I>(
350 selector: &RpcSelector,
351 operation_name: &str,
352 provider_initializer: &I,
353 excluded_urls: &HashSet<String>,
354) -> Result<(P, String), E>
355where
356 E: std::fmt::Display + From<String>,
357 I: Fn(&str) -> Result<P, E>,
358{
359 let provider_url = selector
361 .get_client(|url| Ok::<_, eyre::Report>(url.to_string()), excluded_urls)
362 .map_err(|e| {
363 let err_msg = format!("Failed to get provider URL for {operation_name}: {e}");
364 tracing::warn!(operation_name = %operation_name, error = %e, "failed to get provider url");
365 E::from(err_msg)
366 })?;
367
368 let provider = provider_initializer(&provider_url).map_err(|e| {
370 tracing::warn!(
371 provider_url = %provider_url,
372 operation_name = %operation_name,
373 error = %e,
374 "failed to initialize provider"
375 );
376 e
377 })?;
378
379 Ok((provider, provider_url))
380}
381
/// Attempts `operation` against a single provider up to `config.max_retries`
/// times, sleeping with exponential backoff + jitter between attempts.
///
/// Increments `total_attempts` once per call to `operation`. Latency of
/// successful calls is recorded in the `RPC_CALL_LATENCY` metric (with
/// placeholder "unknown" labels for the first two dimensions).
///
/// Returns `InternalRetryError::NonRetriable` when `is_retriable_error`
/// rejects an error, or `InternalRetryError::RetriesExhausted` when every
/// allowed attempt failed with a retriable error.
async fn try_with_retries<P, T, E, F, Fut>(
    provider: &P,
    provider_url: &str,
    operation_name: &str,
    operation: &F,
    is_retriable_error: &impl Fn(&E) -> bool,
    config: &RetryConfig,
    total_attempts: &mut usize,
) -> Result<T, InternalRetryError<E>>
where
    P: Clone,
    E: std::fmt::Display + From<String>,
    F: Fn(P) -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    // Fast path for max_retries of 0 or 1: exactly one attempt, no backoff.
    // NOTE: any failure here is classified as NonRetriable — even an error
    // the predicate would call retriable. This is deliberate and pinned by
    // the max_retries_zero/one unit tests.
    if config.max_retries <= 1 {
        *total_attempts += 1;
        let start_time = Instant::now();
        let result = operation(provider.clone())
            .await
            .map_err(InternalRetryError::NonRetriable);

        // Only successful calls contribute to the latency metric.
        if result.is_ok() {
            let latency_seconds = start_time.elapsed().as_secs_f64();
            RPC_CALL_LATENCY
                .with_label_values(&["unknown", "unknown", operation_name])
                .observe(latency_seconds);
        }

        return result;
    }

    for current_attempt_idx in 0..config.max_retries {
        *total_attempts += 1;

        let start_time = Instant::now();

        match operation(provider.clone()).await {
            Ok(result) => {
                let latency_seconds = start_time.elapsed().as_secs_f64();
                RPC_CALL_LATENCY
                    .with_label_values(&["unknown", "unknown", operation_name])
                    .observe(latency_seconds);

                tracing::debug!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    attempt = %(current_attempt_idx + 1),
                    max_retries = %config.max_retries,
                    total_attempts = %*total_attempts,
                    latency_seconds = %latency_seconds,
                    "rpc call succeeded"
                );
                return Ok(result);
            }
            Err(e) => {
                let is_retriable = is_retriable_error(&e);
                let is_last_attempt = current_attempt_idx + 1 >= config.max_retries;

                tracing::warn!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    attempt = %(current_attempt_idx + 1),
                    max_retries = %config.max_retries,
                    error = %e,
                    retriable = %is_retriable,
                    "rpc call failed (will retry if retriable)"
                );

                // Non-retriable errors bail out immediately, regardless of
                // how many attempts remain.
                if !is_retriable {
                    return Err(InternalRetryError::NonRetriable(e));
                }

                if is_last_attempt {
                    tracing::warn!(
                        max_retries = %config.max_retries,
                        operation_name = %operation_name,
                        provider_url = %provider_url,
                        error = %e,
                        "all retries exhausted"
                    );
                    return Err(InternalRetryError::RetriesExhausted(e));
                }

                // Backoff is keyed on the *next* attempt number (idx + 1), so
                // the wait grows with each failure.
                let delay = calculate_retry_delay(
                    current_attempt_idx + 1,
                    config.base_delay_ms,
                    config.max_delay_ms,
                );

                tracing::debug!(
                    operation_name = %operation_name,
                    provider_url = %provider_url,
                    delay = ?delay,
                    next_attempt = %(current_attempt_idx + 2),
                    max_retries = %config.max_retries,
                    "retrying rpc call after delay"
                );
                tokio::time::sleep(delay).await;
            }
        }
    }

    // Every loop iteration either returns or sleeps-and-continues; the final
    // iteration always returns (success, non-retriable, or exhausted).
    unreachable!(
        "Loop should have returned if max_retries > 1; max_retries=0 or 1 case is handled above."
    );
}
495
496#[cfg(test)]
497mod tests {
498 use super::*;
499 use crate::models::RpcConfig;
500 use crate::services::provider::rpc_health_store::RpcHealthStore;
501 use lazy_static::lazy_static;
502 use serial_test::serial;
503 use std::cmp::Ordering;
504 use std::collections::HashSet;
505 use std::env;
506 use std::sync::atomic::{AtomicU8, Ordering as AtomicOrdering};
507 use std::sync::Arc;
508 use std::sync::Mutex;
509
    lazy_static! {
        // Serializes tests that mutate process-wide environment variables.
        static ref RETRY_TEST_ENV_MUTEX: Mutex<()> = Mutex::new(());
    }
514
    /// Minimal error type for exercising the retry machinery in tests.
    #[derive(Debug, Clone)]
    struct TestError(String);

    impl std::fmt::Display for TestError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "TestError: {}", self.0)
        }
    }

    // Required by the `E: From<String>` bound on the retry helpers.
    impl From<String> for TestError {
        fn from(msg: String) -> Self {
            TestError(msg)
        }
    }
530
    /// RAII guard that sets environment variables and restores their prior
    /// values (or removes them) when dropped.
    struct EnvGuard {
        keys: Vec<String>,
        old_values: Vec<Option<String>>,
    }

    impl EnvGuard {
        fn new() -> Self {
            Self {
                keys: Vec::new(),
                old_values: Vec::new(),
            }
        }

        // Records the current value of `key` before overwriting it.
        fn set(&mut self, key: &str, value: &str) {
            let old_value = env::var(key).ok();
            self.keys.push(key.to_string());
            self.old_values.push(old_value);
            env::set_var(key, value);
        }
    }

    impl Drop for EnvGuard {
        fn drop(&mut self) {
            // Restore every recorded variable in insertion order.
            for i in 0..self.keys.len() {
                match &self.old_values[i] {
                    Some(value) => env::set_var(&self.keys[i], value),
                    None => env::remove_var(&self.keys[i]),
                }
            }
        }
    }
563
    /// Populates the environment with the settings the retry tests expect,
    /// clears provider health state, and returns the restoring guard.
    fn setup_test_env() -> EnvGuard {
        let mut guard = EnvGuard::new();
        guard.set("API_KEY", "fake-api-key-for-tests-01234567890123456789");
        guard.set("PROVIDER_MAX_RETRIES", "2");
        guard.set("PROVIDER_MAX_FAILOVERS", "1");
        guard.set("PROVIDER_RETRY_BASE_DELAY_MS", "1");
        guard.set("PROVIDER_RETRY_MAX_DELAY_MS", "5");
        guard.set("PROVIDER_FAILURE_THRESHOLD", "1");
        guard.set("REDIS_URL", "redis://localhost:6379");
        guard.set(
            "RELAYER_PRIVATE_KEY",
            "0x1234567890123456789012345678901234567890123456789012345678901234",
        );
        // Ensure no failure state leaks between tests.
        RpcHealthStore::instance().clear_all();
        guard
    }
583
    /// Backoff doubles per attempt within the jitter window, caps at
    /// `max_delay_ms`, degrades to zero when disabled, and never overflows.
    #[test]
    fn test_calculate_retry_delay() {
        let base_delay_ms = 10;
        let max_delay_ms = 10000;

        // Raw backoff expected for attempts 0..=5 before jitter is applied.
        let expected_backoffs = [
            10, 20, 40, 80, 160, 320,
        ];

        for (i, expected) in expected_backoffs.iter().enumerate() {
            let attempt = i as u8;
            let delay = calculate_retry_delay(attempt, base_delay_ms, max_delay_ms);

            let min_expected = (*expected as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
            let max_expected = (*expected as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;

            assert!(
                (min_expected..=max_expected).contains(&delay.as_millis()),
                "Delay {} outside expected range {}..={}",
                delay.as_millis(),
                min_expected,
                max_expected
            );
        }

        // Attempt 4 would be 1600ms raw; must be capped at max_delay_ms.
        let base_delay_ms = 100;
        let max_delay_ms = 1000;
        let delay = calculate_retry_delay(4, base_delay_ms, max_delay_ms);
        let min_expected = (max_delay_ms as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (max_delay_ms as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        // Zero base or max delay disables delays entirely.
        assert_eq!(calculate_retry_delay(5, 0, 1000).as_millis(), 0);
        assert_eq!(calculate_retry_delay(5, 100, 0).as_millis(), 0);
        assert_eq!(calculate_retry_delay(5, 0, 0).as_millis(), 0);

        // Extreme attempt counts must stay within the jittered cap.
        let max_delay_ms = 10_000;
        let delay = calculate_retry_delay(u8::MAX, 1, max_delay_ms);
        assert!(
            delay.as_millis()
                <= (max_delay_ms as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128
        );
    }
642
    /// Jitter stays within ±RETRY_JITTER_PERCENT, is a no-op for zero/tiny
    /// delays, and over many samples moves both up and down.
    #[test]
    fn test_apply_jitter() {
        let base_delay = 1000;
        let jittered = apply_jitter(base_delay);

        let min_expected = (base_delay as f64 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u64;
        let max_expected = (base_delay as f64 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u64;

        assert!(
            (min_expected as u128..=max_expected as u128).contains(&jittered.as_millis()),
            "Jittered value {} outside expected range {}..={}",
            jittered.as_millis(),
            min_expected,
            max_expected
        );

        // Zero delay passes through untouched.
        assert_eq!(apply_jitter(0).as_millis(), 0);

        // Delays whose jitter window rounds to zero are returned unchanged.
        for delay in 1..5 {
            let jittered = apply_jitter(delay);
            let jitter_range = (delay as f64 * RETRY_JITTER_PERCENT).floor() as u64;

            if jitter_range == 0 {
                assert_eq!(jittered.as_millis(), delay as u128);
            } else {
                let min_expected = delay.saturating_sub(jitter_range);
                let max_expected = delay.saturating_add(jitter_range);
                assert!(
                    (min_expected as u128..=max_expected as u128).contains(&jittered.as_millis()),
                    "Jittered value {} outside expected range {}..={}",
                    jittered.as_millis(),
                    min_expected,
                    max_expected
                );
            }
        }

        // Statistically, both directions of jitter should occur in 200 draws.
        let base_delay = 10000;
        let iterations = 200;
        let mut additions = 0;
        let mut subtractions = 0;

        for _ in 0..iterations {
            let jittered = apply_jitter(base_delay);
            let j_millis = jittered.as_millis();
            let b_delay = base_delay as u128;

            match j_millis.cmp(&b_delay) {
                Ordering::Greater => {
                    additions += 1;
                }
                Ordering::Less => {
                    subtractions += 1;
                }
                Ordering::Equal => {}
            }
        }

        assert!(additions > 0, "No additions were observed");
        assert!(subtractions > 0, "No subtractions were observed");
    }
706
707 #[test]
708 fn test_retry_config() {
709 let config = RetryConfig::new(5, 2, 100, 5000);
710 assert_eq!(config.max_retries, 5);
711 assert_eq!(config.max_failovers, 2);
712 assert_eq!(config.base_delay_ms, 100);
713 assert_eq!(config.max_delay_ms, 5000);
714 }
715
    /// `from_env` must reflect the values written by `setup_test_env`.
    #[test]
    fn test_retry_config_from_env() {
        // Env mutation is process-wide; serialize with other env tests.
        let _lock = RETRY_TEST_ENV_MUTEX
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        let mut guard = setup_test_env();
        guard.set("REDIS_URL", "redis://localhost:6379");
        guard.set(
            "RELAYER_PRIVATE_KEY",
            "0x1234567890123456789012345678901234567890123456789012345678901234",
        );

        let config = RetryConfig::from_env();
        assert_eq!(config.max_retries, 2);
        assert_eq!(config.max_failovers, 1);
        assert_eq!(config.base_delay_ms, 1);
        assert_eq!(config.max_delay_ms, 5);
    }
735
    /// Boundary cases: attempt 0, base == max, huge cap, and 1ms delays.
    #[test]
    fn test_calculate_retry_delay_edge_cases() {
        // Attempt 0 uses the base delay directly (multiplier 2^0 = 1).
        let delay = calculate_retry_delay(0, 100, 1000);
        let min_expected = (100.0 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (100.0 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        // base == max: the cap pins every attempt at the same delay.
        let delay = calculate_retry_delay(5, 100, 100);
        let min_expected = (100.0 * (1.0 - RETRY_JITTER_PERCENT)).floor() as u128;
        let max_expected = (100.0 * (1.0 + RETRY_JITTER_PERCENT)).ceil() as u128;
        assert!(
            (min_expected..=max_expected).contains(&delay.as_millis()),
            "Delay {} outside expected range {}..={}",
            delay.as_millis(),
            min_expected,
            max_expected
        );

        // Large attempt with an effectively unbounded cap must not overflow.
        let delay = calculate_retry_delay(60, 1000, u64::MAX);
        assert!(delay.as_millis() > 0);

        // A 1ms delay has a zero-width jitter window, so it stays exactly 1ms.
        let delay = calculate_retry_delay(1, 1, 1);
        assert_eq!(delay.as_millis(), 1);
    }
770
771 #[test]
772 fn test_retry_config_validation() {
773 let _config = RetryConfig::new(3, 1, 100, 1000);
775 let _config = RetryConfig::new(3, 1, 0, 0); let _config = RetryConfig::new(3, 1, 100, 100); let _config = RetryConfig::new(0, 0, 1, 1); let _config = RetryConfig::new(255, 255, 1, 1000); }
780
    // The three constructors below must panic with the asserted messages.

    #[test]
    #[should_panic(
        expected = "max_delay_ms (50) must be >= base_delay_ms (100) when both are non-zero"
    )]
    fn test_retry_config_validation_panic_delay_ordering() {
        let _config = RetryConfig::new(3, 1, 100, 50);
    }

    #[test]
    #[should_panic(
        expected = "Delay values must be consistent: both zero (no delays) or both non-zero"
    )]
    fn test_retry_config_validation_panic_inconsistent_delays_base_zero() {
        let _config = RetryConfig::new(3, 1, 0, 1000);
    }

    #[test]
    #[should_panic(
        expected = "Delay values must be consistent: both zero (no delays) or both non-zero"
    )]
    fn test_retry_config_validation_panic_inconsistent_delays_max_zero() {
        let _config = RetryConfig::new(3, 1, 100, 0);
    }
807
    /// `get_provider` returns an initialized provider plus its URL, and
    /// propagates initializer failures verbatim.
    #[test]
    fn test_get_provider() {
        let _guard = setup_test_env();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");

        let initializer =
            |url: &str| -> Result<String, TestError> { Ok(format!("provider-{url}")) };

        let result = get_provider(&selector, "test_operation", &initializer, &HashSet::new());
        assert!(result.is_ok());
        let (provider, url) = result.unwrap();
        // Either provider may be selected first.
        assert!(url == "http://localhost:8545" || url == "http://localhost:8546");
        assert_eq!(provider, format!("provider-{url}"));

        // A failing initializer surfaces its own error, not a wrapped one.
        let initializer = |_: &str| -> Result<String, TestError> {
            Err(TestError("Failed to initialize".to_string()))
        };

        let result = get_provider(&selector, "test_operation", &initializer, &HashSet::new());
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(format!("{err}").contains("Failed to initialize"));
    }
837
    /// Covers the four `try_with_retries` outcomes: immediate success,
    /// success after retriable failures, non-retriable error, and retries
    /// exhausted.
    #[tokio::test]
    async fn test_try_with_retries() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(3, 1, 5, 10);

        // Immediate success: exactly one attempt.
        let operation = |p: String| async move {
            assert_eq!(p, "test_provider");
            Ok::<_, TestError>(42)
        };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert_eq!(total_attempts, 1);

        // Fails twice with retriable errors, then succeeds on attempt 3.
        let attempts = Arc::new(AtomicU8::new(0));
        let attempts_clone = attempts.clone();
        let operation = move |_: String| {
            let attempts = attempts_clone.clone();
            async move {
                let current = attempts.fetch_add(1, AtomicOrdering::SeqCst);
                if current < 2 {
                    Err(TestError("Retriable error".to_string()))
                } else {
                    Ok(42)
                }
            }
        };

        let mut total_attempts = 0;
        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
        assert_eq!(total_attempts, 3);

        // Non-retriable error: stops after a single attempt.
        let operation = |_: String| async { Err(TestError("Non-retriable error".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        assert_eq!(total_attempts, 1);
        let err = result.unwrap_err();
        assert!(matches!(err, InternalRetryError::NonRetriable(_)));

        // Always-failing retriable operation: all 3 attempts consumed.
        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        assert_eq!(total_attempts, 3);
        let error = result.unwrap_err();
        assert!(matches!(error, InternalRetryError::RetriesExhausted(_)));
    }
935
    /// With max_retries == 0 the fast path makes one attempt and classifies
    /// any failure as NonRetriable.
    #[tokio::test]
    async fn test_try_with_retries_max_retries_zero() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(0, 1, 5, 10);

        let operation = |_p: String| async move { Ok::<_, TestError>(42) };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        let error = result.unwrap_err();
        // Even a retriable error surfaces as NonRetriable on the fast path.
        assert!(matches!(error, InternalRetryError::NonRetriable(_)));
    }
979
    /// max_retries == 1 also takes the single-attempt fast path: success
    /// passes through, failure is reported as NonRetriable.
    #[tokio::test]
    async fn test_try_with_retries_max_retries_one() {
        let provider = "test_provider".to_string();
        let provider_url = "http://localhost:8545";
        let mut total_attempts = 0;
        let config = RetryConfig::new(1, 1, 5, 10);

        let operation = |p: String| async move {
            assert_eq!(p, "test_provider");
            Ok::<_, TestError>(42)
        };

        let result = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| false,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let mut total_attempts = 0;
        let result: Result<i32, InternalRetryError<TestError>> = try_with_retries(
            &provider,
            provider_url,
            "test_operation",
            &operation,
            &|_| true,
            &config,
            &mut total_attempts,
        )
        .await;

        assert!(result.is_err());
        let error = result.unwrap_err();
        // Deliberate fast-path behavior: retriable errors become NonRetriable.
        assert!(matches!(error, InternalRetryError::NonRetriable(_)));
    }
1026
    /// A non-retriable error with `should_mark_provider_failed == false`
    /// must leave the provider pool untouched.
    #[tokio::test]
    #[serial]
    async fn test_non_retriable_error_does_not_mark_provider_failed() {
        let _guard = setup_test_env();
        RpcHealthStore::instance().clear_all();

        let configs = vec![
            RpcConfig::new("http://localhost:9986".to_string()),
            RpcConfig::new("http://localhost:9985".to_string()),
        ];
        let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation =
            |_provider: String| async move { Err(TestError("Non-retriable error".to_string())) };

        let config = RetryConfig::new(3, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(
            initial_available_count, 2,
            "Both providers should be available after clearing"
        );

        // Both predicates return false: no retry, no provider marking.
        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |_| false,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert_eq!(
            initial_available_count, final_available_count,
            "Provider count should remain the same for non-retriable errors"
        );
    }
1075
    /// Exhausting retries on retriable errors must mark at least one
    /// provider as failed before the call gives up.
    #[tokio::test]
    #[serial]
    async fn test_retriable_error_marks_provider_failed_after_retries_exhausted() {
        let _guard = setup_test_env();
        RpcHealthStore::instance().clear_all();

        let configs = vec![
            RpcConfig::new("http://localhost:9984".to_string()),
            RpcConfig::new("http://localhost:9983".to_string()),
        ];
        let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");

        let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };

        let operation = |_provider: String| async { Err(TestError("Retriable error".to_string())) };

        // Zero delays keep the test fast.
        let config = RetryConfig::new(2, 1, 0, 0);

        let initial_available_count = selector.available_provider_count();
        assert_eq!(
            initial_available_count, 2,
            "Both providers should be available after clearing"
        );

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| true,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err());

        let final_available_count = selector.available_provider_count();
        assert!(final_available_count < initial_available_count,
            "At least one provider should be marked as failed after retriable errors exhaust retries");
    }
1121
    /// The happy path: a succeeding operation is invoked exactly once.
    #[tokio::test]
    #[serial]
    async fn test_retry_rpc_call_success() {
        let _guard = setup_test_env();
        RpcHealthStore::instance().clear_all();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");

        let attempts = Arc::new(AtomicU8::new(0));
        let attempts_clone = attempts.clone();

        let provider_initializer =
            |_url: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = move |_provider: String| {
            let attempts = attempts_clone.clone();
            async move {
                attempts.fetch_add(1, AtomicOrdering::SeqCst);
                Ok::<_, TestError>(42)
            }
        };

        let config = RetryConfig::new(1, 1, 0, 0);

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |_| false,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok(), "Expected OK result but got: {result:?}");
        assert_eq!(result.unwrap(), 42);
        // Success on the first provider: exactly one invocation.
        assert_eq!(attempts.load(AtomicOrdering::SeqCst), 1);
    }
1165
    /// When the first provider keeps failing, the call fails over to the
    /// second provider and succeeds there.
    #[tokio::test]
    #[serial]
    async fn test_retry_rpc_call_with_provider_failover() {
        let _guard = setup_test_env();
        RpcHealthStore::instance().clear_all();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");

        // Records the URL of the most recently initialized provider.
        let current_provider = Arc::new(Mutex::new(String::new()));
        let current_provider_clone = current_provider.clone();

        let provider_initializer = move |url: &str| -> Result<String, TestError> {
            let mut provider = current_provider_clone.lock().unwrap();
            *provider = url.to_string();
            Ok(url.to_string())
        };

        // Only the :8545 provider fails.
        let operation = move |provider: String| async move {
            if provider.contains("8545") {
                Err(TestError("First provider error".to_string()))
            } else {
                Ok(42)
            }
        };

        let config = RetryConfig::new(2, 1, 0, 0);

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| true,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_ok(), "Expected OK result but got: {result:?}");
        assert_eq!(result.unwrap(), 42);

        // The last provider initialized must be the healthy one.
        let final_provider = current_provider.lock().unwrap().clone();
        assert!(
            final_provider.contains("8546"),
            "Wrong provider selected: {final_provider}"
        );
    }
1218
    /// When every provider fails with retriable errors, the call ends in an
    /// error after exhausting all retries and failovers.
    #[tokio::test]
    #[serial]
    async fn test_retry_rpc_call_all_providers_fail() {
        let _guard = setup_test_env();
        RpcHealthStore::instance().clear_all();

        let configs = vec![
            RpcConfig::new("http://localhost:8545".to_string()),
            RpcConfig::new("http://localhost:8546".to_string()),
        ];
        let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");

        let provider_initializer =
            |_: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = |_: String| async { Err(TestError("Always fails".to_string())) };

        let config = RetryConfig::new(2, 1, 0, 0);

        let result: Result<i32, TestError> = retry_rpc_call(
            &selector,
            "test_operation",
            |_| true,
            |_| false,
            provider_initializer,
            operation,
            Some(config),
        )
        .await;

        assert!(result.is_err(), "Expected an error but got: {result:?}");
    }
1251
    /// Passing `None` for the config must fall back to `RetryConfig::from_env`
    /// and still complete successfully.
    #[tokio::test]
    async fn test_retry_rpc_call_with_default_config() {
        // Scope the env lock/guard setup so the lock is released before the
        // async call runs.
        let (_guard, selector) = {
            let _lock = RETRY_TEST_ENV_MUTEX
                .lock()
                .unwrap_or_else(|e| e.into_inner());
            let guard = setup_test_env();

            let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
            let selector =
                RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
            (guard, selector)
        };

        let provider_initializer =
            |_url: &str| -> Result<String, TestError> { Ok("mock_provider".to_string()) };

        let operation = |_provider: String| async move { Ok::<_, TestError>(42) };

        let result = retry_rpc_call(
            &selector,
            "test_operation",
            |_| false,
            |_| false,
            provider_initializer,
            operation,
            None,
        )
        .await;

        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 42);
    }
1286
1287 #[tokio::test]
1288 #[serial]
1289 async fn test_retry_rpc_call_provider_initialization_failures() {
1290 let _guard = setup_test_env();
1291 RpcHealthStore::instance().clear_all();
1292
1293 let configs = vec![
1295 RpcConfig::new("http://localhost:9988".to_string()),
1296 RpcConfig::new("http://localhost:9987".to_string()),
1297 ];
1298 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1299
1300 let attempt_count = Arc::new(AtomicU8::new(0));
1301 let attempt_count_clone = attempt_count.clone();
1302
1303 let attempted_urls = Arc::new(std::sync::Mutex::new(Vec::new()));
1305 let attempted_urls_clone = attempted_urls.clone();
1306
1307 let provider_initializer = move |url: &str| -> Result<String, TestError> {
1310 let count = attempt_count_clone.fetch_add(1, AtomicOrdering::SeqCst);
1311 attempted_urls_clone.lock().unwrap().push(url.to_string());
1312 if count == 0 {
1313 Err(TestError("First provider init failed".to_string()))
1315 } else {
1316 Ok(url.to_string())
1317 }
1318 };
1319
1320 let operation = |_provider: String| async move { Ok::<_, TestError>(42) };
1321
1322 let config = RetryConfig::new(2, 1, 0, 0);
1323
1324 let result = retry_rpc_call(
1325 &selector,
1326 "test_operation",
1327 |_| true,
1328 |_| false,
1329 provider_initializer,
1330 operation,
1331 Some(config),
1332 )
1333 .await;
1334
1335 assert!(result.is_ok());
1336 assert_eq!(result.unwrap(), 42);
1337
1338 let final_count = attempt_count.load(AtomicOrdering::SeqCst);
1340 assert_eq!(
1341 final_count, 2,
1342 "Expected exactly 2 provider init attempts, got {final_count}"
1343 );
1344
1345 let urls = attempted_urls.lock().unwrap();
1347 assert_eq!(urls.len(), 2, "Expected 2 URLs attempted, got {urls:?}");
1348 assert_ne!(
1349 urls[0], urls[1],
1350 "Expected different URLs to be tried, got {urls:?}"
1351 );
1352 }
1353
1354 #[test]
1355 fn test_get_provider_selector_errors() {
1356 let _guard = setup_test_env();
1357
1358 let configs = vec![RpcConfig::new("http://localhost:8545".to_string())];
1360 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1361
1362 let _ = selector.get_current_url().unwrap(); selector.mark_current_as_failed(); let provider_initializer =
1367 |url: &str| -> Result<String, TestError> { Ok(format!("provider-{url}")) };
1368
1369 let result = get_provider(
1372 &selector,
1373 "test_operation",
1374 &provider_initializer,
1375 &HashSet::new(),
1376 );
1377 assert!(result.is_ok());
1378 let (provider, url) = result.unwrap();
1379 assert_eq!(url, "http://localhost:8545");
1380 assert_eq!(provider, "provider-http://localhost:8545");
1381 }
1382
1383 #[tokio::test]
1384 #[serial]
1385 async fn test_last_provider_never_marked_as_failed() {
1386 let _guard = setup_test_env();
1387 RpcHealthStore::instance().clear_all();
1388
1389 let unique_url = "http://localhost:9999".to_string();
1391 let configs = vec![RpcConfig::new(unique_url.clone())];
1392 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1393
1394 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1395
1396 let operation = |_provider: String| async { Err(TestError("Always fails".to_string())) };
1398
1399 let config = RetryConfig::new(2, 1, 0, 0); let initial_available_count = selector.available_provider_count();
1403 assert_eq!(
1404 initial_available_count, 1,
1405 "Provider should be available after clearing health store"
1406 );
1407
1408 let result: Result<i32, TestError> = retry_rpc_call(
1409 &selector,
1410 "test_operation",
1411 |_| true, |_| true, provider_initializer,
1414 operation,
1415 Some(config),
1416 )
1417 .await;
1418
1419 assert!(result.is_err());
1420
1421 let final_available_count = selector.available_provider_count();
1423 assert_eq!(
1424 final_available_count, 0,
1425 "Provider should be marked as failed, but selector can still use paused providers"
1426 );
1427 }
1428
1429 #[tokio::test]
1430 #[serial]
1431 async fn test_last_provider_behavior_with_multiple_providers() {
1432 let _guard = setup_test_env();
1433 RpcHealthStore::instance().clear_all();
1434
1435 let configs = vec![
1437 RpcConfig::new("http://localhost:9991".to_string()),
1438 RpcConfig::new("http://localhost:9990".to_string()),
1439 RpcConfig::new("http://localhost:9989".to_string()),
1440 ];
1441 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1442
1443 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1444
1445 let operation = |_provider: String| async { Err(TestError("Always fails".to_string())) };
1447
1448 let config = RetryConfig::new(2, 2, 0, 0); let initial_available_count = selector.available_provider_count();
1452 assert_eq!(
1453 initial_available_count, 3,
1454 "All 3 providers should be available after clearing"
1455 );
1456
1457 let result: Result<i32, TestError> = retry_rpc_call(
1458 &selector,
1459 "test_operation",
1460 |_| true, |_| true, provider_initializer,
1463 operation,
1464 Some(config),
1465 )
1466 .await;
1467
1468 assert!(result.is_err());
1469
1470 let final_available_count = selector.available_provider_count();
1472 assert_eq!(
1473 final_available_count, 0,
1474 "All providers should be marked as failed, but paused providers can still be used"
1475 );
1476 }
1477
1478 #[tokio::test]
1479 #[serial]
1480 async fn test_non_retriable_error_should_mark_provider_failed() {
1481 let _guard = setup_test_env();
1482 RpcHealthStore::instance().clear_all();
1483
1484 let configs = vec![
1486 RpcConfig::new("http://localhost:9995".to_string()),
1487 RpcConfig::new("http://localhost:9994".to_string()),
1488 ];
1489 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1490
1491 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1492
1493 let operation = |_provider: String| async move {
1495 Err(TestError("Critical non-retriable error".to_string()))
1496 };
1497
1498 let config = RetryConfig::new(3, 1, 0, 0);
1499
1500 let initial_available_count = selector.available_provider_count();
1502 assert_eq!(
1503 initial_available_count, 2,
1504 "Both providers should be available after clearing health store"
1505 );
1506
1507 let result: Result<i32, TestError> = retry_rpc_call(
1508 &selector,
1509 "test_operation",
1510 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1513 operation,
1514 Some(config),
1515 )
1516 .await;
1517
1518 assert!(result.is_err());
1519
1520 let final_available_count = selector.available_provider_count();
1522 assert_eq!(final_available_count, 1,
1523 "Provider should be marked as failed when should_mark_provider_failed returns true for non-retriable error");
1524 }
1525
1526 #[tokio::test]
1527 #[serial]
1528 async fn test_non_retriable_error_should_not_mark_provider_failed() {
1529 let _guard = setup_test_env();
1530 RpcHealthStore::instance().clear_all();
1531
1532 let configs = vec![
1534 RpcConfig::new("http://localhost:9997".to_string()),
1535 RpcConfig::new("http://localhost:9996".to_string()),
1536 ];
1537 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1538
1539 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1540
1541 let operation = |_provider: String| async move {
1543 Err(TestError("Minor non-retriable error".to_string()))
1544 };
1545
1546 let config = RetryConfig::new(3, 1, 0, 0);
1547
1548 let initial_available_count = selector.available_provider_count();
1550 assert_eq!(
1551 initial_available_count, 2,
1552 "Both providers should be available after clearing health store"
1553 );
1554
1555 let result: Result<i32, TestError> = retry_rpc_call(
1556 &selector,
1557 "test_operation",
1558 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1561 operation,
1562 Some(config),
1563 )
1564 .await;
1565
1566 assert!(result.is_err());
1567
1568 let final_available_count = selector.available_provider_count();
1570 assert_eq!(final_available_count, initial_available_count,
1571 "Provider should NOT be marked as failed when should_mark_provider_failed returns false for non-retriable error");
1572 }
1573
1574 #[tokio::test]
1575 #[serial]
1576 async fn test_retriable_error_ignores_should_mark_provider_failed() {
1577 let _guard = setup_test_env();
1578 RpcHealthStore::instance().clear_all();
1579
1580 let configs = vec![
1582 RpcConfig::new("http://localhost:9982".to_string()),
1583 RpcConfig::new("http://localhost:9981".to_string()),
1584 ];
1585 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1586
1587 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1588
1589 let operation =
1591 |_provider: String| async { Err(TestError("Retriable network error".to_string())) };
1592
1593 let config = RetryConfig::new(2, 1, 0, 0); let initial_available_count = selector.available_provider_count();
1597 assert_eq!(
1598 initial_available_count, 2,
1599 "Both providers should be available after clearing"
1600 );
1601
1602 let result: Result<i32, TestError> = retry_rpc_call(
1603 &selector,
1604 "test_operation",
1605 |_| true, |_| false, provider_initializer,
1608 operation,
1609 Some(config),
1610 )
1611 .await;
1612
1613 assert!(result.is_err());
1614
1615 let final_available_count = selector.available_provider_count();
1618 assert!(final_available_count < initial_available_count,
1619 "Provider should be marked as failed when retriable errors exhaust retries, regardless of should_mark_provider_failed");
1620 }
1621
1622 #[tokio::test]
1623 #[serial]
1624 async fn test_mixed_error_scenarios_with_different_marking_behavior() {
1625 let _guard = setup_test_env();
1626 RpcHealthStore::instance().clear_all();
1627
1628 let configs = vec![
1631 RpcConfig::new("http://localhost:9993".to_string()),
1632 RpcConfig::new("http://localhost:9992".to_string()),
1633 ];
1634 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1635
1636 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1637
1638 let operation =
1639 |_provider: String| async move { Err(TestError("Critical network error".to_string())) };
1640
1641 let config = RetryConfig::new(1, 1, 0, 0);
1642 let initial_count = selector.available_provider_count();
1643 assert_eq!(
1644 initial_count, 2,
1645 "Both providers should be available after clearing"
1646 );
1647
1648 let result: Result<i32, TestError> = retry_rpc_call(
1649 &selector,
1650 "test_operation",
1651 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1654 operation,
1655 Some(config.clone()),
1656 )
1657 .await;
1658
1659 assert!(result.is_err());
1660 let after_critical_count = selector.available_provider_count();
1661 assert_eq!(
1662 after_critical_count,
1663 initial_count - 1,
1664 "Critical error should mark provider as failed"
1665 );
1666
1667 let operation =
1669 |_provider: String| async move { Err(TestError("Minor validation error".to_string())) };
1670
1671 let result: Result<i32, TestError> = retry_rpc_call(
1672 &selector,
1673 "test_operation",
1674 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1677 operation,
1678 Some(config),
1679 )
1680 .await;
1681
1682 assert!(result.is_err());
1683 let final_count = selector.available_provider_count();
1684 assert_eq!(
1685 final_count, after_critical_count,
1686 "Minor error should NOT mark provider as failed"
1687 );
1688 }
1689
1690 #[tokio::test]
1691 #[serial]
1692 async fn test_should_mark_provider_failed_respects_last_provider_protection() {
1693 let _guard = setup_test_env();
1694 RpcHealthStore::instance().clear_all();
1695
1696 let unique_url = "http://localhost:9998".to_string();
1698 let configs = vec![RpcConfig::new(unique_url.clone())];
1699 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1700
1701 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1702
1703 let operation =
1705 |_provider: String| async move { Err(TestError("Critical network error".to_string())) };
1706
1707 let config = RetryConfig::new(1, 1, 0, 0);
1708
1709 let initial_available_count = selector.available_provider_count();
1711 assert_eq!(
1712 initial_available_count, 1,
1713 "Provider should be available after clearing health store"
1714 );
1715
1716 let result: Result<i32, TestError> = retry_rpc_call(
1717 &selector,
1718 "test_operation",
1719 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1722 operation,
1723 Some(config),
1724 )
1725 .await;
1726
1727 assert!(result.is_err());
1728
1729 let final_available_count = selector.available_provider_count();
1731 assert_eq!(
1732 final_available_count, 0,
1733 "Provider should be marked as failed, but selector can still use paused providers"
1734 );
1735 }
1736
1737 #[tokio::test]
1738 #[serial]
1739 async fn test_should_mark_provider_failed_with_multiple_providers_last_protection() {
1740 let _guard = setup_test_env();
1741 RpcHealthStore::instance().clear_all();
1742
1743 let configs = vec![
1745 RpcConfig::new("http://localhost:8545".to_string()),
1746 RpcConfig::new("http://localhost:8546".to_string()),
1747 ];
1748 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1749
1750 let attempt_count = Arc::new(AtomicU8::new(0));
1751 let attempt_count_clone = attempt_count.clone();
1752
1753 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1754
1755 let operation = move |_provider: String| {
1757 let attempt_count = attempt_count_clone.clone();
1758 async move {
1759 let count = attempt_count.fetch_add(1, AtomicOrdering::SeqCst);
1760 Err(TestError(format!("Critical error #{count}")))
1761 }
1762 };
1763
1764 let config = RetryConfig::new(1, 1, 0, 0); let initial_available_count = selector.available_provider_count();
1768 assert_eq!(initial_available_count, 2);
1769
1770 let result: Result<i32, TestError> = retry_rpc_call(
1771 &selector,
1772 "test_operation",
1773 |_| false, |e| e.0.contains("Critical"), provider_initializer,
1776 operation,
1777 Some(config),
1778 )
1779 .await;
1780
1781 assert!(result.is_err());
1782
1783 let final_available_count = selector.available_provider_count();
1785 assert_eq!(
1786 final_available_count, 1,
1787 "First provider should be marked as failed, second provider remains available"
1788 );
1789 }
1790
1791 #[tokio::test]
1792 #[serial]
1793 async fn test_tried_urls_tracking_prevents_duplicate_selection() {
1794 let _guard = setup_test_env();
1795 RpcHealthStore::instance().clear_all();
1796
1797 let configs = vec![
1798 RpcConfig::new("http://localhost:8545".to_string()),
1799 RpcConfig::new("http://localhost:8546".to_string()),
1800 RpcConfig::new("http://localhost:8547".to_string()),
1801 ];
1802 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1803
1804 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1805
1806 let operation = |provider: String| async move {
1808 if provider.contains("8545") {
1809 Err(TestError("Provider 1 failed".to_string()))
1810 } else if provider.contains("8546") {
1811 Err(TestError("Provider 2 failed".to_string()))
1812 } else {
1813 Ok(42)
1814 }
1815 };
1816
1817 let config = RetryConfig::new(2, 10, 0, 0); let result: Result<i32, TestError> = retry_rpc_call(
1820 &selector,
1821 "test_operation",
1822 |_| true, |_| true, provider_initializer,
1825 operation,
1826 Some(config),
1827 )
1828 .await;
1829
1830 assert!(result.is_ok());
1831 assert_eq!(result.unwrap(), 42);
1832 }
1833
1834 #[tokio::test]
1835 #[serial]
1836 async fn test_all_providers_tried_returns_error() {
1837 let _guard = setup_test_env();
1838 RpcHealthStore::instance().clear_all();
1839
1840 let configs = vec![
1841 RpcConfig::new("http://localhost:8545".to_string()),
1842 RpcConfig::new("http://localhost:8546".to_string()),
1843 ];
1844 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1845
1846 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1847
1848 let operation = |_provider: String| async { Err(TestError("Always fails".to_string())) };
1850
1851 let config = RetryConfig::new(2, 10, 0, 0); let result: Result<i32, TestError> = retry_rpc_call(
1854 &selector,
1855 "test_operation",
1856 |_| true, |_| true, provider_initializer,
1859 operation,
1860 Some(config),
1861 )
1862 .await;
1863
1864 assert!(result.is_err());
1865 assert_eq!(
1867 selector.available_provider_count(),
1868 0,
1869 "Both providers should be marked as failed"
1870 );
1871 }
1872
1873 #[tokio::test]
1874 #[serial]
1875 async fn test_tried_urls_passed_to_selector() {
1876 let _guard = setup_test_env();
1877 RpcHealthStore::instance().clear_all();
1878
1879 let url1 = "http://localhost:9980".to_string();
1881 let url2 = "http://localhost:9979".to_string();
1882 let url3 = "http://localhost:9978".to_string();
1883 let configs = vec![
1884 RpcConfig::new(url1.clone()),
1885 RpcConfig::new(url2.clone()),
1886 RpcConfig::new(url3.clone()),
1887 ];
1888 let selector = RpcSelector::new_with_defaults(configs).expect("Failed to create selector");
1889
1890 let provider_initializer = |url: &str| -> Result<String, TestError> { Ok(url.to_string()) };
1891
1892 let selected_providers = Arc::new(Mutex::new(Vec::new()));
1894 let selected_providers_clone = selected_providers.clone();
1895 let url3_clone = url3.clone();
1896
1897 let operation = move |provider: String| {
1898 let selected = selected_providers_clone.clone();
1899 let url3 = url3_clone.clone();
1900 async move {
1901 let mut selected_guard = selected.lock().unwrap();
1902 selected_guard.push(provider.clone());
1903
1904 if provider == url3 {
1906 Ok(42)
1907 } else {
1908 Err(TestError("Provider failed".to_string()))
1909 }
1910 }
1911 };
1912
1913 let config = RetryConfig::new(2, 10, 0, 0); let result: Result<i32, TestError> = retry_rpc_call(
1916 &selector,
1917 "test_operation",
1918 |_| true,
1919 |_| true,
1920 provider_initializer,
1921 operation,
1922 Some(config),
1923 )
1924 .await;
1925
1926 assert!(
1927 result.is_ok(),
1928 "Operation should succeed eventually, got error: {result:?}"
1929 );
1930 let selected = selected_providers.lock().unwrap();
1931 let unique_providers: HashSet<_> = selected.iter().collect();
1933 assert!(
1934 !unique_providers.is_empty(),
1935 "Should have tried at least 1 provider: {selected:?}"
1936 );
1937 assert!(
1939 unique_providers.contains(&url3),
1940 "Should have tried provider 3: {selected:?}"
1941 );
1942 assert!(
1946 !selected.is_empty(),
1947 "Should have at least 1 total attempt, got: {}",
1948 selected.len()
1949 );
1950 }
1951}