1use crate::{
7 constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
8 domain::{get_network_relayer, Relayer},
9 jobs::{handle_result, Job, JobProducerTrait, RelayerHealthCheck},
10 models::{
11 produce_relayer_enabled_payload, DefaultAppState, DisabledReason, NetworkRepoModel,
12 NotificationRepoModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState,
13 TransactionRepoModel,
14 },
15 observability::request_id::set_request_id,
16 queues::{HandlerError, WorkerContext},
17 repositories::{
18 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
19 Repository, TransactionCounterTrait, TransactionRepository,
20 },
21 utils::calculate_scheduled_timestamp,
22};
23use actix_web::web::ThinData;
24use eyre::Result;
25use std::time::Duration;
26use tracing::{debug, info, instrument, warn};
27
28#[instrument(
53 level = "debug",
54 skip(job, app_state),
55 fields(
56 request_id = ?job.request_id,
57 job_id = %job.message_id,
58 job_type = %job.job_type.to_string(),
59 attempt = %ctx.attempt,
60 relayer_id = %job.data.relayer_id,
61 task_id = %ctx.task_id,
62 )
63)]
64pub async fn relayer_health_check_handler(
65 job: Job<RelayerHealthCheck>,
66 app_state: ThinData<DefaultAppState>,
67 ctx: WorkerContext,
68) -> Result<(), HandlerError> {
69 if let Some(request_id) = job.request_id.clone() {
70 set_request_id(request_id);
71 }
72
73 relayer_health_check_handler_impl(job, app_state, ctx).await
74}
75
76#[allow(clippy::type_complexity)]
78async fn relayer_health_check_handler_impl<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
79 job: Job<RelayerHealthCheck>,
80 app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
81 ctx: WorkerContext,
82) -> Result<(), HandlerError>
83where
84 J: JobProducerTrait + Send + Sync + 'static,
85 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
86 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
87 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
88 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
89 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
90 TCR: TransactionCounterTrait + Send + Sync + 'static,
91 PR: PluginRepositoryTrait + Send + Sync + 'static,
92 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
93{
94 let result = check_and_reenable_relayer(job.data, &app_state).await;
95 handle_result(
96 result,
97 &ctx,
98 "relayer_health_check",
99 WORKER_DEFAULT_MAXIMUM_RETRIES,
100 )
101}
102
103async fn check_and_reenable_relayer<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
104 data: RelayerHealthCheck,
105 app_state: &ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
106) -> Result<()>
107where
108 J: JobProducerTrait + Send + Sync + 'static,
109 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
110 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
111 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
112 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
113 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
114 TCR: TransactionCounterTrait + Send + Sync + 'static,
115 PR: PluginRepositoryTrait + Send + Sync + 'static,
116 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
117{
118 let relayer_id = data.relayer_id.clone();
119
120 debug!(
121 relayer_id = %relayer_id,
122 retry_count = data.retry_count,
123 "Running health check on disabled relayer"
124 );
125
126 let relayer = app_state
128 .relayer_repository
129 .get_by_id(relayer_id.clone())
130 .await
131 .map_err(|e| eyre::eyre!("Failed to get relayer: {}", e))?;
132
133 if !relayer.system_disabled {
134 info!(
135 relayer_id = %relayer_id,
136 "Relayer is not disabled, skipping health check"
137 );
138 return Ok(());
139 }
140
141 let relayer_service = get_network_relayer(relayer_id.clone(), app_state)
143 .await
144 .map_err(|e| eyre::eyre!("Failed to get relayer: {}", e))?;
145
146 match relayer_service.check_health().await {
148 Ok(_) => {
149 info!(
151 relayer_id = %relayer_id,
152 retry_count = data.retry_count,
153 "Health checks passed, re-enabling relayer"
154 );
155
156 let enabled_relayer = app_state
158 .relayer_repository
159 .enable_relayer(relayer_id.clone())
160 .await
161 .map_err(|e| eyre::eyre!("Failed to enable relayer: {}", e))?;
162
163 if let Some(notification_id) = &enabled_relayer.notification_id {
165 app_state
166 .job_producer
167 .produce_send_notification_job(
168 produce_relayer_enabled_payload(
169 notification_id,
170 &enabled_relayer,
171 data.retry_count,
172 ),
173 None,
174 )
175 .await
176 .map_err(|e| eyre::eyre!("Failed to send notification: {}", e))?;
177
178 info!(
179 relayer_id = %relayer_id,
180 notification_id = %notification_id,
181 "Sent relayer recovery notification"
182 );
183 }
184
185 Ok(())
186 }
187 Err(failures) => {
188 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
190 DisabledReason::RpcValidationFailed("Unknown error".to_string())
191 });
192
193 warn!(
194 relayer_id = %relayer_id,
195 retry_count = data.retry_count,
196 reason = %reason,
197 "Health checks failed, scheduling retry"
198 );
199
200 let should_update = match &relayer.disabled_reason {
203 Some(old_reason) => !old_reason.same_variant(&reason),
204 None => true, };
206
207 if should_update {
208 debug!(
209 relayer_id = %relayer_id,
210 old_reason = ?relayer.disabled_reason,
211 new_reason = %reason,
212 "Disabled reason variant has changed, updating"
213 );
214
215 app_state
216 .relayer_repository
217 .disable_relayer(relayer_id.clone(), reason.clone())
218 .await
219 .map_err(|e| eyre::eyre!("Failed to update disabled reason: {}", e))?;
220 } else {
221 debug!(
222 relayer_id = %relayer_id,
223 reason = %reason,
224 "Disabled reason variant unchanged, skipping update"
225 );
226 }
227
228 let delay = calculate_backoff_delay(data.retry_count);
230
231 debug!(
232 relayer_id = %relayer_id,
233 next_retry = data.retry_count + 1,
234 delay_seconds = delay.as_secs(),
235 "Scheduling next health check attempt"
236 );
237
238 app_state
240 .job_producer
241 .produce_relayer_health_check_job(
242 RelayerHealthCheck::with_retry_count(relayer_id, data.retry_count + 1),
243 Some(calculate_scheduled_timestamp(delay.as_secs() as i64)),
244 )
245 .await
246 .map_err(|e| eyre::eyre!("Failed to schedule retry: {}", e))?;
247
248 Ok(())
249 }
250 }
251}
252
/// Returns how long to wait before the next health-check attempt.
///
/// The first four attempts (`retry_count` 0 through 3) wait 10, 20, 30 and 45
/// seconds respectively; every later attempt waits 60 seconds.
fn calculate_backoff_delay(retry_count: u32) -> Duration {
    /// Delays, in seconds, for the attempts that precede the cap.
    const RAMP_SECS: [u64; 4] = [10, 20, 30, 45];
    /// Delay, in seconds, once the ramp is exhausted.
    const CAP_SECS: u64 = 60;

    let secs = usize::try_from(retry_count)
        .ok()
        .and_then(|attempt| RAMP_SECS.get(attempt).copied())
        .unwrap_or(CAP_SECS);

    Duration::from_secs(secs)
}
271
#[cfg(test)]
mod tests {
    use super::*;
    use crate::jobs::MockJobProducerTrait;
    use crate::models::{
        AppState, DisabledReason, HealthCheckFailure, NetworkType, RelayerEvmPolicy,
        RelayerNetworkPolicy, RelayerRepoModel,
    };
    use crate::repositories::{
        ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
        PluginRepositoryStorage, RelayerRepositoryStorage, SignerRepositoryStorage,
        TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
    };
    use std::sync::Arc;

