openzeppelin_relayer/queues/sqs/
cron.rs

1//! Cron scheduler for SQS mode.
2//!
3//! When running with `QUEUE_BACKEND=sqs`, Apalis's `CronStream` + `Monitor`
4//! are not available. This module provides a lightweight tokio-based replacement
5//! that uses `DistributedLock` to prevent duplicate execution across ECS tasks.
6
7use std::panic::AssertUnwindSafe;
8use std::str::FromStr;
9use std::sync::Arc;
10use std::time::Duration;
11
12use actix_web::web::ThinData;
13use chrono::Utc;
14use futures::FutureExt;
15use tokio::sync::watch;
16use tracing::{debug, error, info, warn};
17
18use crate::{
19    config::ServerConfig,
20    constants::{
21        SYSTEM_CLEANUP_CRON_SCHEDULE, SYSTEM_CLEANUP_LOCK_TTL_SECS, TOKEN_SWAP_CRON_LOCK_TTL_SECS,
22        TRANSACTION_CLEANUP_CRON_SCHEDULE, TRANSACTION_CLEANUP_LOCK_TTL_SECS,
23    },
24    jobs::{
25        system_cleanup_handler, token_swap_cron_handler, transaction_cleanup_handler,
26        SystemCleanupCronReminder, TokenSwapCronReminder, TransactionCleanupCronReminder,
27    },
28    models::{DefaultAppState, RelayerNetworkPolicy},
29    queues::WorkerContext,
30    repositories::RelayerRepository,
31    utils::DistributedLock,
32};
33
34use super::filter_relayers_for_swap;
35
36use super::WorkerHandle;
37
/// Number of seconds subtracted from a cron schedule's interval when a lock
/// TTL is derived from it, so the lock expires shortly before the next tick.
const CRON_LOCK_TTL_MARGIN_SECS: u64 = 5;
/// Lower bound (in seconds) applied to a derived lock TTL so that short
/// intervals do not produce excessive lock churn. The derived value is still
/// kept strictly below the interval itself.
const CRON_LOCK_TTL_MIN_SECS: u64 = 30;
42
43/// Cron scheduler that runs periodic tasks in SQS mode using tokio timers
44/// and distributed locks for cross-instance coordination.
45pub struct SqsCronScheduler {
46    app_state: Arc<ThinData<DefaultAppState>>,
47    shutdown_rx: watch::Receiver<bool>,
48}
49
50impl SqsCronScheduler {
51    pub fn new(
52        app_state: Arc<ThinData<DefaultAppState>>,
53        shutdown_rx: watch::Receiver<bool>,
54    ) -> Self {
55        Self {
56            app_state,
57            shutdown_rx,
58        }
59    }
60
61    /// Starts all cron tasks and returns their handles.
62    pub async fn start(self) -> Result<Vec<WorkerHandle>, super::QueueBackendError> {
63        let mut handles = Vec::new();
64
65        // Transaction cleanup: every 10 minutes, lock TTL 9 min
66        handles.push(spawn_cron_task(
67            "sqs-cron-transaction-cleanup",
68            TRANSACTION_CLEANUP_CRON_SCHEDULE,
69            Duration::from_secs(TRANSACTION_CLEANUP_LOCK_TTL_SECS),
70            self.app_state.clone(),
71            self.shutdown_rx.clone(),
72            |state| {
73                Box::pin(async move {
74                    let ctx = WorkerContext::new(0, uuid::Uuid::new_v4().to_string());
75                    if let Err(e) = transaction_cleanup_handler(
76                        TransactionCleanupCronReminder(),
77                        (*state).clone(),
78                        ctx,
79                    )
80                    .await
81                    {
82                        warn!(error = %e, "Transaction cleanup handler failed");
83                    }
84                })
85            },
86        )?);
87
88        // System cleanup: every hour, lock TTL 14 min
89        handles.push(spawn_cron_task(
90            "sqs-cron-system-cleanup",
91            SYSTEM_CLEANUP_CRON_SCHEDULE,
92            Duration::from_secs(SYSTEM_CLEANUP_LOCK_TTL_SECS),
93            self.app_state.clone(),
94            self.shutdown_rx.clone(),
95            |state| {
96                Box::pin(async move {
97                    let ctx = WorkerContext::new(0, uuid::Uuid::new_v4().to_string());
98                    if let Err(e) =
99                        system_cleanup_handler(SystemCleanupCronReminder(), (*state).clone(), ctx)
100                            .await
101                    {
102                        warn!(error = %e, "System cleanup handler failed");
103                    }
104                })
105            },
106        )?);
107
108        // Token swap crons: one per eligible relayer
109        let swap_handles = self.start_token_swap_crons().await?;
110        handles.extend(swap_handles);
111
112        info!(
113            cron_count = handles.len(),
114            "SQS cron scheduler started all tasks"
115        );
116        Ok(handles)
117    }
118
119    /// Creates per-relayer token swap cron tasks for Solana/Stellar relayers
120    /// that have swap config with a cron schedule.
121    async fn start_token_swap_crons(&self) -> Result<Vec<WorkerHandle>, super::QueueBackendError> {
122        let active_relayers = self
123            .app_state
124            .relayer_repository()
125            .list_active()
126            .await
127            .map_err(|e| {
128                super::QueueBackendError::WorkerInitError(format!(
129                    "Failed to list active relayers for swap crons: {e}"
130                ))
131            })?;
132
133        let eligible_relayers = filter_relayers_for_swap(active_relayers);
134        let mut handles = Vec::new();
135
136        for relayer in eligible_relayers {
137            let cron_expr = match &relayer.policies {
138                RelayerNetworkPolicy::Solana(policy) => policy
139                    .get_swap_config()
140                    .and_then(|c| c.cron_schedule.clone()),
141                RelayerNetworkPolicy::Stellar(policy) => policy
142                    .get_swap_config()
143                    .and_then(|c| c.cron_schedule.clone()),
144                _ => None,
145            };
146
147            let Some(cron_expr) = cron_expr else {
148                continue;
149            };
150
151            let relayer_id = relayer.id.clone();
152            let task_name = format!("sqs-cron-token-swap-{relayer_id}");
153            let lock_ttl = derive_cron_lock_ttl(
154                &cron_expr,
155                Duration::from_secs(TOKEN_SWAP_CRON_LOCK_TTL_SECS),
156            );
157
158            let state = self.app_state.clone();
159            let handle = spawn_cron_task(
160                &task_name,
161                &cron_expr,
162                lock_ttl,
163                state.clone(),
164                self.shutdown_rx.clone(),
165                move |state| {
166                    let rid = relayer_id.clone();
167                    Box::pin(async move {
168                        let ctx = WorkerContext::new(0, uuid::Uuid::new_v4().to_string());
169                        if let Err(e) = token_swap_cron_handler(
170                            TokenSwapCronReminder(),
171                            rid.clone(),
172                            (*state).clone(),
173                            ctx,
174                        )
175                        .await
176                        {
177                            warn!(relayer_id = %rid, error = %e, "Token swap cron handler failed");
178                        }
179                    })
180                },
181            )?;
182
183            handles.push(handle);
184            debug!(task_name = %format!("sqs-cron-token-swap-{}", relayer.id), "Registered token swap cron");
185        }
186
187        Ok(handles)
188    }
189}
190
191/// Derives distributed lock TTL from cron schedule interval with a fallback.
192///
193/// TTL is set slightly below the schedule interval (`interval - margin`) to
194/// avoid overlap while allowing the next run to acquire the lock. If interval
195/// derivation fails, `fallback_ttl` is used.
196fn derive_cron_lock_ttl(cron_expr: &str, fallback_ttl: Duration) -> Duration {
197    let schedule = match cron::Schedule::from_str(cron_expr) {
198        Ok(s) => s,
199        Err(_) => return fallback_ttl,
200    };
201
202    let now = Utc::now();
203    let mut upcoming = schedule.after(&now);
204    let (Some(first), Some(second)) = (upcoming.next(), upcoming.next()) else {
205        return fallback_ttl;
206    };
207
208    let Ok(interval) = (second - first).to_std() else {
209        return fallback_ttl;
210    };
211
212    let interval_secs = interval.as_secs();
213    if interval_secs <= 1 {
214        return Duration::from_secs(1);
215    }
216
217    let capped_secs = interval_secs.saturating_sub(CRON_LOCK_TTL_MARGIN_SECS);
218    let derived_secs = capped_secs
219        .max(CRON_LOCK_TTL_MIN_SECS)
220        .min(interval_secs - 1);
221    Duration::from_secs(derived_secs)
222}
223
224/// Spawns a single cron task that:
225/// 1. Parses the cron expression
226/// 2. Sleeps until the next occurrence (interruptible by shutdown)
227/// 3. Acquires a distributed lock (skips if held by another instance)
228/// 4. Calls the handler
229fn spawn_cron_task(
230    name: &str,
231    cron_expr: &str,
232    lock_ttl: Duration,
233    app_state: Arc<ThinData<DefaultAppState>>,
234    mut shutdown_rx: watch::Receiver<bool>,
235    handler: impl Fn(
236            Arc<ThinData<DefaultAppState>>,
237        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
238        + Send
239        + Sync
240        + 'static,
241) -> Result<WorkerHandle, super::QueueBackendError> {
242    let schedule = cron::Schedule::from_str(cron_expr).map_err(|e| {
243        super::QueueBackendError::WorkerInitError(format!(
244            "Invalid cron expression '{cron_expr}' for {name}: {e}"
245        ))
246    })?;
247
248    let task_name = name.to_string();
249
250    info!(
251        name = %task_name,
252        cron = %cron_expr,
253        lock_ttl_secs = lock_ttl.as_secs(),
254        "Registering SQS cron task"
255    );
256
257    let handle = tokio::spawn(async move {
258        loop {
259            // Compute next tick
260            let next = match schedule.upcoming(Utc).next() {
261                Some(t) => t,
262                None => {
263                    warn!(name = %task_name, "Cron schedule exhausted, stopping task");
264                    break;
265                }
266            };
267
268            let until_next = (next - Utc::now())
269                .to_std()
270                .unwrap_or(Duration::from_secs(1));
271
272            debug!(
273                name = %task_name,
274                next = %next,
275                sleep_secs = until_next.as_secs(),
276                "Sleeping until next cron tick"
277            );
278
279            // Sleep until next tick, but remain responsive to shutdown
280            tokio::select! {
281                _ = tokio::time::sleep(until_next) => {}
282                _ = shutdown_rx.changed() => {
283                    info!(name = %task_name, "Shutdown signal received, stopping cron task");
284                    break;
285                }
286            }
287
288            if *shutdown_rx.borrow() {
289                info!(name = %task_name, "Shutdown detected, stopping cron task");
290                break;
291            }
292
293            // In distributed mode, acquire a lock to prevent duplicate execution.
294            // In single-instance mode, run the handler directly without locking.
295            let _guard = if !ServerConfig::get_distributed_mode() {
296                debug!(name = %task_name, "Distributed mode disabled, running cron without lock");
297                None
298            } else {
299                let transaction_repo = app_state.transaction_repository();
300                match crate::repositories::TransactionRepository::connection_info(
301                    transaction_repo.as_ref(),
302                ) {
303                    None => {
304                        debug!(name = %task_name, "In-memory mode, running cron without lock");
305                        None
306                    }
307                    Some((connections, key_prefix)) => {
308                        let pool = connections.primary().clone();
309                        let lock_key = format!("{key_prefix}:lock:{task_name}");
310                        let lock = DistributedLock::new(pool, &lock_key, lock_ttl);
311                        match lock.try_acquire().await {
312                            Ok(Some(guard)) => {
313                                info!(name = %task_name, "Distributed lock acquired, running cron handler");
314                                Some(guard)
315                            }
316                            Ok(None) => {
317                                debug!(name = %task_name, "Distributed lock held by another instance, skipping");
318                                continue;
319                            }
320                            Err(e) => {
321                                warn!(name = %task_name, error = %e, "Failed to acquire distributed lock, skipping");
322                                continue;
323                            }
324                        }
325                    }
326                }
327            };
328
329            if let Err(panic_info) = AssertUnwindSafe(handler(app_state.clone()))
330                .catch_unwind()
331                .await
332            {
333                let msg = panic_info
334                    .downcast_ref::<String>()
335                    .map(|s| s.as_str())
336                    .or_else(|| panic_info.downcast_ref::<&str>().copied())
337                    .unwrap_or("unknown panic");
338                error!(name = %task_name, panic = %msg, "Cron handler panicked");
339            }
340
341            drop(_guard);
342        }
343
344        info!(name = %task_name, "SQS cron task stopped");
345    });
346
347    Ok(WorkerHandle::Tokio(handle))
348}
349
#[cfg(test)]
mod tests {
    use super::*;

    /// Fallback passed by tests that expect a derived TTL; none of the
    /// expected values below equals it, so an accidental fallback is caught.
    const FALLBACK: Duration = Duration::from_secs(240);

    /// Derived lock TTL for `expr`, in whole seconds, using [`FALLBACK`].
    fn derived_secs(expr: &str) -> u64 {
        derive_cron_lock_ttl(expr, FALLBACK).as_secs()
    }

    /// Gap between the next two occurrences of `expr`, computed independently
    /// of `derive_cron_lock_ttl`.
    fn next_interval(expr: &str) -> Duration {
        let schedule = cron::Schedule::from_str(expr).unwrap();
        let now = Utc::now();
        let mut upcoming = schedule.after(&now);
        let first = upcoming.next().unwrap();
        let second = upcoming.next().unwrap();
        (second - first).to_std().unwrap()
    }

    // ── Constants ──────────────────────────────────────────────────────

    #[test]
    fn test_cron_lock_ttl_margin_is_positive() {
        assert!(
            CRON_LOCK_TTL_MARGIN_SECS > 0,
            "Margin must be positive to leave headroom for the next cron tick"
        );
    }

    #[test]
    fn test_cron_lock_ttl_min_is_positive() {
        assert!(
            CRON_LOCK_TTL_MIN_SECS > 0,
            "Minimum TTL must be positive to avoid zero-length locks"
        );
    }

    #[test]
    fn test_cron_lock_ttl_min_greater_than_margin() {
        assert!(
            CRON_LOCK_TTL_MIN_SECS > CRON_LOCK_TTL_MARGIN_SECS,
            "Minimum TTL ({}) should exceed margin ({}) to prevent degenerate lock durations",
            CRON_LOCK_TTL_MIN_SECS,
            CRON_LOCK_TTL_MARGIN_SECS
        );
    }

    // ── derive_cron_lock_ttl: standard schedules ──────────────────────

    #[test]
    fn test_derive_cron_lock_ttl_for_five_minute_schedule() {
        // 300s interval minus 5s margin
        assert_eq!(derived_secs("0 */5 * * * *"), 295);
    }

    #[test]
    fn test_derive_cron_lock_ttl_for_minute_schedule() {
        // 60s interval minus 5s margin
        assert_eq!(derived_secs("0 * * * * *"), 55);
    }

    #[test]
    fn test_derive_cron_lock_ttl_hourly_schedule() {
        // 3600s interval minus 5s margin
        assert_eq!(derived_secs("0 0 * * * *"), 3595);
    }

    #[test]
    fn test_derive_cron_lock_ttl_ten_minute_schedule() {
        // 600s interval minus 5s margin
        assert_eq!(derived_secs("0 */10 * * * *"), 595);
    }

    #[test]
    fn test_derive_cron_lock_ttl_fifteen_minute_schedule() {
        // 900s interval minus 5s margin
        assert_eq!(derived_secs("0 */15 * * * *"), 895);
    }

    #[test]
    fn test_derive_cron_lock_ttl_daily_schedule() {
        // 86400s interval minus 5s margin
        assert_eq!(derived_secs("0 0 0 * * *"), 86395);
    }

    // ── derive_cron_lock_ttl: boundary / edge cases ───────────────────
    //
    // For intervals of 30s or less the minimum-TTL floor (30s) would reach or
    // exceed the interval, so the result is capped at `interval - 1` instead.

    #[test]
    fn test_derive_cron_lock_ttl_one_second_schedule_caps_to_one_second() {
        // interval <= 1 short-circuits to a 1s TTL
        assert_eq!(derived_secs("*/1 * * * * *"), 1);
    }

    #[test]
    fn test_derive_cron_lock_ttl_two_second_schedule() {
        // interval=2: 2-5 saturates to 0, floor -> 30, cap at interval-1 -> 1
        assert_eq!(derived_secs("*/2 * * * * *"), 1);
    }

    #[test]
    fn test_derive_cron_lock_ttl_five_second_schedule() {
        // interval=5: 5-5=0, floor -> 30, cap at interval-1 -> 4
        assert_eq!(derived_secs("*/5 * * * * *"), 4);
    }

    #[test]
    fn test_derive_cron_lock_ttl_short_interval_floors_at_minimum() {
        // interval=10: 10-5=5, floor -> 30, but the floor cannot stand for an
        // interval this short, so the cap at interval-1 wins -> 9
        assert_eq!(derived_secs("*/10 * * * * *"), 9);
    }

    #[test]
    fn test_derive_cron_lock_ttl_fifteen_second_schedule() {
        // interval=15: 15-5=10, floor -> 30, cap at interval-1 -> 14
        assert_eq!(derived_secs("*/15 * * * * *"), 14);
    }

    #[test]
    fn test_derive_cron_lock_ttl_thirty_second_schedule() {
        // interval=30: 30-5=25, floor -> 30, cap at interval-1 -> 29
        assert_eq!(derived_secs("*/30 * * * * *"), 29);
    }

    #[test]
    fn test_derive_cron_lock_ttl_two_minute_schedule() {
        // interval=120: 120-5=115, floor and cap leave it untouched -> 115
        assert_eq!(derived_secs("0 */2 * * * *"), 115);
    }

    // ── derive_cron_lock_ttl: fallback paths ──────────────────────────

    #[test]
    fn test_derive_cron_lock_ttl_fallback_on_invalid_cron() {
        let fallback = Duration::from_secs(240);
        assert_eq!(derive_cron_lock_ttl("not-a-cron", fallback), fallback);
    }

    #[test]
    fn test_derive_cron_lock_ttl_fallback_on_empty_string() {
        let fallback = Duration::from_secs(100);
        assert_eq!(derive_cron_lock_ttl("", fallback), fallback);
    }

    #[test]
    fn test_derive_cron_lock_ttl_fallback_on_partial_cron() {
        let fallback = Duration::from_secs(300);
        assert_eq!(derive_cron_lock_ttl("0 0 *", fallback), fallback);
    }

    #[test]
    fn test_derive_cron_lock_ttl_different_fallback_values() {
        // The fallback must come back as-is, not adjusted by margin or floor.
        for secs in [1, 60, 3600, 86400] {
            let fallback = Duration::from_secs(secs);
            let ttl = derive_cron_lock_ttl("invalid!", fallback);
            assert_eq!(
                ttl, fallback,
                "Fallback for {secs}s should be returned unchanged"
            );
        }
    }

    // ── derive_cron_lock_ttl: invariant tests ─────────────────────────

    #[test]
    fn test_derive_cron_lock_ttl_always_less_than_interval() {
        // For any valid cron with an interval above one second, the TTL must
        // be strictly less than the interval so the next run can take the lock.
        let schedules = [
            "*/2 * * * * *",  // 2s
            "*/5 * * * * *",  // 5s
            "*/10 * * * * *", // 10s
            "*/15 * * * * *", // 15s
            "*/30 * * * * *", // 30s
            "0 * * * * *",    // 60s
            "0 */5 * * * *",  // 300s
            "0 */10 * * * *", // 600s
            "0 */15 * * * *", // 900s
            "0 0 * * * *",    // 3600s
            "0 0 0 * * *",    // 86400s
        ];

        let fallback = Duration::from_secs(9999);
        for expr in &schedules {
            let ttl = derive_cron_lock_ttl(expr, fallback);
            let interval = next_interval(expr);

            assert!(
                ttl < interval,
                "TTL ({:?}) must be < interval ({:?}) for schedule '{expr}'",
                ttl,
                interval
            );
        }
    }

    #[test]
    fn test_derive_cron_lock_ttl_always_positive() {
        let schedules = [
            "*/1 * * * * *",
            "*/2 * * * * *",
            "*/5 * * * * *",
            "*/10 * * * * *",
            "0 * * * * *",
            "0 0 * * * *",
        ];

        let fallback = Duration::from_secs(9999);
        for expr in &schedules {
            let ttl = derive_cron_lock_ttl(expr, fallback);
            assert!(
                ttl > Duration::ZERO,
                "TTL must be positive for schedule '{expr}', got {:?}",
                ttl
            );
        }
    }

    #[test]
    fn test_derive_cron_lock_ttl_is_deterministic() {
        // Repeated calls with the same evenly spaced schedule must agree,
        // regardless of the wall-clock time at which each call is made.
        let expr = "0 */5 * * * *";
        let fallback = Duration::from_secs(240);
        let first_call = derive_cron_lock_ttl(expr, fallback);
        let second_call = derive_cron_lock_ttl(expr, fallback);
        assert_eq!(
            first_call, second_call,
            "derive_cron_lock_ttl must be deterministic"
        );
    }

    // ── derive_cron_lock_ttl: production schedule tests ───────────────

    #[test]
    fn test_derive_cron_lock_ttl_with_production_schedules() {
        // The bounds below assume the transaction cleanup schedule fires
        // every 10 minutes (600s); a different schedule constant will (and
        // should) make this test fail so the bounds get revisited.
        let tx_cleanup_secs = derive_cron_lock_ttl(
            TRANSACTION_CLEANUP_CRON_SCHEDULE,
            Duration::from_secs(TRANSACTION_CLEANUP_LOCK_TTL_SECS),
        )
        .as_secs();
        assert!(
            tx_cleanup_secs > 500,
            "Transaction cleanup TTL should be > 500s, got {}s",
            tx_cleanup_secs
        );
        assert!(
            tx_cleanup_secs < 600,
            "Transaction cleanup TTL should be < 600s (interval), got {}s",
            tx_cleanup_secs
        );

        // Likewise, these bounds assume a 15-minute (900s) system cleanup
        // schedule.
        let sys_cleanup_secs = derive_cron_lock_ttl(
            SYSTEM_CLEANUP_CRON_SCHEDULE,
            Duration::from_secs(SYSTEM_CLEANUP_LOCK_TTL_SECS),
        )
        .as_secs();
        assert!(
            sys_cleanup_secs > 800,
            "System cleanup TTL should be > 800s, got {}s",
            sys_cleanup_secs
        );
        assert!(
            sys_cleanup_secs < 900,
            "System cleanup TTL should be < 900s (interval), got {}s",
            sys_cleanup_secs
        );
    }

    // ── spawn_cron_task: error path ───────────────────────────────────

    #[test]
    fn test_spawn_cron_task_rejects_invalid_cron_expression() {
        // spawn_cron_task needs an `Arc<ThinData<DefaultAppState>>`, which is
        // not easy to build in a unit test, so it is not called here. What is
        // checked instead is the precondition of its error path: the parse
        // that spawn_cron_task performs first fails for this input. No async
        // runtime is needed for that.
        assert!(
            cron::Schedule::from_str("not-a-cron").is_err(),
            "spawn_cron_task should reject invalid cron: cron parse should fail"
        );
    }

    #[test]
    fn test_cron_schedule_parse_valid_expressions() {
        // The fixed schedules used by SqsCronScheduler must be parseable.
        let expressions = [
            TRANSACTION_CLEANUP_CRON_SCHEDULE,
            SYSTEM_CLEANUP_CRON_SCHEDULE,
        ];

        for expr in &expressions {
            assert!(
                cron::Schedule::from_str(expr).is_ok(),
                "Production cron schedule '{expr}' should be parseable"
            );
        }
    }

    #[test]
    fn test_cron_schedule_parse_common_swap_expressions() {
        // Swap schedule patterns that users might plausibly configure.
        let expressions = [
            "0 */5 * * * *",  // Every 5 minutes
            "0 */15 * * * *", // Every 15 minutes
            "0 0 * * * *",    // Every hour
            "0 0 */6 * * *",  // Every 6 hours
            "0 0 0 * * *",    // Daily
        ];

        for expr in &expressions {
            let parsed = cron::Schedule::from_str(expr);
            assert!(
                parsed.is_ok(),
                "Common swap schedule '{expr}' should be parseable"
            );
            // A parseable schedule must also yield at least one occurrence.
            let schedule = parsed.unwrap();
            assert!(
                schedule.upcoming(Utc).next().is_some(),
                "Schedule '{expr}' should have upcoming events"
            );
        }
    }

    // ── SqsCronScheduler::new ─────────────────────────────────────────

    #[test]
    fn test_sqscronscheduler_new_stores_shutdown_rx() {
        // SqsCronScheduler::new needs a DefaultAppState, which is not easy to
        // build here, so the constructor itself is not exercised. This checks
        // the watch-channel behaviour the scheduler relies on: a receiver
        // observes the value sent after it was created. Sending and borrowing
        // are synchronous, so no runtime is required.
        let (tx, rx) = watch::channel(false);
        assert!(!*rx.borrow());
        tx.send(true).unwrap();
        assert!(*rx.borrow());
    }
}
715}