1use std::panic::AssertUnwindSafe;
8use std::str::FromStr;
9use std::sync::Arc;
10use std::time::Duration;
11
12use actix_web::web::ThinData;
13use chrono::Utc;
14use futures::FutureExt;
15use tokio::sync::watch;
16use tracing::{debug, error, info, warn};
17
18use crate::{
19 config::ServerConfig,
20 constants::{
21 SYSTEM_CLEANUP_CRON_SCHEDULE, SYSTEM_CLEANUP_LOCK_TTL_SECS, TOKEN_SWAP_CRON_LOCK_TTL_SECS,
22 TRANSACTION_CLEANUP_CRON_SCHEDULE, TRANSACTION_CLEANUP_LOCK_TTL_SECS,
23 },
24 jobs::{
25 system_cleanup_handler, token_swap_cron_handler, transaction_cleanup_handler,
26 SystemCleanupCronReminder, TokenSwapCronReminder, TransactionCleanupCronReminder,
27 },
28 models::{DefaultAppState, RelayerNetworkPolicy},
29 queues::WorkerContext,
30 repositories::RelayerRepository,
31 utils::DistributedLock,
32};
33
34use super::filter_relayers_for_swap;
35
36use super::WorkerHandle;
37
/// Seconds subtracted from a cron interval when deriving a lock TTL
/// (see `derive_cron_lock_ttl`).
const CRON_LOCK_TTL_MARGIN_SECS: u64 = 5;
/// Lower bound, in seconds, applied to a derived lock TTL before it is capped
/// at one second less than the cron interval (see `derive_cron_lock_ttl`).
const CRON_LOCK_TTL_MIN_SECS: u64 = 30;
42
/// Scheduler that spawns the cron-driven background tasks: transaction cleanup,
/// system cleanup, and per-relayer token swaps.
pub struct SqsCronScheduler {
    /// Shared application state; a clone of this `Arc` is handed to every cron task.
    app_state: Arc<ThinData<DefaultAppState>>,
    /// Watch channel receiver each cron task observes to detect shutdown.
    shutdown_rx: watch::Receiver<bool>,
}
49
50impl SqsCronScheduler {
51 pub fn new(
52 app_state: Arc<ThinData<DefaultAppState>>,
53 shutdown_rx: watch::Receiver<bool>,
54 ) -> Self {
55 Self {
56 app_state,
57 shutdown_rx,
58 }
59 }
60
61 pub async fn start(self) -> Result<Vec<WorkerHandle>, super::QueueBackendError> {
63 let mut handles = Vec::new();
64
65 handles.push(spawn_cron_task(
67 "sqs-cron-transaction-cleanup",
68 TRANSACTION_CLEANUP_CRON_SCHEDULE,
69 Duration::from_secs(TRANSACTION_CLEANUP_LOCK_TTL_SECS),
70 self.app_state.clone(),
71 self.shutdown_rx.clone(),
72 |state| {
73 Box::pin(async move {
74 let ctx = WorkerContext::new(0, uuid::Uuid::new_v4().to_string());
75 if let Err(e) = transaction_cleanup_handler(
76 TransactionCleanupCronReminder(),
77 (*state).clone(),
78 ctx,
79 )
80 .await
81 {
82 warn!(error = %e, "Transaction cleanup handler failed");
83 }
84 })
85 },
86 )?);
87
88 handles.push(spawn_cron_task(
90 "sqs-cron-system-cleanup",
91 SYSTEM_CLEANUP_CRON_SCHEDULE,
92 Duration::from_secs(SYSTEM_CLEANUP_LOCK_TTL_SECS),
93 self.app_state.clone(),
94 self.shutdown_rx.clone(),
95 |state| {
96 Box::pin(async move {
97 let ctx = WorkerContext::new(0, uuid::Uuid::new_v4().to_string());
98 if let Err(e) =
99 system_cleanup_handler(SystemCleanupCronReminder(), (*state).clone(), ctx)
100 .await
101 {
102 warn!(error = %e, "System cleanup handler failed");
103 }
104 })
105 },
106 )?);
107
108 let swap_handles = self.start_token_swap_crons().await?;
110 handles.extend(swap_handles);
111
112 info!(
113 cron_count = handles.len(),
114 "SQS cron scheduler started all tasks"
115 );
116 Ok(handles)
117 }
118
119 async fn start_token_swap_crons(&self) -> Result<Vec<WorkerHandle>, super::QueueBackendError> {
122 let active_relayers = self
123 .app_state
124 .relayer_repository()
125 .list_active()
126 .await
127 .map_err(|e| {
128 super::QueueBackendError::WorkerInitError(format!(
129 "Failed to list active relayers for swap crons: {e}"
130 ))
131 })?;
132
133 let eligible_relayers = filter_relayers_for_swap(active_relayers);
134 let mut handles = Vec::new();
135
136 for relayer in eligible_relayers {
137 let cron_expr = match &relayer.policies {
138 RelayerNetworkPolicy::Solana(policy) => policy
139 .get_swap_config()
140 .and_then(|c| c.cron_schedule.clone()),
141 RelayerNetworkPolicy::Stellar(policy) => policy
142 .get_swap_config()
143 .and_then(|c| c.cron_schedule.clone()),
144 _ => None,
145 };
146
147 let Some(cron_expr) = cron_expr else {
148 continue;
149 };
150
151 let relayer_id = relayer.id.clone();
152 let task_name = format!("sqs-cron-token-swap-{relayer_id}");
153 let lock_ttl = derive_cron_lock_ttl(
154 &cron_expr,
155 Duration::from_secs(TOKEN_SWAP_CRON_LOCK_TTL_SECS),
156 );
157
158 let state = self.app_state.clone();
159 let handle = spawn_cron_task(
160 &task_name,
161 &cron_expr,
162 lock_ttl,
163 state.clone(),
164 self.shutdown_rx.clone(),
165 move |state| {
166 let rid = relayer_id.clone();
167 Box::pin(async move {
168 let ctx = WorkerContext::new(0, uuid::Uuid::new_v4().to_string());
169 if let Err(e) = token_swap_cron_handler(
170 TokenSwapCronReminder(),
171 rid.clone(),
172 (*state).clone(),
173 ctx,
174 )
175 .await
176 {
177 warn!(relayer_id = %rid, error = %e, "Token swap cron handler failed");
178 }
179 })
180 },
181 )?;
182
183 handles.push(handle);
184 debug!(task_name = %format!("sqs-cron-token-swap-{}", relayer.id), "Registered token swap cron");
185 }
186
187 Ok(handles)
188 }
189}
190
191fn derive_cron_lock_ttl(cron_expr: &str, fallback_ttl: Duration) -> Duration {
197 let schedule = match cron::Schedule::from_str(cron_expr) {
198 Ok(s) => s,
199 Err(_) => return fallback_ttl,
200 };
201
202 let now = Utc::now();
203 let mut upcoming = schedule.after(&now);
204 let (Some(first), Some(second)) = (upcoming.next(), upcoming.next()) else {
205 return fallback_ttl;
206 };
207
208 let Ok(interval) = (second - first).to_std() else {
209 return fallback_ttl;
210 };
211
212 let interval_secs = interval.as_secs();
213 if interval_secs <= 1 {
214 return Duration::from_secs(1);
215 }
216
217 let capped_secs = interval_secs.saturating_sub(CRON_LOCK_TTL_MARGIN_SECS);
218 let derived_secs = capped_secs
219 .max(CRON_LOCK_TTL_MIN_SECS)
220 .min(interval_secs - 1);
221 Duration::from_secs(derived_secs)
222}
223
/// Parses `cron_expr` and spawns a tokio task that runs `handler` on every cron tick.
///
/// Each loop iteration of the spawned task:
/// 1. computes the next tick and sleeps until it, stopping early on a shutdown signal;
/// 2. when distributed mode is enabled and the transaction repository exposes
///    connection info, tries to acquire a distributed lock keyed
///    `{key_prefix}:lock:{name}` with `lock_ttl`, skipping the tick if the lock is
///    unavailable or acquisition fails;
/// 3. awaits the future returned by `handler`, logging (not propagating) any panic
///    raised while that future runs.
///
/// # Parameters
///
/// - `name`: task name used in logs and in the lock key.
/// - `cron_expr`: cron expression parsed with `cron::Schedule::from_str`.
/// - `lock_ttl`: TTL passed to `DistributedLock::new`.
/// - `app_state`: shared state; a clone is passed to `handler` on every tick.
/// - `shutdown_rx`: watch receiver; the task stops when it changes or holds `true`.
/// - `handler`: factory producing the future to run for one tick.
///
/// # Errors
///
/// Returns `QueueBackendError::WorkerInitError` when `cron_expr` cannot be parsed.
/// In that case no task is spawned.
fn spawn_cron_task(
    name: &str,
    cron_expr: &str,
    lock_ttl: Duration,
    app_state: Arc<ThinData<DefaultAppState>>,
    mut shutdown_rx: watch::Receiver<bool>,
    handler: impl Fn(
            Arc<ThinData<DefaultAppState>>,
        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>>
        + Send
        + Sync
        + 'static,
) -> Result<WorkerHandle, super::QueueBackendError> {
    // Validate the expression up front so a bad schedule fails before spawning.
    let schedule = cron::Schedule::from_str(cron_expr).map_err(|e| {
        super::QueueBackendError::WorkerInitError(format!(
            "Invalid cron expression '{cron_expr}' for {name}: {e}"
        ))
    })?;

    // Owned copy so the name can move into the spawned task.
    let task_name = name.to_string();

    info!(
        name = %task_name,
        cron = %cron_expr,
        lock_ttl_secs = lock_ttl.as_secs(),
        "Registering SQS cron task"
    );

    let handle = tokio::spawn(async move {
        loop {
            // The next tick is recomputed from the current time on every iteration.
            let next = match schedule.upcoming(Utc).next() {
                Some(t) => t,
                None => {
                    warn!(name = %task_name, "Cron schedule exhausted, stopping task");
                    break;
                }
            };

            // If the difference cannot be converted to a std `Duration` (for example
            // because the tick is already in the past), sleep for one second instead.
            let until_next = (next - Utc::now())
                .to_std()
                .unwrap_or(Duration::from_secs(1));

            debug!(
                name = %task_name,
                next = %next,
                sleep_secs = until_next.as_secs(),
                "Sleeping until next cron tick"
            );

            tokio::select! {
                // Tick reached: fall through to the checks below.
                _ = tokio::time::sleep(until_next) => {}
                // Any completion of `changed()` stops the task; its result is discarded.
                _ = shutdown_rx.changed() => {
                    info!(name = %task_name, "Shutdown signal received, stopping cron task");
                    break;
                }
            }

            // Re-check the current value in case the sleep branch won the select.
            if *shutdown_rx.borrow() {
                info!(name = %task_name, "Shutdown detected, stopping cron task");
                break;
            }

            // `Some(guard)` while a distributed lock is held for this tick, `None` when
            // the handler runs without a lock.
            let _guard = if !ServerConfig::get_distributed_mode() {
                debug!(name = %task_name, "Distributed mode disabled, running cron without lock");
                None
            } else {
                let transaction_repo = app_state.transaction_repository();
                match crate::repositories::TransactionRepository::connection_info(
                    transaction_repo.as_ref(),
                ) {
                    // No connection info available: there is nothing to lock against.
                    None => {
                        debug!(name = %task_name, "In-memory mode, running cron without lock");
                        None
                    }
                    Some((connections, key_prefix)) => {
                        let pool = connections.primary().clone();
                        let lock_key = format!("{key_prefix}:lock:{task_name}");
                        let lock = DistributedLock::new(pool, &lock_key, lock_ttl);
                        match lock.try_acquire().await {
                            Ok(Some(guard)) => {
                                info!(name = %task_name, "Distributed lock acquired, running cron handler");
                                Some(guard)
                            }
                            // Lock not obtained: skip this tick and wait for the next one.
                            Ok(None) => {
                                debug!(name = %task_name, "Distributed lock held by another instance, skipping");
                                continue;
                            }
                            // Acquisition error: skip this tick rather than run unlocked.
                            Err(e) => {
                                warn!(name = %task_name, error = %e, "Failed to acquire distributed lock, skipping");
                                continue;
                            }
                        }
                    }
                }
            };

            // Panics raised while the handler future is polled are caught and logged so
            // the loop keeps running. The `handler(...)` call itself executes before
            // `catch_unwind` is applied.
            if let Err(panic_info) = AssertUnwindSafe(handler(app_state.clone()))
                .catch_unwind()
                .await
            {
                // Extract the message when the payload is a `String` or a `&str`.
                let msg = panic_info
                    .downcast_ref::<String>()
                    .map(|s| s.as_str())
                    .or_else(|| panic_info.downcast_ref::<&str>().copied())
                    .unwrap_or("unknown panic");
                error!(name = %task_name, panic = %msg, "Cron handler panicked");
            }

            // Explicit drop once the handler is done.
            // NOTE(review): presumably this releases the lock; confirm in `DistributedLock`.
            drop(_guard);
        }

        info!(name = %task_name, "SQS cron task stopped");
    });

    Ok(WorkerHandle::Tokio(handle))
}
349
#[cfg(test)]
mod tests {
    use super::*;

    /// Fallback TTL shared by the schedule-driven cases below.
    const DEFAULT_FALLBACK: Duration = Duration::from_secs(240);

    /// Derives the lock TTL for `expr` using [`DEFAULT_FALLBACK`].
    fn derived_ttl(expr: &str) -> Duration {
        derive_cron_lock_ttl(expr, DEFAULT_FALLBACK)
    }

    /// Gap between the next two ticks of `expr`, measured from the current time.
    fn next_interval(expr: &str) -> Duration {
        let schedule = cron::Schedule::from_str(expr).unwrap();
        let now = Utc::now();
        let mut ticks = schedule.after(&now);
        let first = ticks.next().unwrap();
        let second = ticks.next().unwrap();
        (second - first).to_std().unwrap()
    }

    /// Builds the single-threaded runtime used by the runtime-backed tests.
    fn current_thread_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
    }

    // --- Constant sanity checks -------------------------------------------------

    #[test]
    fn test_cron_lock_ttl_margin_is_positive() {
        assert!(
            CRON_LOCK_TTL_MARGIN_SECS > 0,
            "Margin must be positive to leave headroom for the next cron tick"
        );
    }

    #[test]
    fn test_cron_lock_ttl_min_is_positive() {
        assert!(
            CRON_LOCK_TTL_MIN_SECS > 0,
            "Minimum TTL must be positive to avoid zero-length locks"
        );
    }

    #[test]
    fn test_cron_lock_ttl_min_greater_than_margin() {
        assert!(
            CRON_LOCK_TTL_MIN_SECS > CRON_LOCK_TTL_MARGIN_SECS,
            "Minimum TTL ({}) should exceed margin ({}) to prevent degenerate lock durations",
            CRON_LOCK_TTL_MIN_SECS,
            CRON_LOCK_TTL_MARGIN_SECS
        );
    }

    // --- Minute-or-longer schedules: TTL is the interval minus the margin ------

    #[test]
    fn test_derive_cron_lock_ttl_for_five_minute_schedule() {
        assert_eq!(derived_ttl("0 */5 * * * *"), Duration::from_secs(295));
    }

    #[test]
    fn test_derive_cron_lock_ttl_for_minute_schedule() {
        assert_eq!(derived_ttl("0 * * * * *"), Duration::from_secs(55));
    }

    #[test]
    fn test_derive_cron_lock_ttl_hourly_schedule() {
        assert_eq!(derived_ttl("0 0 * * * *"), Duration::from_secs(3595));
    }

    #[test]
    fn test_derive_cron_lock_ttl_ten_minute_schedule() {
        assert_eq!(derived_ttl("0 */10 * * * *"), Duration::from_secs(595));
    }

    #[test]
    fn test_derive_cron_lock_ttl_fifteen_minute_schedule() {
        assert_eq!(derived_ttl("0 */15 * * * *"), Duration::from_secs(895));
    }

    #[test]
    fn test_derive_cron_lock_ttl_daily_schedule() {
        assert_eq!(derived_ttl("0 0 0 * * *"), Duration::from_secs(86395));
    }

    // --- Sub-minute schedules: TTL is capped at one second below the interval ---

    #[test]
    fn test_derive_cron_lock_ttl_one_second_schedule_caps_to_one_second() {
        assert_eq!(derived_ttl("*/1 * * * * *"), Duration::from_secs(1));
    }

    #[test]
    fn test_derive_cron_lock_ttl_two_second_schedule() {
        assert_eq!(derived_ttl("*/2 * * * * *"), Duration::from_secs(1));
    }

    #[test]
    fn test_derive_cron_lock_ttl_five_second_schedule() {
        assert_eq!(derived_ttl("*/5 * * * * *"), Duration::from_secs(4));
    }

    // NOTE(review): despite the name, the expected value (9s) is the `interval - 1`
    // cap for a 10-second schedule, not the `CRON_LOCK_TTL_MIN_SECS` floor.
    #[test]
    fn test_derive_cron_lock_ttl_short_interval_floors_at_minimum() {
        assert_eq!(derived_ttl("*/10 * * * * *"), Duration::from_secs(9));
    }

    #[test]
    fn test_derive_cron_lock_ttl_fifteen_second_schedule() {
        assert_eq!(derived_ttl("*/15 * * * * *"), Duration::from_secs(14));
    }

    #[test]
    fn test_derive_cron_lock_ttl_thirty_second_schedule() {
        assert_eq!(derived_ttl("*/30 * * * * *"), Duration::from_secs(29));
    }

    #[test]
    fn test_derive_cron_lock_ttl_two_minute_schedule() {
        assert_eq!(derived_ttl("0 */2 * * * *"), Duration::from_secs(115));
    }

    // --- Unparseable expressions return the fallback unchanged ------------------

    #[test]
    fn test_derive_cron_lock_ttl_fallback_on_invalid_cron() {
        let fallback = Duration::from_secs(240);
        assert_eq!(derive_cron_lock_ttl("not-a-cron", fallback), fallback);
    }

    #[test]
    fn test_derive_cron_lock_ttl_fallback_on_empty_string() {
        let fallback = Duration::from_secs(100);
        assert_eq!(derive_cron_lock_ttl("", fallback), fallback);
    }

    #[test]
    fn test_derive_cron_lock_ttl_fallback_on_partial_cron() {
        let fallback = Duration::from_secs(300);
        assert_eq!(derive_cron_lock_ttl("0 0 *", fallback), fallback);
    }

    #[test]
    fn test_derive_cron_lock_ttl_different_fallback_values() {
        for secs in [1, 60, 3600, 86400] {
            let fallback = Duration::from_secs(secs);
            assert_eq!(
                derive_cron_lock_ttl("invalid!", fallback),
                fallback,
                "Fallback for {secs}s should be returned unchanged"
            );
        }
    }

    // --- Invariants across many schedules ---------------------------------------

    #[test]
    fn test_derive_cron_lock_ttl_always_less_than_interval() {
        let schedules = [
            "*/2 * * * * *",
            "*/5 * * * * *",
            "*/10 * * * * *",
            "*/15 * * * * *",
            "*/30 * * * * *",
            "0 * * * * *",
            "0 */5 * * * *",
            "0 */10 * * * *",
            "0 */15 * * * *",
            "0 0 * * * *",
            "0 0 0 * * *",
        ];

        let fallback = Duration::from_secs(9999);
        for expr in schedules {
            let ttl = derive_cron_lock_ttl(expr, fallback);
            let interval = next_interval(expr);

            assert!(
                ttl < interval,
                "TTL ({:?}) must be < interval ({:?}) for schedule '{expr}'",
                ttl,
                interval
            );
        }
    }

    #[test]
    fn test_derive_cron_lock_ttl_always_positive() {
        let schedules = [
            "*/1 * * * * *",
            "*/2 * * * * *",
            "*/5 * * * * *",
            "*/10 * * * * *",
            "0 * * * * *",
            "0 0 * * * *",
        ];

        let fallback = Duration::from_secs(9999);
        for expr in schedules {
            let ttl = derive_cron_lock_ttl(expr, fallback);
            assert!(
                ttl > Duration::ZERO,
                "TTL must be positive for schedule '{expr}', got {:?}",
                ttl
            );
        }
    }

    #[test]
    fn test_derive_cron_lock_ttl_is_deterministic() {
        let first_call = derived_ttl("0 */5 * * * *");
        let second_call = derived_ttl("0 */5 * * * *");
        assert_eq!(
            first_call, second_call,
            "derive_cron_lock_ttl must be deterministic"
        );
    }

    // --- Production constants ----------------------------------------------------

    #[test]
    fn test_derive_cron_lock_ttl_with_production_schedules() {
        let tx_cleanup_secs = derive_cron_lock_ttl(
            TRANSACTION_CLEANUP_CRON_SCHEDULE,
            Duration::from_secs(TRANSACTION_CLEANUP_LOCK_TTL_SECS),
        )
        .as_secs();
        assert!(
            tx_cleanup_secs > 500,
            "Transaction cleanup TTL should be > 500s, got {}s",
            tx_cleanup_secs
        );
        assert!(
            tx_cleanup_secs < 600,
            "Transaction cleanup TTL should be < 600s (interval), got {}s",
            tx_cleanup_secs
        );

        let sys_cleanup_secs = derive_cron_lock_ttl(
            SYSTEM_CLEANUP_CRON_SCHEDULE,
            Duration::from_secs(SYSTEM_CLEANUP_LOCK_TTL_SECS),
        )
        .as_secs();
        assert!(
            sys_cleanup_secs > 800,
            "System cleanup TTL should be > 800s, got {}s",
            sys_cleanup_secs
        );
        assert!(
            sys_cleanup_secs < 900,
            "System cleanup TTL should be < 900s (interval), got {}s",
            sys_cleanup_secs
        );
    }

    // --- Cron parsing --------------------------------------------------------------

    // NOTE(review): this only checks that the cron parser rejects the expression;
    // `spawn_cron_task` itself is not called here.
    #[test]
    fn test_spawn_cron_task_rejects_invalid_cron_expression() {
        current_thread_runtime().block_on(async {
            let (_tx, _rx) = watch::channel(false);

            let result_description = "spawn_cron_task should reject invalid cron";

            assert!(
                cron::Schedule::from_str("not-a-cron").is_err(),
                "{result_description}: cron parse should fail"
            );
        });
    }

    #[test]
    fn test_cron_schedule_parse_valid_expressions() {
        let expressions = [
            TRANSACTION_CLEANUP_CRON_SCHEDULE,
            SYSTEM_CLEANUP_CRON_SCHEDULE,
        ];

        for expr in expressions {
            assert!(
                cron::Schedule::from_str(expr).is_ok(),
                "Production cron schedule '{expr}' should be parseable"
            );
        }
    }

    #[test]
    fn test_cron_schedule_parse_common_swap_expressions() {
        let expressions = [
            "0 */5 * * * *",
            "0 */15 * * * *",
            "0 0 * * * *",
            "0 0 */6 * * *",
            "0 0 0 * * *",
        ];

        for expr in expressions {
            let parsed = cron::Schedule::from_str(expr);
            assert!(
                parsed.is_ok(),
                "Common swap schedule '{expr}' should be parseable"
            );

            let has_upcoming = parsed.unwrap().upcoming(Utc).next().is_some();
            assert!(
                has_upcoming,
                "Schedule '{expr}' should have upcoming events"
            );
        }
    }

    // --- Shutdown channel ----------------------------------------------------------

    // NOTE(review): this exercises the watch channel only; no `SqsCronScheduler` is
    // constructed here.
    #[test]
    fn test_sqscronscheduler_new_stores_shutdown_rx() {
        current_thread_runtime().block_on(async {
            let (tx, rx) = watch::channel(false);
            assert!(!*rx.borrow());
            tx.send(true).unwrap();
            assert!(*rx.borrow());
        });
    }
}