use actix_web::web::ThinData;

use crate::{
    config::ServerConfig,
    constants::{
        SYSTEM_CLEANUP_CRON_SCHEDULE, TRANSACTION_CLEANUP_CRON_SCHEDULE,
        WORKER_SYSTEM_CLEANUP_RETRIES, WORKER_TOKEN_SWAP_REQUEST_RETRIES,
        WORKER_TRANSACTION_CLEANUP_RETRIES,
    },
    jobs::{
        notification_handler, relayer_health_check_handler, system_cleanup_handler,
        token_swap_cron_handler, token_swap_request_handler, transaction_cleanup_handler,
        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
        Job, JobProducerTrait, NotificationSend, RelayerHealthCheck, SystemCleanupCronReminder,
        TokenSwapCronReminder, TokenSwapRequest, TransactionCleanupCronReminder,
        TransactionRequest, TransactionSend, TransactionStatusCheck,
    },
    models::{
        DefaultAppState, NetworkRepoModel, NotificationRepoModel, RelayerNetworkPolicy,
        RelayerRepoModel, SignerRepoModel, ThinDataAppState, TransactionRepoModel,
    },
    repositories::{
        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
        Repository, TransactionCounterTrait, TransactionRepository,
    },
};
use apalis::prelude::*;

use apalis::layers::retry::backoff::MakeBackoff;
use apalis::layers::retry::{backoff::ExponentialBackoffMaker, RetryPolicy};
use apalis::layers::ErrorHandlingLayer;

pub use tower::util::rng::HasherRng;

use apalis_cron::CronStream;
use eyre::Result;
use std::{str::FromStr, time::Duration};
use tokio::signal::unix::SignalKind;
use tracing::{debug, error, info};

use super::{filter_relayers_for_swap, QueueType, WorkerContext};
use crate::queues::retry_config::{
    RetryBackoffConfig, NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF,
    STATUS_GENERIC_BACKOFF, STATUS_STELLAR_BACKOFF, SYSTEM_CLEANUP_BACKOFF,
    TOKEN_SWAP_CRON_BACKOFF, TOKEN_SWAP_REQUEST_BACKOFF, TX_CLEANUP_BACKOFF, TX_REQUEST_BACKOFF,
    TX_SUBMISSION_BACKOFF,
};

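// Thin apalis adapters: each wrapper pulls the retry `Attempt` and `TaskId` into a
// `WorkerContext`, clones the shared app state out of the `Data` extractor, and delegates
// to the matching handler in `crate::jobs`.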
async fn apalis_transaction_request_handler(
    job: Job<TransactionRequest>,
    state: Data<ThinData<DefaultAppState>>,
    attempt: Attempt,
    task_id: TaskId,
) -> Result<(), apalis::prelude::Error> {
    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
    transaction_request_handler(job, (*state).clone(), ctx)
        .await
        .map_err(Into::into)
}

async fn apalis_transaction_submission_handler(
    job: Job<TransactionSend>,
    state: Data<ThinData<DefaultAppState>>,
    attempt: Attempt,
    task_id: TaskId,
) -> Result<(), apalis::prelude::Error> {
    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
    transaction_submission_handler(job, (*state).clone(), ctx)
        .await
        .map_err(Into::into)
}

async fn apalis_transaction_status_handler(
    job: Job<TransactionStatusCheck>,
    state: Data<ThinData<DefaultAppState>>,
    attempt: Attempt,
    task_id: TaskId,
) -> Result<(), apalis::prelude::Error> {
    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
    transaction_status_handler(job, (*state).clone(), ctx)
        .await
        .map_err(Into::into)
}

async fn apalis_notification_handler(
    job: Job<NotificationSend>,
    state: Data<ThinData<DefaultAppState>>,
    attempt: Attempt,
    task_id: TaskId,
) -> Result<(), apalis::prelude::Error> {
    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
    notification_handler(job, (*state).clone(), ctx)
        .await
        .map_err(Into::into)
}

async fn apalis_token_swap_request_handler(
    job: Job<TokenSwapRequest>,
    state: Data<ThinData<DefaultAppState>>,
    attempt: Attempt,
    task_id: TaskId,
) -> Result<(), apalis::prelude::Error> {
    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
    token_swap_request_handler(job, (*state).clone(), ctx)
        .await
        .map_err(Into::into)
}

async fn apalis_relayer_health_check_handler(
    job: Job<RelayerHealthCheck>,
    state: Data<ThinData<DefaultAppState>>,
    attempt: Attempt,
    task_id: TaskId,
) -> Result<(), apalis::prelude::Error> {
    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
    relayer_health_check_handler(job, (*state).clone(), ctx)
        .await
        .map_err(Into::into)
}

async fn apalis_transaction_cleanup_handler(
    _job: TransactionCleanupCronReminder,
    state: Data<ThinData<DefaultAppState>>,
    attempt: Attempt,
    task_id: TaskId,
) -> Result<(), apalis::prelude::Error> {
    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
    transaction_cleanup_handler(TransactionCleanupCronReminder(), (*state).clone(), ctx)
        .await
        .map_err(Into::into)
}

async fn apalis_system_cleanup_handler(
    _job: SystemCleanupCronReminder,
    state: Data<ThinData<DefaultAppState>>,
    attempt: Attempt,
    task_id: TaskId,
) -> Result<(), apalis::prelude::Error> {
    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
    system_cleanup_handler(SystemCleanupCronReminder(), (*state).clone(), ctx)
        .await
        .map_err(Into::into)
}

async fn apalis_token_swap_cron_handler(
    _job: TokenSwapCronReminder,
    relayer_id: Data<String>,
    state: Data<ThinData<DefaultAppState>>,
    attempt: Attempt,
    task_id: TaskId,
) -> Result<(), apalis::prelude::Error> {
    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
    token_swap_cron_handler(
        TokenSwapCronReminder(),
        (*relayer_id).clone(),
        (*state).clone(),
        ctx,
    )
    .await
    .map_err(Into::into)
}

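// Worker names; most also serve as the concurrency lookup keys passed to
// `ServerConfig::get_worker_concurrency` (see `test_worker_names_match_concurrency_env_keys`).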
const TRANSACTION_REQUEST: &str = "transaction_request";
const TRANSACTION_SENDER: &str = "transaction_sender";
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
const TRANSACTION_STATUS_CHECKER_EVM: &str = "transaction_status_checker_evm";
const TRANSACTION_STATUS_CHECKER_STELLAR: &str = "transaction_status_checker_stellar";
const NOTIFICATION_SENDER: &str = "notification_sender";
const TOKEN_SWAP_REQUEST: &str = "token_swap_request";
const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
const SYSTEM_CLEANUP: &str = "system_cleanup";

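/// Builds an `ExponentialBackoffMaker` from an initial delay and a maximum delay (both in
/// milliseconds) plus a jitter factor; construction fails for invalid combinations such as
/// an initial delay greater than the maximum (see the tests below).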
fn create_backoff(initial_ms: u64, max_ms: u64, jitter: f64) -> Result<ExponentialBackoffMaker> {
    let maker = ExponentialBackoffMaker::new(
        Duration::from_millis(initial_ms),
        Duration::from_millis(max_ms),
        jitter,
        HasherRng::default(),
    )?;

    Ok(maker)
}

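/// Convenience wrapper that builds the backoff maker from a `RetryBackoffConfig` profile.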
fn create_backoff_from_config(cfg: RetryBackoffConfig) -> Result<ExponentialBackoffMaker> {
    create_backoff(cfg.initial_ms, cfg.max_ms, cfg.jitter)
}

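/// Builds the apalis workers for every Redis-backed queue (transaction request, submission
/// and status checks, notifications, token swap requests, relayer health checks) plus the
/// cron-driven transaction and system cleanup workers, registers them on a `Monitor`, and
/// spawns the monitor with SIGINT/SIGTERM-aware shutdown.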
pub async fn initialize_redis_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
) -> Result<()>
where
    J: JobProducerTrait + Send + Sync + 'static,
    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
    TCR: TransactionCounterTrait + Send + Sync + 'static,
    PR: PluginRepositoryTrait + Send + Sync + 'static,
    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
{
    let queue_backend = app_state
        .job_producer
        .get_queue_backend()
        .ok_or_else(|| eyre::eyre!("Queue backend is not available"))?;
    let queue = queue_backend
        .queue()
        .cloned()
        .ok_or_else(|| eyre::eyre!("Redis queue is not available for active backend"))?;

    let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
        .layer(ErrorHandlingLayer::new())
        .retry(
            RetryPolicy::retries(QueueType::TransactionRequest.max_retries())
                .with_backoff(create_backoff_from_config(TX_REQUEST_BACKOFF)?.make_backoff()),
        )
        .enable_tracing()
        .catch_panic()
        .concurrency(ServerConfig::get_worker_concurrency(
            QueueType::TransactionRequest.concurrency_env_key(),
            QueueType::TransactionRequest.default_concurrency(),
        ))
        .data(app_state.clone())
        .backend(queue.transaction_request_queue.clone())
        .build_fn(apalis_transaction_request_handler);

    let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .retry(
            RetryPolicy::retries(QueueType::TransactionSubmission.max_retries())
                .with_backoff(create_backoff_from_config(TX_SUBMISSION_BACKOFF)?.make_backoff()),
        )
        .concurrency(ServerConfig::get_worker_concurrency(
            QueueType::TransactionSubmission.concurrency_env_key(),
            QueueType::TransactionSubmission.default_concurrency(),
        ))
        .data(app_state.clone())
        .backend(queue.transaction_submission_queue.clone())
        .build_fn(apalis_transaction_submission_handler);

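    // Status checks are split across three queues: a generic one plus chain-specific EVM
    // and Stellar queues, each with its own backoff profile and concurrency settings.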
    let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .retry(
            RetryPolicy::retries(QueueType::StatusCheck.max_retries())
                .with_backoff(create_backoff_from_config(STATUS_GENERIC_BACKOFF)?.make_backoff()),
        )
        .concurrency(ServerConfig::get_worker_concurrency(
            QueueType::StatusCheck.concurrency_env_key(),
            QueueType::StatusCheck.default_concurrency(),
        ))
        .data(app_state.clone())
        .backend(queue.transaction_status_queue.clone())
        .build_fn(apalis_transaction_status_handler);

    let transaction_status_queue_worker_evm = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_EVM)
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .retry(
            RetryPolicy::retries(QueueType::StatusCheck.max_retries())
                .with_backoff(create_backoff_from_config(STATUS_EVM_BACKOFF)?.make_backoff()),
        )
        .concurrency(ServerConfig::get_worker_concurrency(
            QueueType::StatusCheckEvm.concurrency_env_key(),
            QueueType::StatusCheckEvm.default_concurrency(),
        ))
        .data(app_state.clone())
        .backend(queue.transaction_status_queue_evm.clone())
        .build_fn(apalis_transaction_status_handler);

    let transaction_status_queue_worker_stellar =
        WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_STELLAR)
            .layer(ErrorHandlingLayer::new())
            .enable_tracing()
            .catch_panic()
            .retry(
                RetryPolicy::retries(QueueType::StatusCheckStellar.max_retries()).with_backoff(
                    create_backoff_from_config(STATUS_STELLAR_BACKOFF)?.make_backoff(),
                ),
            )
            .concurrency(ServerConfig::get_worker_concurrency(
                QueueType::StatusCheckStellar.concurrency_env_key(),
                QueueType::StatusCheckStellar.default_concurrency(),
            ))
            .data(app_state.clone())
            .backend(queue.transaction_status_queue_stellar.clone())
            .build_fn(apalis_transaction_status_handler);

    let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .retry(
            RetryPolicy::retries(QueueType::Notification.max_retries())
                .with_backoff(create_backoff_from_config(NOTIFICATION_BACKOFF)?.make_backoff()),
        )
        .concurrency(ServerConfig::get_worker_concurrency(
            QueueType::Notification.concurrency_env_key(),
            QueueType::Notification.default_concurrency(),
        ))
        .data(app_state.clone())
        .backend(queue.notification_queue.clone())
        .build_fn(apalis_notification_handler);

    let token_swap_request_queue_worker = WorkerBuilder::new(TOKEN_SWAP_REQUEST)
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .retry(
            RetryPolicy::retries(QueueType::TokenSwapRequest.max_retries()).with_backoff(
                create_backoff_from_config(TOKEN_SWAP_REQUEST_BACKOFF)?.make_backoff(),
            ),
        )
        .concurrency(ServerConfig::get_worker_concurrency(
            QueueType::TokenSwapRequest.concurrency_env_key(),
            QueueType::TokenSwapRequest.default_concurrency(),
        ))
        .data(app_state.clone())
        .backend(queue.token_swap_request_queue.clone())
        .build_fn(apalis_token_swap_request_handler);

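    // The cleanup workers are driven by cron schedules rather than Redis-backed queues.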
    let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .retry(
            RetryPolicy::retries(WORKER_TRANSACTION_CLEANUP_RETRIES)
                .with_backoff(create_backoff_from_config(TX_CLEANUP_BACKOFF)?.make_backoff()),
        )
        .concurrency(ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1))
        .data(app_state.clone())
        .backend(CronStream::new(
            apalis_cron::Schedule::from_str(TRANSACTION_CLEANUP_CRON_SCHEDULE)?,
        ))
        .build_fn(apalis_transaction_cleanup_handler);

    let system_cleanup_queue_worker = WorkerBuilder::new(SYSTEM_CLEANUP)
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .retry(
            RetryPolicy::retries(WORKER_SYSTEM_CLEANUP_RETRIES)
                .with_backoff(create_backoff_from_config(SYSTEM_CLEANUP_BACKOFF)?.make_backoff()),
        )
        .concurrency(1)
        .data(app_state.clone())
        .backend(CronStream::new(apalis_cron::Schedule::from_str(
            SYSTEM_CLEANUP_CRON_SCHEDULE,
        )?))
        .build_fn(apalis_system_cleanup_handler);

    let relayer_health_check_worker = WorkerBuilder::new(RELAYER_HEALTH_CHECK)
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .retry(
            RetryPolicy::retries(QueueType::RelayerHealthCheck.max_retries())
                .with_backoff(create_backoff_from_config(RELAYER_HEALTH_BACKOFF)?.make_backoff()),
        )
        .concurrency(ServerConfig::get_worker_concurrency(
            QueueType::RelayerHealthCheck.concurrency_env_key(),
            QueueType::RelayerHealthCheck.default_concurrency(),
        ))
        .data(app_state.clone())
        .backend(queue.relayer_health_check_queue.clone())
        .build_fn(apalis_relayer_health_check_handler);

    let monitor = Monitor::new()
        .register(transaction_request_queue_worker)
        .register(transaction_submission_queue_worker)
        .register(transaction_status_queue_worker)
        .register(transaction_status_queue_worker_evm)
        .register(transaction_status_queue_worker_stellar)
        .register(notification_queue_worker)
        .register(token_swap_request_queue_worker)
        .register(transaction_cleanup_queue_worker)
        .register(system_cleanup_queue_worker)
        .register(relayer_health_check_worker)
        .on_event(monitor_handle_event)
        .shutdown_timeout(Duration::from_millis(5000));

    let monitor_future = monitor.run_with_signal(async {
        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
            .map_err(|e| std::io::Error::other(format!("Failed to create SIGINT signal: {e}")))?;
        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
            .map_err(|e| std::io::Error::other(format!("Failed to create SIGTERM signal: {e}")))?;

        debug!("Workers monitor started");

        tokio::select! {
            _ = sigint.recv() => debug!("Received SIGINT."),
            _ = sigterm.recv() => debug!("Received SIGTERM."),
        };

        debug!("Workers monitor shutting down");

        Ok(())
    });
    tokio::spawn(async move {
        if let Err(e) = monitor_future.await {
            error!(error = %e, "monitor error");
        }
    });
446 debug!("Workers monitor shutdown complete");

    Ok(())
}

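/// Spins up one cron-driven token swap worker per active relayer with swaps enabled
/// (Solana and Stellar policies that define a cron schedule), registers them on a
/// `Monitor`, and spawns it with SIGINT/SIGTERM-aware shutdown. Relayers without a swap
/// configuration, without a cron schedule, or with an unparseable schedule are skipped.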
pub async fn initialize_redis_token_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
) -> Result<()>
where
    J: JobProducerTrait + Send + Sync + 'static,
    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
    TCR: TransactionCounterTrait + Send + Sync + 'static,
    PR: PluginRepositoryTrait + Send + Sync + 'static,
    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
{
    let active_relayers = app_state.relayer_repository.list_active().await?;
    let relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);

    if relayers_with_swap_enabled.is_empty() {
        debug!("No relayers with swap enabled");
        return Ok(());
    }
    info!(
        "Found {} relayers with swap enabled",
        relayers_with_swap_enabled.len()
    );

    let mut workers = Vec::new();

    let swap_backoff = create_backoff_from_config(TOKEN_SWAP_CRON_BACKOFF)?.make_backoff();

    for relayer in relayers_with_swap_enabled {
        debug!(relayer = ?relayer, "found relayer with swap enabled");

        let (cron_schedule, network_type) = match &relayer.policies {
            RelayerNetworkPolicy::Solana(policy) => match policy.get_swap_config() {
                Some(config) => match config.cron_schedule {
                    Some(schedule) => (schedule, "solana".to_string()),
                    None => {
                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Solana relayer; skipping");
                        continue;
                    }
                },
                None => {
                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Solana relayer; skipping");
                    continue;
                }
            },
            RelayerNetworkPolicy::Stellar(policy) => match policy.get_swap_config() {
                Some(config) => match config.cron_schedule {
                    Some(schedule) => (schedule, "stellar".to_string()),
                    None => {
                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Stellar relayer; skipping");
                        continue;
                    }
                },
                None => {
                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Stellar relayer; skipping");
                    continue;
                }
            },
            RelayerNetworkPolicy::Evm(_) => {
                debug!(relayer_id = %relayer.id, "EVM relayers do not support swap; skipping");
                continue;
            }
        };

        let calendar_schedule = match apalis_cron::Schedule::from_str(&cron_schedule) {
            Ok(schedule) => schedule,
            Err(e) => {
                error!(relayer_id = %relayer.id, error = %e, "Failed to parse cron schedule; skipping");
                continue;
            }
        };

        let worker = WorkerBuilder::new(format!(
            "{}-swap-schedule-{}",
            network_type,
            relayer.id.clone()
        ))
        .layer(ErrorHandlingLayer::new())
        .enable_tracing()
        .catch_panic()
        .retry(
            RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
                .with_backoff(swap_backoff.clone()),
        )
        .concurrency(1)
        .data(relayer.id.clone())
        .data(app_state.clone())
        .backend(CronStream::new(calendar_schedule))
        .build_fn(apalis_token_swap_cron_handler);

        workers.push(worker);
        debug!(
            relayer_id = %relayer.id,
            network_type = %network_type,
            "Created worker for relayer with swap enabled"
        );
    }

    let mut monitor = Monitor::new()
        .on_event(monitor_handle_event)
        .shutdown_timeout(Duration::from_millis(5000));

    for worker in workers {
        monitor = monitor.register(worker);
    }

    let monitor_future = monitor.run_with_signal(async {
        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
            .map_err(|e| std::io::Error::other(format!("Failed to create SIGINT signal: {e}")))?;
        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
            .map_err(|e| std::io::Error::other(format!("Failed to create SIGTERM signal: {e}")))?;

        debug!("Swap Monitor started");

        tokio::select! {
            _ = sigint.recv() => debug!("Received SIGINT."),
            _ = sigterm.recv() => debug!("Received SIGTERM."),
        };

        debug!("Swap Monitor shutting down");

        Ok(())
    });
    tokio::spawn(async move {
        if let Err(e) = monitor_future.await {
            error!(error = %e, "monitor error");
        }
    });
    Ok(())
}

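/// Logs apalis monitor lifecycle events: worker errors at `error` level, everything else
/// at `debug`.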
fn monitor_handle_event(e: Worker<Event>) {
    let worker_id = e.id();
    match e.inner() {
        Event::Engage(task_id) => {
            debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
        }
        Event::Error(e) => {
            error!(worker_id = %worker_id, error = %e, "worker encountered an error");
        }
        Event::Exit => {
            debug!(worker_id = %worker_id, "worker exited");
        }
        Event::Idle => {
            debug!(worker_id = %worker_id, "worker is idle");
        }
        Event::Start => {
            debug!(worker_id = %worker_id, "worker started");
        }
        Event::Stop => {
            debug!(worker_id = %worker_id, "worker stopped");
        }
        _ => {}
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::queues::retry_config::{
        NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF, STATUS_GENERIC_BACKOFF,
        STATUS_STELLAR_BACKOFF, SYSTEM_CLEANUP_BACKOFF, TOKEN_SWAP_CRON_BACKOFF,
        TOKEN_SWAP_REQUEST_BACKOFF, TX_CLEANUP_BACKOFF, TX_REQUEST_BACKOFF, TX_SUBMISSION_BACKOFF,
    };

    #[test]
    fn test_create_backoff_with_valid_parameters() {
        let result = create_backoff(200, 5000, 0.99);
        assert!(
            result.is_ok(),
            "Should create backoff with valid parameters"
        );
    }

    #[test]
    fn test_create_backoff_with_zero_initial() {
        let result = create_backoff(0, 5000, 0.99);
        assert!(
            result.is_ok(),
            "Should handle zero initial delay (edge case)"
        );
    }

    #[test]
    fn test_create_backoff_with_equal_initial_and_max() {
        let result = create_backoff(1000, 1000, 0.5);
        assert!(result.is_ok(), "Should handle equal initial and max delays");
    }

    #[test]
    fn test_create_backoff_with_zero_jitter() {
        let result = create_backoff(500, 5000, 0.0);
        assert!(result.is_ok(), "Should handle zero jitter");
    }

    #[test]
    fn test_create_backoff_with_max_jitter() {
        let result = create_backoff(500, 5000, 1.0);
        assert!(result.is_ok(), "Should handle maximum jitter (1.0)");
    }

    #[test]
    fn test_create_backoff_with_small_values() {
        let result = create_backoff(1, 10, 0.5);
        assert!(result.is_ok(), "Should handle very small delay values");
    }

    #[test]
    fn test_create_backoff_with_large_values() {
        let result = create_backoff(10000, 60000, 0.99);
        assert!(result.is_ok(), "Should handle large delay values");
    }

    #[test]
    fn test_create_backoff_from_config_profiles() {
        let profiles = [
            TX_REQUEST_BACKOFF,
            TX_SUBMISSION_BACKOFF,
            STATUS_GENERIC_BACKOFF,
            STATUS_EVM_BACKOFF,
            STATUS_STELLAR_BACKOFF,
            NOTIFICATION_BACKOFF,
            TOKEN_SWAP_REQUEST_BACKOFF,
            TX_CLEANUP_BACKOFF,
            SYSTEM_CLEANUP_BACKOFF,
            RELAYER_HEALTH_BACKOFF,
            TOKEN_SWAP_CRON_BACKOFF,
        ];

        for cfg in profiles {
            let result = create_backoff_from_config(cfg);
            assert!(
                result.is_ok(),
                "backoff profile should be constructible: {:?}",
                cfg
            );
        }
    }

    #[test]
    fn test_create_backoff_from_config_produces_usable_backoff() {
        let profiles = [
            TX_REQUEST_BACKOFF,
            TX_SUBMISSION_BACKOFF,
            STATUS_GENERIC_BACKOFF,
            STATUS_EVM_BACKOFF,
            STATUS_STELLAR_BACKOFF,
            NOTIFICATION_BACKOFF,
            TOKEN_SWAP_REQUEST_BACKOFF,
            TX_CLEANUP_BACKOFF,
            SYSTEM_CLEANUP_BACKOFF,
            RELAYER_HEALTH_BACKOFF,
            TOKEN_SWAP_CRON_BACKOFF,
        ];

        for cfg in profiles {
            let mut maker = create_backoff_from_config(cfg).unwrap();
            let _backoff = maker.make_backoff();
        }
    }

    #[test]
    fn test_create_backoff_with_initial_greater_than_max_errors() {
        let result = create_backoff(10000, 100, 0.5);
        assert!(
            result.is_err(),
            "initial > max should be rejected by ExponentialBackoffMaker"
        );
    }

    #[test]
    fn test_all_backoff_configs_have_valid_initial_le_max() {
        let profiles: &[(&str, RetryBackoffConfig)] = &[
            ("TX_REQUEST", TX_REQUEST_BACKOFF),
            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
            ("STATUS_EVM", STATUS_EVM_BACKOFF),
            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
            ("NOTIFICATION", NOTIFICATION_BACKOFF),
            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
        ];

        for (name, cfg) in profiles {
            assert!(
                cfg.initial_ms <= cfg.max_ms,
                "{name}: initial_ms ({}) must be <= max_ms ({})",
                cfg.initial_ms,
                cfg.max_ms
            );
        }
    }

    #[test]
    fn test_all_backoff_configs_have_valid_jitter_range() {
        let profiles: &[(&str, RetryBackoffConfig)] = &[
            ("TX_REQUEST", TX_REQUEST_BACKOFF),
            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
            ("STATUS_EVM", STATUS_EVM_BACKOFF),
            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
            ("NOTIFICATION", NOTIFICATION_BACKOFF),
            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
        ];

        for (name, cfg) in profiles {
            assert!(
                (0.0..=1.0).contains(&cfg.jitter),
                "{name}: jitter ({}) must be in [0.0, 1.0]",
                cfg.jitter
            );
        }
    }

    #[test]
    fn test_all_backoff_configs_have_positive_initial_ms() {
        let profiles: &[(&str, RetryBackoffConfig)] = &[
            ("TX_REQUEST", TX_REQUEST_BACKOFF),
            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
            ("STATUS_EVM", STATUS_EVM_BACKOFF),
            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
            ("NOTIFICATION", NOTIFICATION_BACKOFF),
            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
        ];

        for (name, cfg) in profiles {
            assert!(
                cfg.initial_ms > 0,
                "{name}: initial_ms must be positive, got {}",
                cfg.initial_ms
            );
        }
    }

    #[test]
    fn test_worker_name_constants_are_nonempty() {
        let names = [
            TRANSACTION_REQUEST,
            TRANSACTION_SENDER,
            TRANSACTION_STATUS_CHECKER,
            TRANSACTION_STATUS_CHECKER_EVM,
            TRANSACTION_STATUS_CHECKER_STELLAR,
            NOTIFICATION_SENDER,
            TOKEN_SWAP_REQUEST,
            TRANSACTION_CLEANUP,
            RELAYER_HEALTH_CHECK,
            SYSTEM_CLEANUP,
        ];

        for name in &names {
            assert!(!name.is_empty(), "Worker name constant must not be empty");
        }
    }

    #[test]
    fn test_worker_name_constants_are_unique() {
        let names = [
            TRANSACTION_REQUEST,
            TRANSACTION_SENDER,
            TRANSACTION_STATUS_CHECKER,
            TRANSACTION_STATUS_CHECKER_EVM,
            TRANSACTION_STATUS_CHECKER_STELLAR,
            NOTIFICATION_SENDER,
            TOKEN_SWAP_REQUEST,
            TRANSACTION_CLEANUP,
            RELAYER_HEALTH_CHECK,
            SYSTEM_CLEANUP,
        ];

        for (i, a) in names.iter().enumerate() {
            for (j, b) in names.iter().enumerate() {
                if i != j {
                    assert_ne!(
                        a, b,
                        "Worker names must be unique: '{}' at index {} and {}",
                        a, i, j
                    );
                }
            }
        }
    }

    #[test]
    fn test_worker_names_match_concurrency_env_keys() {
        assert_eq!(
            TRANSACTION_REQUEST,
            QueueType::TransactionRequest.concurrency_env_key()
        );
        assert_eq!(
            TRANSACTION_SENDER,
            QueueType::TransactionSubmission.concurrency_env_key()
        );
        assert_eq!(
            TRANSACTION_STATUS_CHECKER,
            QueueType::StatusCheck.concurrency_env_key()
        );
        assert_eq!(
            TRANSACTION_STATUS_CHECKER_EVM,
            QueueType::StatusCheckEvm.concurrency_env_key()
        );
        assert_eq!(
            TRANSACTION_STATUS_CHECKER_STELLAR,
            QueueType::StatusCheckStellar.concurrency_env_key()
        );
        assert_eq!(
            NOTIFICATION_SENDER,
            QueueType::Notification.concurrency_env_key()
        );
        assert_eq!(
            TOKEN_SWAP_REQUEST,
            QueueType::TokenSwapRequest.concurrency_env_key()
        );
        assert_eq!(
            RELAYER_HEALTH_CHECK,
            QueueType::RelayerHealthCheck.concurrency_env_key()
        );
    }

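    // Wraps an `Event` in a `Worker<Event>` so `monitor_handle_event` can be exercised
    // directly by the smoke tests below.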
    fn make_worker_event(event: Event) -> Worker<Event> {
        let worker_id = WorkerId::from_str("test-worker").unwrap();
        Worker::new(worker_id, event)
    }

    #[test]
    fn test_monitor_handle_event_start_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Start));
    }

    #[test]
    fn test_monitor_handle_event_engage_does_not_panic() {
        let task_id = TaskId::new();
        monitor_handle_event(make_worker_event(Event::Engage(task_id)));
    }

    #[test]
    fn test_monitor_handle_event_idle_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Idle));
    }

    #[test]
    fn test_monitor_handle_event_error_does_not_panic() {
        let error: Box<dyn std::error::Error + Send + Sync> = "test error".to_string().into();
        monitor_handle_event(make_worker_event(Event::Error(error)));
    }

    #[test]
    fn test_monitor_handle_event_stop_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Stop));
    }

    #[test]
    fn test_monitor_handle_event_exit_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Exit));
    }

    #[test]
    fn test_monitor_handle_event_custom_does_not_panic() {
        monitor_handle_event(make_worker_event(Event::Custom("test-custom".to_string())));
    }
}