openzeppelin_relayer/queues/redis/
worker.rs

1//! Redis/Apalis worker initialization.
2//!
3//! This module contains all Apalis-specific worker creation logic for the Redis
4//! queue backend, including WorkerBuilder configurations, Monitor setup,
5//! backoff strategies, and token swap cron workers.
6
7use actix_web::web::ThinData;
8
9use crate::{
10    config::ServerConfig,
11    constants::{
12        SYSTEM_CLEANUP_CRON_SCHEDULE, TRANSACTION_CLEANUP_CRON_SCHEDULE,
13        WORKER_SYSTEM_CLEANUP_RETRIES, WORKER_TOKEN_SWAP_REQUEST_RETRIES,
14        WORKER_TRANSACTION_CLEANUP_RETRIES,
15    },
16    jobs::{
17        notification_handler, relayer_health_check_handler, system_cleanup_handler,
18        token_swap_cron_handler, token_swap_request_handler, transaction_cleanup_handler,
19        transaction_request_handler, transaction_status_handler, transaction_submission_handler,
20        Job, JobProducerTrait, NotificationSend, RelayerHealthCheck, SystemCleanupCronReminder,
21        TokenSwapCronReminder, TokenSwapRequest, TransactionCleanupCronReminder,
22        TransactionRequest, TransactionSend, TransactionStatusCheck,
23    },
24    models::{
25        DefaultAppState, NetworkRepoModel, NotificationRepoModel, RelayerNetworkPolicy,
26        RelayerRepoModel, SignerRepoModel, ThinDataAppState, TransactionRepoModel,
27    },
28    repositories::{
29        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
30        Repository, TransactionCounterTrait, TransactionRepository,
31    },
32};
33use apalis::prelude::*;
34
35use apalis::layers::retry::backoff::MakeBackoff;
36use apalis::layers::retry::{backoff::ExponentialBackoffMaker, RetryPolicy};
37use apalis::layers::ErrorHandlingLayer;
38
39/// Re-exports from [`tower::util`]
40pub use tower::util::rng::HasherRng;
41
42use apalis_cron::CronStream;
43use eyre::Result;
44use std::{str::FromStr, time::Duration};
45use tokio::signal::unix::SignalKind;
46use tracing::{debug, error, info};
47
48use super::{filter_relayers_for_swap, QueueType, WorkerContext};
49use crate::queues::retry_config::{
50    RetryBackoffConfig, NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF,
51    STATUS_GENERIC_BACKOFF, STATUS_STELLAR_BACKOFF, SYSTEM_CLEANUP_BACKOFF,
52    TOKEN_SWAP_CRON_BACKOFF, TOKEN_SWAP_REQUEST_BACKOFF, TX_CLEANUP_BACKOFF, TX_REQUEST_BACKOFF,
53    TX_SUBMISSION_BACKOFF,
54};
55
56// ---------------------------------------------------------------------------
57// Apalis adapter functions
58//
59// These thin adapters are the ONLY place where Apalis-specific handler types
60// (Data, Attempt, Worker<Context>, TaskId, RedisContext) appear. They convert
61// Apalis types → WorkerContext and HandlerError → apalis::prelude::Error,
62// keeping all handler business logic backend-neutral.
63// ---------------------------------------------------------------------------
64
65async fn apalis_transaction_request_handler(
66    job: Job<TransactionRequest>,
67    state: Data<ThinData<DefaultAppState>>,
68    attempt: Attempt,
69    task_id: TaskId,
70) -> Result<(), apalis::prelude::Error> {
71    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
72    transaction_request_handler(job, (*state).clone(), ctx)
73        .await
74        .map_err(Into::into)
75}
76
77async fn apalis_transaction_submission_handler(
78    job: Job<TransactionSend>,
79    state: Data<ThinData<DefaultAppState>>,
80    attempt: Attempt,
81    task_id: TaskId,
82) -> Result<(), apalis::prelude::Error> {
83    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
84    transaction_submission_handler(job, (*state).clone(), ctx)
85        .await
86        .map_err(Into::into)
87}
88
89async fn apalis_transaction_status_handler(
90    job: Job<TransactionStatusCheck>,
91    state: Data<ThinData<DefaultAppState>>,
92    attempt: Attempt,
93    task_id: TaskId,
94) -> Result<(), apalis::prelude::Error> {
95    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
96    transaction_status_handler(job, (*state).clone(), ctx)
97        .await
98        .map_err(Into::into)
99}
100
101async fn apalis_notification_handler(
102    job: Job<NotificationSend>,
103    state: Data<ThinData<DefaultAppState>>,
104    attempt: Attempt,
105    task_id: TaskId,
106) -> Result<(), apalis::prelude::Error> {
107    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
108    notification_handler(job, (*state).clone(), ctx)
109        .await
110        .map_err(Into::into)
111}
112
113async fn apalis_token_swap_request_handler(
114    job: Job<TokenSwapRequest>,
115    state: Data<ThinData<DefaultAppState>>,
116    attempt: Attempt,
117    task_id: TaskId,
118) -> Result<(), apalis::prelude::Error> {
119    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
120    token_swap_request_handler(job, (*state).clone(), ctx)
121        .await
122        .map_err(Into::into)
123}
124
125async fn apalis_relayer_health_check_handler(
126    job: Job<RelayerHealthCheck>,
127    state: Data<ThinData<DefaultAppState>>,
128    attempt: Attempt,
129    task_id: TaskId,
130) -> Result<(), apalis::prelude::Error> {
131    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
132    relayer_health_check_handler(job, (*state).clone(), ctx)
133        .await
134        .map_err(Into::into)
135}
136
137async fn apalis_transaction_cleanup_handler(
138    _job: TransactionCleanupCronReminder,
139    state: Data<ThinData<DefaultAppState>>,
140    attempt: Attempt,
141    task_id: TaskId,
142) -> Result<(), apalis::prelude::Error> {
143    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
144    transaction_cleanup_handler(TransactionCleanupCronReminder(), (*state).clone(), ctx)
145        .await
146        .map_err(Into::into)
147}
148
149async fn apalis_system_cleanup_handler(
150    _job: SystemCleanupCronReminder,
151    state: Data<ThinData<DefaultAppState>>,
152    attempt: Attempt,
153    task_id: TaskId,
154) -> Result<(), apalis::prelude::Error> {
155    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
156    system_cleanup_handler(SystemCleanupCronReminder(), (*state).clone(), ctx)
157        .await
158        .map_err(Into::into)
159}
160
161async fn apalis_token_swap_cron_handler(
162    _job: TokenSwapCronReminder,
163    relayer_id: Data<String>,
164    state: Data<ThinData<DefaultAppState>>,
165    attempt: Attempt,
166    task_id: TaskId,
167) -> Result<(), apalis::prelude::Error> {
168    let ctx = WorkerContext::new(attempt.current(), task_id.to_string());
169    token_swap_cron_handler(
170        TokenSwapCronReminder(),
171        (*relayer_id).clone(),
172        (*state).clone(),
173        ctx,
174    )
175    .await
176    .map_err(Into::into)
177}
178
// Names for the Redis/Apalis workers built in `initialize_redis_workers`.
const TRANSACTION_REQUEST: &str = "transaction_request";
const TRANSACTION_SENDER: &str = "transaction_sender";
// Generic transaction status checker
const TRANSACTION_STATUS_CHECKER: &str = "transaction_status_checker";
// Network specific status checkers
const TRANSACTION_STATUS_CHECKER_EVM: &str = "transaction_status_checker_evm";
const TRANSACTION_STATUS_CHECKER_STELLAR: &str = "transaction_status_checker_stellar";
const NOTIFICATION_SENDER: &str = "notification_sender";
const TOKEN_SWAP_REQUEST: &str = "token_swap_request";
// Also used as the concurrency env-var lookup key for the cleanup worker.
const TRANSACTION_CLEANUP: &str = "transaction_cleanup";
const RELAYER_HEALTH_CHECK: &str = "relayer_health_check";
const SYSTEM_CLEANUP: &str = "system_cleanup";
191
192/// Creates an exponential backoff with configurable parameters
193///
194/// # Arguments
195/// * `initial_ms` - Initial delay in milliseconds (e.g., 200)
196/// * `max_ms` - Maximum delay in milliseconds (e.g., 5000)
197/// * `jitter` - Jitter factor 0.0-1.0 (e.g., 0.99 for high jitter)
198///
199/// # Returns
200/// A configured backoff instance ready for use with RetryPolicy
201fn create_backoff(initial_ms: u64, max_ms: u64, jitter: f64) -> Result<ExponentialBackoffMaker> {
202    let maker = ExponentialBackoffMaker::new(
203        Duration::from_millis(initial_ms),
204        Duration::from_millis(max_ms),
205        jitter,
206        HasherRng::default(),
207    )?;
208
209    Ok(maker)
210}
211
212fn create_backoff_from_config(cfg: RetryBackoffConfig) -> Result<ExponentialBackoffMaker> {
213    create_backoff(cfg.initial_ms, cfg.max_ms, cfg.jitter)
214}
215
216/// Initializes Redis/Apalis workers and starts the lifecycle monitor.
217///
218/// # Arguments
219/// * `app_state` - Application state containing the job producer and configuration
220pub async fn initialize_redis_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
221    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
222) -> Result<()>
223where
224    J: JobProducerTrait + Send + Sync + 'static,
225    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
226    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
227    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
228    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
229    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
230    TCR: TransactionCounterTrait + Send + Sync + 'static,
231    PR: PluginRepositoryTrait + Send + Sync + 'static,
232    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
233{
234    let queue_backend = app_state
235        .job_producer
236        .get_queue_backend()
237        .ok_or_else(|| eyre::eyre!("Queue backend is not available"))?;
238    let queue = queue_backend
239        .queue()
240        .cloned()
241        .ok_or_else(|| eyre::eyre!("Redis queue is not available for active backend"))?;
242
243    let transaction_request_queue_worker = WorkerBuilder::new(TRANSACTION_REQUEST)
244        .layer(ErrorHandlingLayer::new())
245        .retry(
246            RetryPolicy::retries(QueueType::TransactionRequest.max_retries())
247                .with_backoff(create_backoff_from_config(TX_REQUEST_BACKOFF)?.make_backoff()),
248        )
249        .enable_tracing()
250        .catch_panic()
251        .concurrency(ServerConfig::get_worker_concurrency(
252            QueueType::TransactionRequest.concurrency_env_key(),
253            QueueType::TransactionRequest.default_concurrency(),
254        ))
255        .data(app_state.clone())
256        .backend(queue.transaction_request_queue.clone())
257        .build_fn(apalis_transaction_request_handler);
258
259    let transaction_submission_queue_worker = WorkerBuilder::new(TRANSACTION_SENDER)
260        .layer(ErrorHandlingLayer::new())
261        .enable_tracing()
262        .catch_panic()
263        .retry(
264            RetryPolicy::retries(QueueType::TransactionSubmission.max_retries())
265                .with_backoff(create_backoff_from_config(TX_SUBMISSION_BACKOFF)?.make_backoff()),
266        )
267        .concurrency(ServerConfig::get_worker_concurrency(
268            QueueType::TransactionSubmission.concurrency_env_key(),
269            QueueType::TransactionSubmission.default_concurrency(),
270        ))
271        .data(app_state.clone())
272        .backend(queue.transaction_submission_queue.clone())
273        .build_fn(apalis_transaction_submission_handler);
274
275    // Generic status checker
276    // Uses medium settings that work reasonably for most chains
277    let transaction_status_queue_worker = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER)
278        .layer(ErrorHandlingLayer::new())
279        .enable_tracing()
280        .catch_panic()
281        .retry(
282            RetryPolicy::retries(QueueType::StatusCheck.max_retries())
283                .with_backoff(create_backoff_from_config(STATUS_GENERIC_BACKOFF)?.make_backoff()),
284        )
285        .concurrency(ServerConfig::get_worker_concurrency(
286            QueueType::StatusCheck.concurrency_env_key(),
287            QueueType::StatusCheck.default_concurrency(),
288        ))
289        .data(app_state.clone())
290        .backend(queue.transaction_status_queue.clone())
291        .build_fn(apalis_transaction_status_handler);
292
293    // EVM status checker - slower retries to avoid premature resubmission
294    // EVM has longer block times (~12s) and needs time for resubmission logic
295    let transaction_status_queue_worker_evm = WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_EVM)
296        .layer(ErrorHandlingLayer::new())
297        .enable_tracing()
298        .catch_panic()
299        .retry(
300            RetryPolicy::retries(QueueType::StatusCheck.max_retries())
301                .with_backoff(create_backoff_from_config(STATUS_EVM_BACKOFF)?.make_backoff()),
302        )
303        .concurrency(ServerConfig::get_worker_concurrency(
304            QueueType::StatusCheckEvm.concurrency_env_key(),
305            QueueType::StatusCheckEvm.default_concurrency(),
306        ))
307        .data(app_state.clone())
308        .backend(queue.transaction_status_queue_evm.clone())
309        .build_fn(apalis_transaction_status_handler);
310
311    // Stellar status checker - fast retries for fast finality
312    // Stellar has sub-second finality, needs more frequent status checks
313    let transaction_status_queue_worker_stellar =
314        WorkerBuilder::new(TRANSACTION_STATUS_CHECKER_STELLAR)
315            .layer(ErrorHandlingLayer::new())
316            .enable_tracing()
317            .catch_panic()
318            .retry(
319                RetryPolicy::retries(QueueType::StatusCheckStellar.max_retries()).with_backoff(
320                    create_backoff_from_config(STATUS_STELLAR_BACKOFF)?.make_backoff(),
321                ),
322            )
323            .concurrency(ServerConfig::get_worker_concurrency(
324                QueueType::StatusCheckStellar.concurrency_env_key(),
325                QueueType::StatusCheckStellar.default_concurrency(),
326            ))
327            .data(app_state.clone())
328            .backend(queue.transaction_status_queue_stellar.clone())
329            .build_fn(apalis_transaction_status_handler);
330
331    let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER)
332        .layer(ErrorHandlingLayer::new())
333        .enable_tracing()
334        .catch_panic()
335        .retry(
336            RetryPolicy::retries(QueueType::Notification.max_retries())
337                .with_backoff(create_backoff_from_config(NOTIFICATION_BACKOFF)?.make_backoff()),
338        )
339        .concurrency(ServerConfig::get_worker_concurrency(
340            QueueType::Notification.concurrency_env_key(),
341            QueueType::Notification.default_concurrency(),
342        ))
343        .data(app_state.clone())
344        .backend(queue.notification_queue.clone())
345        .build_fn(apalis_notification_handler);
346
347    let token_swap_request_queue_worker = WorkerBuilder::new(TOKEN_SWAP_REQUEST)
348        .layer(ErrorHandlingLayer::new())
349        .enable_tracing()
350        .catch_panic()
351        .retry(
352            RetryPolicy::retries(QueueType::TokenSwapRequest.max_retries()).with_backoff(
353                create_backoff_from_config(TOKEN_SWAP_REQUEST_BACKOFF)?.make_backoff(),
354            ),
355        )
356        .concurrency(ServerConfig::get_worker_concurrency(
357            QueueType::TokenSwapRequest.concurrency_env_key(),
358            QueueType::TokenSwapRequest.default_concurrency(),
359        ))
360        .data(app_state.clone())
361        .backend(queue.token_swap_request_queue.clone())
362        .build_fn(apalis_token_swap_request_handler);
363
364    let transaction_cleanup_queue_worker = WorkerBuilder::new(TRANSACTION_CLEANUP)
365        .layer(ErrorHandlingLayer::new())
366        .enable_tracing()
367        .catch_panic()
368        .retry(
369            RetryPolicy::retries(WORKER_TRANSACTION_CLEANUP_RETRIES)
370                .with_backoff(create_backoff_from_config(TX_CLEANUP_BACKOFF)?.make_backoff()),
371        )
372        .concurrency(ServerConfig::get_worker_concurrency(TRANSACTION_CLEANUP, 1)) // Default to 1 to avoid DB conflicts
373        .data(app_state.clone())
374        .backend(CronStream::new(
375            apalis_cron::Schedule::from_str(TRANSACTION_CLEANUP_CRON_SCHEDULE)?,
376        ))
377        .build_fn(apalis_transaction_cleanup_handler);
378
379    let system_cleanup_queue_worker = WorkerBuilder::new(SYSTEM_CLEANUP)
380        .layer(ErrorHandlingLayer::new())
381        .enable_tracing()
382        .catch_panic()
383        .retry(
384            RetryPolicy::retries(WORKER_SYSTEM_CLEANUP_RETRIES)
385                .with_backoff(create_backoff_from_config(SYSTEM_CLEANUP_BACKOFF)?.make_backoff()),
386        )
387        .concurrency(1)
388        .data(app_state.clone())
389        .backend(CronStream::new(apalis_cron::Schedule::from_str(
390            SYSTEM_CLEANUP_CRON_SCHEDULE,
391        )?))
392        .build_fn(apalis_system_cleanup_handler);
393
394    let relayer_health_check_worker = WorkerBuilder::new(RELAYER_HEALTH_CHECK)
395        .layer(ErrorHandlingLayer::new())
396        .enable_tracing()
397        .catch_panic()
398        .retry(
399            RetryPolicy::retries(QueueType::RelayerHealthCheck.max_retries())
400                .with_backoff(create_backoff_from_config(RELAYER_HEALTH_BACKOFF)?.make_backoff()),
401        )
402        .concurrency(ServerConfig::get_worker_concurrency(
403            QueueType::RelayerHealthCheck.concurrency_env_key(),
404            QueueType::RelayerHealthCheck.default_concurrency(),
405        ))
406        .data(app_state.clone())
407        .backend(queue.relayer_health_check_queue.clone())
408        .build_fn(apalis_relayer_health_check_handler);
409
410    let monitor = Monitor::new()
411        .register(transaction_request_queue_worker)
412        .register(transaction_submission_queue_worker)
413        .register(transaction_status_queue_worker)
414        .register(transaction_status_queue_worker_evm)
415        .register(transaction_status_queue_worker_stellar)
416        .register(notification_queue_worker)
417        .register(token_swap_request_queue_worker)
418        .register(transaction_cleanup_queue_worker)
419        .register(system_cleanup_queue_worker)
420        .register(relayer_health_check_worker)
421        .on_event(monitor_handle_event)
422        .shutdown_timeout(Duration::from_millis(5000));
423
424    let monitor_future = monitor.run_with_signal(async {
425        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
426            .map_err(|e| std::io::Error::other(format!("Failed to create SIGINT signal: {e}")))?;
427        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
428            .map_err(|e| std::io::Error::other(format!("Failed to create SIGTERM signal: {e}")))?;
429
430        debug!("Workers monitor started");
431
432        tokio::select! {
433            _ = sigint.recv() => debug!("Received SIGINT."),
434            _ = sigterm.recv() => debug!("Received SIGTERM."),
435        };
436
437        debug!("Workers monitor shutting down");
438
439        Ok(())
440    });
441    tokio::spawn(async move {
442        if let Err(e) = monitor_future.await {
443            error!(error = %e, "monitor error");
444        }
445    });
446    debug!("Workers monitor shutdown complete");
447
448    Ok(())
449}
450
451/// Initializes swap workers for Solana and Stellar relayers.
452/// This function creates and registers workers for relayers that have swap enabled and cron schedule set.
453pub async fn initialize_redis_token_swap_workers<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
454    app_state: ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>,
455) -> Result<()>
456where
457    J: JobProducerTrait + Send + Sync + 'static,
458    RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
459    TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
460    NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
461    NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
462    SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
463    TCR: TransactionCounterTrait + Send + Sync + 'static,
464    PR: PluginRepositoryTrait + Send + Sync + 'static,
465    AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
466{
467    let active_relayers = app_state.relayer_repository.list_active().await?;
468    let relayers_with_swap_enabled = filter_relayers_for_swap(active_relayers);
469
470    if relayers_with_swap_enabled.is_empty() {
471        debug!("No relayers with swap enabled");
472        return Ok(());
473    }
474    info!(
475        "Found {} relayers with swap enabled",
476        relayers_with_swap_enabled.len()
477    );
478
479    let mut workers = Vec::new();
480
481    let swap_backoff = create_backoff_from_config(TOKEN_SWAP_CRON_BACKOFF)?.make_backoff();
482
483    for relayer in relayers_with_swap_enabled {
484        debug!(relayer = ?relayer, "found relayer with swap enabled");
485
486        let (cron_schedule, network_type) = match &relayer.policies {
487            RelayerNetworkPolicy::Solana(policy) => match policy.get_swap_config() {
488                Some(config) => match config.cron_schedule {
489                    Some(schedule) => (schedule, "solana".to_string()),
490                    None => {
491                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Solana relayer; skipping");
492                        continue;
493                    }
494                },
495                None => {
496                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Solana relayer; skipping");
497                    continue;
498                }
499            },
500            RelayerNetworkPolicy::Stellar(policy) => match policy.get_swap_config() {
501                Some(config) => match config.cron_schedule {
502                    Some(schedule) => (schedule, "stellar".to_string()),
503                    None => {
504                        debug!(relayer_id = %relayer.id, "No cron schedule specified for Stellar relayer; skipping");
505                        continue;
506                    }
507                },
508                None => {
509                    debug!(relayer_id = %relayer.id, "No swap configuration specified for Stellar relayer; skipping");
510                    continue;
511                }
512            },
513            RelayerNetworkPolicy::Evm(_) => {
514                debug!(relayer_id = %relayer.id, "EVM relayers do not support swap; skipping");
515                continue;
516            }
517        };
518
519        let calendar_schedule = match apalis_cron::Schedule::from_str(&cron_schedule) {
520            Ok(schedule) => schedule,
521            Err(e) => {
522                error!(relayer_id = %relayer.id, error = %e, "Failed to parse cron schedule; skipping");
523                continue;
524            }
525        };
526
527        // Create worker and add to the workers vector
528        let worker = WorkerBuilder::new(format!(
529            "{}-swap-schedule-{}",
530            network_type,
531            relayer.id.clone()
532        ))
533        .layer(ErrorHandlingLayer::new())
534        .enable_tracing()
535        .catch_panic()
536        .retry(
537            RetryPolicy::retries(WORKER_TOKEN_SWAP_REQUEST_RETRIES)
538                .with_backoff(swap_backoff.clone()),
539        )
540        .concurrency(1)
541        .data(relayer.id.clone())
542        .data(app_state.clone())
543        .backend(CronStream::new(calendar_schedule))
544        .build_fn(apalis_token_swap_cron_handler);
545
546        workers.push(worker);
547        debug!(
548            relayer_id = %relayer.id,
549            network_type = %network_type,
550            "Created worker for relayer with swap enabled"
551        );
552    }
553
554    let mut monitor = Monitor::new()
555        .on_event(monitor_handle_event)
556        .shutdown_timeout(Duration::from_millis(5000));
557
558    // Register all workers with the monitor
559    for worker in workers {
560        monitor = monitor.register(worker);
561    }
562
563    let monitor_future = monitor.run_with_signal(async {
564        let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())
565            .map_err(|e| std::io::Error::other(format!("Failed to create SIGINT signal: {e}")))?;
566        let mut sigterm = tokio::signal::unix::signal(SignalKind::terminate())
567            .map_err(|e| std::io::Error::other(format!("Failed to create SIGTERM signal: {e}")))?;
568
569        debug!("Swap Monitor started");
570
571        tokio::select! {
572            _ = sigint.recv() => debug!("Received SIGINT."),
573            _ = sigterm.recv() => debug!("Received SIGTERM."),
574        };
575
576        debug!("Swap Monitor shutting down");
577
578        Ok(())
579    });
580    tokio::spawn(async move {
581        if let Err(e) = monitor_future.await {
582            error!(error = %e, "monitor error");
583        }
584    });
585    Ok(())
586}
587
588fn monitor_handle_event(e: Worker<Event>) {
589    let worker_id = e.id();
590    match e.inner() {
591        Event::Engage(task_id) => {
592            debug!(worker_id = %worker_id, task_id = %task_id, "worker got a job");
593        }
594        Event::Error(e) => {
595            error!(worker_id = %worker_id, error = %e, "worker encountered an error");
596        }
597        Event::Exit => {
598            debug!(worker_id = %worker_id, "worker exited");
599        }
600        Event::Idle => {
601            debug!(worker_id = %worker_id, "worker is idle");
602        }
603        Event::Start => {
604            debug!(worker_id = %worker_id, "worker started");
605        }
606        Event::Stop => {
607            debug!(worker_id = %worker_id, "worker stopped");
608        }
609        _ => {}
610    }
611}
612
613#[cfg(test)]
614mod tests {
615    use super::*;
616    use crate::queues::retry_config::{
617        NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF, STATUS_GENERIC_BACKOFF,
618        STATUS_STELLAR_BACKOFF, SYSTEM_CLEANUP_BACKOFF, TOKEN_SWAP_CRON_BACKOFF,
619        TOKEN_SWAP_REQUEST_BACKOFF, TX_CLEANUP_BACKOFF, TX_REQUEST_BACKOFF, TX_SUBMISSION_BACKOFF,
620    };
621
622    // ── create_backoff tests ───────────────────────────────────────────
623
624    #[test]
625    fn test_create_backoff_with_valid_parameters() {
626        let result = create_backoff(200, 5000, 0.99);
627        assert!(
628            result.is_ok(),
629            "Should create backoff with valid parameters"
630        );
631    }
632
633    #[test]
634    fn test_create_backoff_with_zero_initial() {
635        let result = create_backoff(0, 5000, 0.99);
636        assert!(
637            result.is_ok(),
638            "Should handle zero initial delay (edge case)"
639        );
640    }
641
642    #[test]
643    fn test_create_backoff_with_equal_initial_and_max() {
644        let result = create_backoff(1000, 1000, 0.5);
645        assert!(result.is_ok(), "Should handle equal initial and max delays");
646    }
647
648    #[test]
649    fn test_create_backoff_with_zero_jitter() {
650        let result = create_backoff(500, 5000, 0.0);
651        assert!(result.is_ok(), "Should handle zero jitter");
652    }
653
654    #[test]
655    fn test_create_backoff_with_max_jitter() {
656        let result = create_backoff(500, 5000, 1.0);
657        assert!(result.is_ok(), "Should handle maximum jitter (1.0)");
658    }
659
660    #[test]
661    fn test_create_backoff_with_small_values() {
662        let result = create_backoff(1, 10, 0.5);
663        assert!(result.is_ok(), "Should handle very small delay values");
664    }
665
666    #[test]
667    fn test_create_backoff_with_large_values() {
668        let result = create_backoff(10000, 60000, 0.99);
669        assert!(result.is_ok(), "Should handle large delay values");
670    }
671
672    #[test]
673    fn test_create_backoff_from_config_profiles() {
674        let profiles = [
675            TX_REQUEST_BACKOFF,
676            TX_SUBMISSION_BACKOFF,
677            STATUS_GENERIC_BACKOFF,
678            STATUS_EVM_BACKOFF,
679            STATUS_STELLAR_BACKOFF,
680            NOTIFICATION_BACKOFF,
681            TOKEN_SWAP_REQUEST_BACKOFF,
682            TX_CLEANUP_BACKOFF,
683            SYSTEM_CLEANUP_BACKOFF,
684            RELAYER_HEALTH_BACKOFF,
685            TOKEN_SWAP_CRON_BACKOFF,
686        ];
687
688        for cfg in profiles {
689            let result = create_backoff_from_config(cfg);
690            assert!(
691                result.is_ok(),
692                "backoff profile should be constructible: {:?}",
693                cfg
694            );
695        }
696    }
697
698    #[test]
699    fn test_create_backoff_from_config_produces_usable_backoff() {
700        let profiles = [
701            TX_REQUEST_BACKOFF,
702            TX_SUBMISSION_BACKOFF,
703            STATUS_GENERIC_BACKOFF,
704            STATUS_EVM_BACKOFF,
705            STATUS_STELLAR_BACKOFF,
706            NOTIFICATION_BACKOFF,
707            TOKEN_SWAP_REQUEST_BACKOFF,
708            TX_CLEANUP_BACKOFF,
709            SYSTEM_CLEANUP_BACKOFF,
710            RELAYER_HEALTH_BACKOFF,
711            TOKEN_SWAP_CRON_BACKOFF,
712        ];
713
714        for cfg in profiles {
715            let mut maker = create_backoff_from_config(cfg).unwrap();
716            // Calling make_backoff() should not panic
717            let _backoff = maker.make_backoff();
718        }
719    }
720
721    #[test]
722    fn test_create_backoff_with_initial_greater_than_max_errors() {
723        let result = create_backoff(10000, 100, 0.5);
724        assert!(
725            result.is_err(),
726            "initial > max should be rejected by ExponentialBackoffMaker"
727        );
728    }
729
730    // ── Backoff config invariant tests ─────────────────────────────────
731
732    #[test]
733    fn test_all_backoff_configs_have_valid_initial_le_max() {
734        let profiles: &[(&str, RetryBackoffConfig)] = &[
735            ("TX_REQUEST", TX_REQUEST_BACKOFF),
736            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
737            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
738            ("STATUS_EVM", STATUS_EVM_BACKOFF),
739            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
740            ("NOTIFICATION", NOTIFICATION_BACKOFF),
741            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
742            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
743            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
744            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
745            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
746        ];
747
748        for (name, cfg) in profiles {
749            assert!(
750                cfg.initial_ms <= cfg.max_ms,
751                "{name}: initial_ms ({}) must be <= max_ms ({})",
752                cfg.initial_ms,
753                cfg.max_ms
754            );
755        }
756    }
757
758    #[test]
759    fn test_all_backoff_configs_have_valid_jitter_range() {
760        let profiles: &[(&str, RetryBackoffConfig)] = &[
761            ("TX_REQUEST", TX_REQUEST_BACKOFF),
762            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
763            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
764            ("STATUS_EVM", STATUS_EVM_BACKOFF),
765            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
766            ("NOTIFICATION", NOTIFICATION_BACKOFF),
767            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
768            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
769            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
770            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
771            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
772        ];
773
774        for (name, cfg) in profiles {
775            assert!(
776                (0.0..=1.0).contains(&cfg.jitter),
777                "{name}: jitter ({}) must be in [0.0, 1.0]",
778                cfg.jitter
779            );
780        }
781    }
782
783    #[test]
784    fn test_all_backoff_configs_have_positive_initial_ms() {
785        let profiles: &[(&str, RetryBackoffConfig)] = &[
786            ("TX_REQUEST", TX_REQUEST_BACKOFF),
787            ("TX_SUBMISSION", TX_SUBMISSION_BACKOFF),
788            ("STATUS_GENERIC", STATUS_GENERIC_BACKOFF),
789            ("STATUS_EVM", STATUS_EVM_BACKOFF),
790            ("STATUS_STELLAR", STATUS_STELLAR_BACKOFF),
791            ("NOTIFICATION", NOTIFICATION_BACKOFF),
792            ("TOKEN_SWAP_REQUEST", TOKEN_SWAP_REQUEST_BACKOFF),
793            ("TX_CLEANUP", TX_CLEANUP_BACKOFF),
794            ("SYSTEM_CLEANUP", SYSTEM_CLEANUP_BACKOFF),
795            ("RELAYER_HEALTH", RELAYER_HEALTH_BACKOFF),
796            ("TOKEN_SWAP_CRON", TOKEN_SWAP_CRON_BACKOFF),
797        ];
798
799        for (name, cfg) in profiles {
800            assert!(
801                cfg.initial_ms > 0,
802                "{name}: initial_ms must be positive, got {}",
803                cfg.initial_ms
804            );
805        }
806    }
807
808    // ── Worker name constant tests ─────────────────────────────────────
809
810    #[test]
811    fn test_worker_name_constants_are_nonempty() {
812        let names = [
813            TRANSACTION_REQUEST,
814            TRANSACTION_SENDER,
815            TRANSACTION_STATUS_CHECKER,
816            TRANSACTION_STATUS_CHECKER_EVM,
817            TRANSACTION_STATUS_CHECKER_STELLAR,
818            NOTIFICATION_SENDER,
819            TOKEN_SWAP_REQUEST,
820            TRANSACTION_CLEANUP,
821            RELAYER_HEALTH_CHECK,
822            SYSTEM_CLEANUP,
823        ];
824
825        for name in &names {
826            assert!(!name.is_empty(), "Worker name constant must not be empty");
827        }
828    }
829
830    #[test]
831    fn test_worker_name_constants_are_unique() {
832        let names = [
833            TRANSACTION_REQUEST,
834            TRANSACTION_SENDER,
835            TRANSACTION_STATUS_CHECKER,
836            TRANSACTION_STATUS_CHECKER_EVM,
837            TRANSACTION_STATUS_CHECKER_STELLAR,
838            NOTIFICATION_SENDER,
839            TOKEN_SWAP_REQUEST,
840            TRANSACTION_CLEANUP,
841            RELAYER_HEALTH_CHECK,
842            SYSTEM_CLEANUP,
843        ];
844
845        for (i, a) in names.iter().enumerate() {
846            for (j, b) in names.iter().enumerate() {
847                if i != j {
848                    assert_ne!(
849                        a, b,
850                        "Worker names must be unique: '{}' at index {} and {}",
851                        a, i, j
852                    );
853                }
854            }
855        }
856    }
857
858    #[test]
859    fn test_worker_names_match_concurrency_env_keys() {
860        // The WorkerBuilder name for each queue-type-backed worker should match
861        // the concurrency_env_key used with ServerConfig::get_worker_concurrency,
862        // so that concurrency configuration picks up the correct env var.
863        assert_eq!(
864            TRANSACTION_REQUEST,
865            QueueType::TransactionRequest.concurrency_env_key()
866        );
867        assert_eq!(
868            TRANSACTION_SENDER,
869            QueueType::TransactionSubmission.concurrency_env_key()
870        );
871        assert_eq!(
872            TRANSACTION_STATUS_CHECKER,
873            QueueType::StatusCheck.concurrency_env_key()
874        );
875        assert_eq!(
876            TRANSACTION_STATUS_CHECKER_EVM,
877            QueueType::StatusCheckEvm.concurrency_env_key()
878        );
879        assert_eq!(
880            TRANSACTION_STATUS_CHECKER_STELLAR,
881            QueueType::StatusCheckStellar.concurrency_env_key()
882        );
883        assert_eq!(
884            NOTIFICATION_SENDER,
885            QueueType::Notification.concurrency_env_key()
886        );
887        assert_eq!(
888            TOKEN_SWAP_REQUEST,
889            QueueType::TokenSwapRequest.concurrency_env_key()
890        );
891        assert_eq!(
892            RELAYER_HEALTH_CHECK,
893            QueueType::RelayerHealthCheck.concurrency_env_key()
894        );
895    }
896
897    // ── monitor_handle_event tests ─────────────────────────────────────
898
899    fn make_worker_event(event: Event) -> Worker<Event> {
900        let worker_id = WorkerId::from_str("test-worker").unwrap();
901        Worker::new(worker_id, event)
902    }
903
904    #[test]
905    fn test_monitor_handle_event_start_does_not_panic() {
906        monitor_handle_event(make_worker_event(Event::Start));
907    }
908
909    #[test]
910    fn test_monitor_handle_event_engage_does_not_panic() {
911        let task_id = TaskId::new();
912        monitor_handle_event(make_worker_event(Event::Engage(task_id)));
913    }
914
915    #[test]
916    fn test_monitor_handle_event_idle_does_not_panic() {
917        monitor_handle_event(make_worker_event(Event::Idle));
918    }
919
920    #[test]
921    fn test_monitor_handle_event_error_does_not_panic() {
922        let error: Box<dyn std::error::Error + Send + Sync> = "test error".to_string().into();
923        monitor_handle_event(make_worker_event(Event::Error(error)));
924    }
925
926    #[test]
927    fn test_monitor_handle_event_stop_does_not_panic() {
928        monitor_handle_event(make_worker_event(Event::Stop));
929    }
930
931    #[test]
932    fn test_monitor_handle_event_exit_does_not_panic() {
933        monitor_handle_event(make_worker_event(Event::Exit));
934    }
935
936    #[test]
937    fn test_monitor_handle_event_custom_does_not_panic() {
938        monitor_handle_event(make_worker_event(Event::Custom("test-custom".to_string())));
939    }
940}