    /// Application state wired with a mock job producer and in-memory repositories.
    type TestAppState = ThinDataAppState<
        MockJobProducerTrait,
        RelayerRepositoryStorage,
        TransactionRepositoryStorage,
        NetworkRepositoryStorage,
        NotificationRepositoryStorage,
        SignerRepositoryStorage,
        TransactionCounterRepositoryStorage,
        PluginRepositoryStorage,
        ApiKeyRepositoryStorage,
    >;

    /// Builds a relayer fixture that is system-disabled with an RPC validation reason.
    fn create_disabled_relayer(id: &str) -> RelayerRepoModel {
        RelayerRepoModel {
            id: id.to_string(),
            name: format!("Relayer {id}"),
            network: "sepolia".to_string(),
            paused: false,
            network_type: NetworkType::Evm,
            policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
                gas_price_cap: None,
                whitelist_receivers: None,
                eip1559_pricing: Some(false),
                private_transactions: Some(false),
                min_balance: Some(0),
                gas_limit_estimation: Some(false),
            }),
            signer_id: "test-signer".to_string(),
            address: "0x742d35Cc6634C0532925a3b8D8C2e48a73F6ba2E".to_string(),
            notification_id: Some("test-notification".to_string()),
            system_disabled: true,
            disabled_reason: Some(DisabledReason::RpcValidationFailed(
                "RPC unavailable".to_string(),
            )),
            custom_rpc_urls: None,
        }
    }

    /// Same fixture as `create_disabled_relayer`, but enabled and without a disabled reason.
    fn create_enabled_relayer(id: &str) -> RelayerRepoModel {
        RelayerRepoModel {
            system_disabled: false,
            disabled_reason: None,
            ..create_disabled_relayer(id)
        }
    }

    /// Wraps `relayer_repo` in an app state whose other repositories are empty
    /// in-memory stores. The job producer is a mock with no expectations set.
    fn build_app_state(relayer_repo: Arc<RelayerRepositoryStorage>) -> TestAppState {
        ThinData(AppState {
            relayer_repository: relayer_repo,
            transaction_repository: Arc::new(TransactionRepositoryStorage::new_in_memory()),
            signer_repository: Arc::new(SignerRepositoryStorage::new_in_memory()),
            notification_repository: Arc::new(NotificationRepositoryStorage::new_in_memory()),
            network_repository: Arc::new(NetworkRepositoryStorage::new_in_memory()),
            transaction_counter_store: Arc::new(
                TransactionCounterRepositoryStorage::new_in_memory(),
            ),
            job_producer: Arc::new(MockJobProducerTrait::new()),
            plugin_repository: Arc::new(PluginRepositoryStorage::new_in_memory()),
            api_key_repository: Arc::new(ApiKeyRepositoryStorage::new_in_memory()),
        })
    }

    #[test]
    fn test_calculate_backoff_delay() {
        // (retry_count, expected delay in seconds)
        let expected = [
            (0, 10),
            (1, 20),
            (2, 30),
            (3, 45),
            (4, 60),
            (10, 60),
            (100, 60),
        ];

        for (retry_count, seconds) in expected {
            assert_eq!(
                calculate_backoff_delay(retry_count),
                Duration::from_secs(seconds),
                "unexpected delay for retry_count {retry_count}"
            );
        }
    }

    #[test]
    fn test_relayer_health_check_creation() {
        let health_check = RelayerHealthCheck::new("test-relayer".to_string());
        assert_eq!(health_check.relayer_id, "test-relayer");
        assert_eq!(health_check.retry_count, 0);

        let health_check_with_retry =
            RelayerHealthCheck::with_retry_count("test-relayer".to_string(), 3);
        assert_eq!(health_check_with_retry.relayer_id, "test-relayer");
        assert_eq!(health_check_with_retry.retry_count, 3);
    }

    #[test]
    fn test_health_check_data_structure() {
        let health_check = RelayerHealthCheck::new("test-relayer".to_string());
        assert_eq!(health_check.relayer_id, "test-relayer");
        assert_eq!(health_check.retry_count, 0);

        let health_check_retry =
            RelayerHealthCheck::with_retry_count("test-relayer".to_string(), 5);
        assert_eq!(health_check_retry.retry_count, 5);

        let expected_delay = calculate_backoff_delay(5);
        assert_eq!(expected_delay, Duration::from_secs(60));
    }

    #[tokio::test]
    async fn test_relayer_health_check_handler_impl_exits_on_enabled() {
        let relayer_repo = Arc::new(RelayerRepositoryStorage::new_in_memory());
        relayer_repo
            .create(create_enabled_relayer("test-handler-enabled"))
            .await
            .unwrap();

        let app_state = build_app_state(relayer_repo);

        let health_check = RelayerHealthCheck::new("test-handler-enabled".to_string());
        let job = Job::new(crate::jobs::JobType::RelayerHealthCheck, health_check);
        let ctx = WorkerContext::new(1, "test-task".into());

        let result = relayer_health_check_handler_impl(job, app_state, ctx).await;

        assert!(result.is_ok());
    }

    #[test]
    fn test_relayer_health_check_backoff_progression() {
        let delays: Vec<Duration> = (0..6).map(calculate_backoff_delay).collect();

        assert_eq!(delays[0], Duration::from_secs(10));
        assert_eq!(delays[1], Duration::from_secs(20));
        assert_eq!(delays[2], Duration::from_secs(30));
        assert_eq!(delays[3], Duration::from_secs(45));
        assert_eq!(delays[4], Duration::from_secs(60));
        assert_eq!(delays[5], Duration::from_secs(60));

        // Entries 0..=4 must be strictly increasing.
        for pair in delays[..5].windows(2) {
            assert!(pair[0] < pair[1], "Delay should increase with retry count");
        }

        assert_eq!(delays[4], delays[5], "Delay should cap at 60 seconds");
    }

    #[tokio::test]
    async fn test_disabled_reason_is_preserved() {
        let repo = RelayerRepositoryStorage::new_in_memory();

        let relayer = create_disabled_relayer("test-relayer-2");
        let disabled_reason = relayer.disabled_reason.clone();

        repo.create(relayer).await.unwrap();

        let retrieved = repo.get_by_id("test-relayer-2".to_string()).await.unwrap();

        assert!(retrieved.system_disabled);
        assert_eq!(retrieved.disabled_reason, disabled_reason);

        // Fail loudly instead of silently skipping the description check.
        let reason = retrieved
            .disabled_reason
            .as_ref()
            .expect("disabled reason should be stored with the relayer");
        assert!(reason.description().contains("RPC"));
    }

    #[tokio::test]
    async fn test_check_and_reenable_relayer_exits_early_if_not_disabled() {
        let relayer_repo = Arc::new(RelayerRepositoryStorage::new_in_memory());
        relayer_repo
            .create(create_enabled_relayer("test-check-enabled"))
            .await
            .unwrap();

        let thin_app_state = build_app_state(Arc::clone(&relayer_repo));

        let health_check = RelayerHealthCheck::new("test-check-enabled".to_string());

        let result = check_and_reenable_relayer(health_check, &thin_app_state).await;

        assert!(result.is_ok());

        // The stored relayer must be left untouched.
        let retrieved = relayer_repo
            .get_by_id("test-check-enabled".to_string())
            .await
            .unwrap();
        assert!(!retrieved.system_disabled);
        assert!(retrieved.disabled_reason.is_none());
    }

    #[test]
    fn test_check_and_reenable_variant_comparison() {
        let reason1 = DisabledReason::RpcValidationFailed("Error A".to_string());
        let reason2 = DisabledReason::RpcValidationFailed("Error B".to_string());
        assert!(reason1.same_variant(&reason2));

        let reason3 = DisabledReason::NonceSyncFailed("Error".to_string());
        assert!(!reason1.same_variant(&reason3));

        let multi1 = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("A".to_string()),
            DisabledReason::NonceSyncFailed("B".to_string()),
        ]);
        let multi2 = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("C".to_string()),
            DisabledReason::NonceSyncFailed("D".to_string()),
        ]);
        assert!(multi1.same_variant(&multi2));

        let multi3 = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("A".to_string()),
            DisabledReason::BalanceCheckFailed("B".to_string()),
        ]);
        assert!(!multi1.same_variant(&multi3));
    }

    #[test]
    fn test_backoff_delay_calculation_edge_cases() {
        let delay0 = calculate_backoff_delay(0);
        assert_eq!(delay0, Duration::from_secs(10));

        let delay_large = calculate_backoff_delay(100);
        assert_eq!(delay_large, Duration::from_secs(60));

        let mut prev_delay = Duration::from_secs(0);
        for retry in 0..10 {
            let delay = calculate_backoff_delay(retry);
            if delay < Duration::from_secs(60) {
                assert!(delay > prev_delay, "Retry {retry}: delay should increase");
            } else {
                assert_eq!(
                    delay,
                    Duration::from_secs(60),
                    "Retry {retry}: should cap at 60s"
                );
            }
            prev_delay = delay;
        }
    }

    #[test]
    fn test_disabled_reason_from_health_failures() {
        let empty_result = DisabledReason::from_health_failures(vec![]);
        assert!(empty_result.is_none());

        let single_failure = vec![HealthCheckFailure::RpcValidationFailed(
            "RPC down".to_string(),
        )];
        let single_result = DisabledReason::from_health_failures(single_failure);
        assert!(single_result.is_some());
        match single_result.unwrap() {
            DisabledReason::RpcValidationFailed(msg) => {
                assert_eq!(msg, "RPC down");
            }
            _ => panic!("Expected RpcValidationFailed variant"),
        }

        let multiple_failures = vec![
            HealthCheckFailure::RpcValidationFailed("RPC error".to_string()),
            HealthCheckFailure::NonceSyncFailed("Nonce error".to_string()),
        ];
        let multiple_result = DisabledReason::from_health_failures(multiple_failures);
        assert!(multiple_result.is_some());
        match multiple_result.unwrap() {
            DisabledReason::Multiple(reasons) => {
                assert_eq!(reasons.len(), 2);
                assert!(matches!(reasons[0], DisabledReason::RpcValidationFailed(_)));
                assert!(matches!(reasons[1], DisabledReason::NonceSyncFailed(_)));
            }
            _ => panic!("Expected Multiple variant"),
        }
    }

    #[test]
    fn test_relayer_health_check_retry_count_increments() {
        for retry_count in [0, 1, 2, 5, 10] {
            let health_check =
                RelayerHealthCheck::with_retry_count("test-relayer".to_string(), retry_count);

            assert_eq!(health_check.retry_count, retry_count);

            let next_health_check =
                RelayerHealthCheck::with_retry_count("test-relayer".to_string(), retry_count + 1);
            assert_eq!(next_health_check.retry_count, retry_count + 1);

            let current_delay = calculate_backoff_delay(retry_count);
            let next_delay = calculate_backoff_delay(retry_count + 1);

            if current_delay < Duration::from_secs(60) {
                assert!(next_delay >= current_delay);
            } else {
                assert_eq!(next_delay, Duration::from_secs(60));
            }
        }
    }

    #[tokio::test]
    async fn test_repository_enable_disable_operations() {
        let repo = RelayerRepositoryStorage::new_in_memory();

        repo.create(create_enabled_relayer("test-enable-disable"))
            .await
            .unwrap();

        let reason = DisabledReason::RpcValidationFailed("Test error".to_string());
        let disabled = repo
            .disable_relayer("test-enable-disable".to_string(), reason.clone())
            .await
            .unwrap();

        assert!(disabled.system_disabled);
        assert_eq!(disabled.disabled_reason, Some(reason));

        let enabled = repo
            .enable_relayer("test-enable-disable".to_string())
            .await
            .unwrap();

        assert!(!enabled.system_disabled);
        assert!(enabled.disabled_reason.is_none());

        let retrieved = repo
            .get_by_id("test-enable-disable".to_string())
            .await
            .unwrap();
        assert!(!retrieved.system_disabled);
        assert!(retrieved.disabled_reason.is_none());
    }

    #[test]
    fn test_disabled_reason_safe_description() {
        let reasons = vec![
            DisabledReason::NonceSyncFailed("Error with API key abc123".to_string()),
            DisabledReason::RpcValidationFailed(
                "RPC error: http://secret-rpc.com:8545".to_string(),
            ),
            DisabledReason::BalanceCheckFailed("Balance: 1.5 ETH at address 0x123...".to_string()),
        ];

        for reason in reasons {
            let safe_desc = reason.safe_description();

            // None of the sensitive fragments from the raw messages may leak.
            assert!(!safe_desc.contains("abc123"));
            assert!(!safe_desc.contains("http://"));
            assert!(!safe_desc.contains("0x123"));
            assert!(!safe_desc.contains("1.5 ETH"));

            assert!(!safe_desc.is_empty());
        }

        let multiple = DisabledReason::Multiple(vec![
            DisabledReason::RpcValidationFailed("Secret RPC info".to_string()),
            DisabledReason::NonceSyncFailed("Secret nonce info".to_string()),
        ]);

        let safe_desc = multiple.safe_description();
        assert!(!safe_desc.contains("Secret"));
        assert!(safe_desc.contains("RPC endpoint validation failed"));
        assert!(safe_desc.contains("Nonce synchronization failed"));
    }
}