1use crate::{
10 jobs::{
11 Job, NotificationSend, RelayerHealthCheck, TransactionRequest, TransactionSend,
12 TransactionStatusCheck,
13 },
14 models::RelayerError,
15 observability::request_id::get_request_id,
16 queues::{QueueBackend, QueueBackendStorage, QueueBackendType},
17};
18use async_trait::async_trait;
19use serde::Serialize;
20use std::sync::Arc;
21use thiserror::Error;
22use tracing::{debug, error};
23
24use super::{JobType, TokenSwapRequest};
25
26#[cfg(test)]
27use mockall::automock;
28
29#[derive(Debug, Error, Serialize, Clone)]
30pub enum JobProducerError {
31 #[error("Queue error: {0}")]
32 QueueError(String),
33}
34
35impl From<JobProducerError> for RelayerError {
36 fn from(err: JobProducerError) -> Self {
37 RelayerError::QueueError(err.to_string())
38 }
39}
40
41#[derive(Debug, Clone)]
43pub struct JobProducer {
44 queue_backend: Arc<QueueBackendStorage>,
45}
46
47#[async_trait]
48#[cfg_attr(test, automock)]
49pub trait JobProducerTrait: Send + Sync {
50 async fn produce_transaction_request_job(
51 &self,
52 transaction_process_job: TransactionRequest,
53 scheduled_on: Option<i64>,
54 ) -> Result<(), JobProducerError>;
55
56 async fn produce_submit_transaction_job(
57 &self,
58 transaction_submit_job: TransactionSend,
59 scheduled_on: Option<i64>,
60 ) -> Result<(), JobProducerError>;
61
62 async fn produce_check_transaction_status_job(
63 &self,
64 transaction_status_check_job: TransactionStatusCheck,
65 scheduled_on: Option<i64>,
66 ) -> Result<(), JobProducerError>;
67
68 async fn produce_send_notification_job(
69 &self,
70 notification_send_job: NotificationSend,
71 scheduled_on: Option<i64>,
72 ) -> Result<(), JobProducerError>;
73
74 async fn produce_token_swap_request_job(
75 &self,
76 swap_request_job: TokenSwapRequest,
77 scheduled_on: Option<i64>,
78 ) -> Result<(), JobProducerError>;
79
80 async fn produce_relayer_health_check_job(
81 &self,
82 relayer_health_check_job: RelayerHealthCheck,
83 scheduled_on: Option<i64>,
84 ) -> Result<(), JobProducerError>;
85
86 fn get_queue_backend(&self) -> Option<Arc<QueueBackendStorage>> {
88 None
89 }
90
91 fn backend_type(&self) -> QueueBackendType {
93 QueueBackendType::Redis
94 }
95}
96
97impl JobProducer {
98 pub fn new(queue_backend: Arc<QueueBackendStorage>) -> Self {
99 Self { queue_backend }
100 }
101
102 pub fn queue_backend(&self) -> Arc<QueueBackendStorage> {
103 self.queue_backend.clone()
104 }
105}
106
107#[async_trait]
108impl JobProducerTrait for JobProducer {
109 fn get_queue_backend(&self) -> Option<Arc<QueueBackendStorage>> {
110 Some(self.queue_backend())
111 }
112
113 fn backend_type(&self) -> QueueBackendType {
114 self.queue_backend.backend_type()
115 }
116
117 async fn produce_transaction_request_job(
118 &self,
119 transaction_process_job: TransactionRequest,
120 scheduled_on: Option<i64>,
121 ) -> Result<(), JobProducerError> {
122 debug!(
123 "Producing transaction request job: {:?}",
124 transaction_process_job
125 );
126 let job = Job::new(JobType::TransactionRequest, transaction_process_job)
127 .with_request_id(get_request_id());
128 let request_id = job.request_id.clone();
129 let tx_id = job.data.transaction_id.clone();
130 let relayer_id = job.data.relayer_id.clone();
131
132 let backend = self.queue_backend();
133 let job_id = backend
134 .produce_transaction_request(job, scheduled_on)
135 .await
136 .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
137
138 debug!(
139 job_type = %JobType::TransactionRequest,
140 backend = %backend.backend_type(),
141 job_id = %job_id,
142 request_id = ?request_id,
143 tx_id = %tx_id,
144 relayer_id = %relayer_id,
145 scheduled_on = ?scheduled_on,
146 "transaction request job produced"
147 );
148
149 Ok(())
150 }
151
152 async fn produce_submit_transaction_job(
153 &self,
154 transaction_submit_job: TransactionSend,
155 scheduled_on: Option<i64>,
156 ) -> Result<(), JobProducerError> {
157 let job = Job::new(JobType::TransactionSend, transaction_submit_job)
158 .with_request_id(get_request_id());
159 let request_id = job.request_id.clone();
160 let tx_id = job.data.transaction_id.clone();
161 let relayer_id = job.data.relayer_id.clone();
162 let command = job.data.command.clone();
163
164 let backend = self.queue_backend();
165 let job_id = backend
166 .produce_transaction_submission(job, scheduled_on)
167 .await
168 .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
169
170 debug!(
171 job_type = %JobType::TransactionSend,
172 backend = %backend.backend_type(),
173 job_id = %job_id,
174 request_id = ?request_id,
175 tx_id = %tx_id,
176 relayer_id = %relayer_id,
177 command = ?command,
178 scheduled_on = ?scheduled_on,
179 "transaction submission job produced"
180 );
181
182 Ok(())
183 }
184
185 async fn produce_check_transaction_status_job(
186 &self,
187 transaction_status_check_job: TransactionStatusCheck,
188 scheduled_on: Option<i64>,
189 ) -> Result<(), JobProducerError> {
190 let job = Job::new(
191 JobType::TransactionStatusCheck,
192 transaction_status_check_job.clone(),
193 )
194 .with_request_id(get_request_id());
195 let request_id = job.request_id.clone();
196 let tx_id = job.data.transaction_id.clone();
197 let relayer_id = job.data.relayer_id.clone();
198
199 let backend = self.queue_backend();
200 let job_id = backend
201 .produce_transaction_status_check(job, scheduled_on)
202 .await
203 .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
204
205 debug!(
206 job_type = %JobType::TransactionStatusCheck,
207 backend = %backend.backend_type(),
208 job_id = %job_id,
209 request_id = ?request_id,
210 tx_id = %tx_id,
211 relayer_id = %relayer_id,
212 network_type = ?transaction_status_check_job.network_type,
213 scheduled_on = ?scheduled_on,
214 "Transaction Status Check job produced successfully"
215 );
216 Ok(())
217 }
218
219 async fn produce_send_notification_job(
220 &self,
221 notification_send_job: NotificationSend,
222 scheduled_on: Option<i64>,
223 ) -> Result<(), JobProducerError> {
224 let job = Job::new(JobType::NotificationSend, notification_send_job)
225 .with_request_id(get_request_id());
226 let request_id = job.request_id.clone();
227 let notification_id = job.data.notification_id.clone();
228
229 let backend = self.queue_backend();
230 let job_id = backend
231 .produce_notification(job, scheduled_on)
232 .await
233 .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
234
235 debug!(
236 job_type = %JobType::NotificationSend,
237 backend = %backend.backend_type(),
238 job_id = %job_id,
239 request_id = ?request_id,
240 notification_id = %notification_id,
241 scheduled_on = ?scheduled_on,
242 "notification send job produced"
243 );
244 Ok(())
245 }
246
247 async fn produce_token_swap_request_job(
248 &self,
249 swap_request_job: TokenSwapRequest,
250 scheduled_on: Option<i64>,
251 ) -> Result<(), JobProducerError> {
252 let job =
253 Job::new(JobType::TokenSwapRequest, swap_request_job).with_request_id(get_request_id());
254 let request_id = job.request_id.clone();
255 let relayer_id = job.data.relayer_id.clone();
256 let backend = self.queue_backend();
257 let job_id = backend
258 .produce_token_swap_request(job, scheduled_on)
259 .await
260 .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
261
262 debug!(
263 job_type = %JobType::TokenSwapRequest,
264 backend = %backend.backend_type(),
265 job_id = %job_id,
266 request_id = ?request_id,
267 relayer_id = %relayer_id,
268 scheduled_on = ?scheduled_on,
269 "token swap job produced"
270 );
271 Ok(())
272 }
273
274 async fn produce_relayer_health_check_job(
275 &self,
276 relayer_health_check_job: RelayerHealthCheck,
277 scheduled_on: Option<i64>,
278 ) -> Result<(), JobProducerError> {
279 let job = Job::new(
280 JobType::RelayerHealthCheck,
281 relayer_health_check_job.clone(),
282 )
283 .with_request_id(get_request_id());
284 let request_id = job.request_id.clone();
285 let relayer_id = job.data.relayer_id.clone();
286 let backend = self.queue_backend();
287 let job_id = backend
288 .produce_relayer_health_check(job, scheduled_on)
289 .await
290 .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
291
292 debug!(
293 job_type = %JobType::RelayerHealthCheck,
294 backend = %backend.backend_type(),
295 job_id = %job_id,
296 request_id = ?request_id,
297 relayer_id = %relayer_id,
298 scheduled_on = ?scheduled_on,
299 "relayer health check job produced"
300 );
301 Ok(())
302 }
303}
304
305#[cfg(test)]
306mod tests {
307 use super::*;
308 use crate::models::{
309 EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
310 WebhookPayload, U256,
311 };
312 use crate::utils::calculate_scheduled_timestamp;
313 use tokio::sync::Mutex;
314
315 #[derive(Clone, Debug)]
316 struct TestRedisStorage<T> {
318 pub push_called: bool,
319 pub schedule_called: bool,
320 _phantom: std::marker::PhantomData<T>,
321 }
322
323 impl<T> TestRedisStorage<T> {
324 fn new() -> Self {
325 Self {
326 push_called: false,
327 schedule_called: false,
328 _phantom: std::marker::PhantomData,
329 }
330 }
331
332 async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
333 self.push_called = true;
334 Ok(())
335 }
336
337 async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
338 self.schedule_called = true;
339 Ok(())
340 }
341 }
342
343 #[derive(Clone, Debug)]
345 struct TestQueue {
346 pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
347 pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
348 pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
349 pub transaction_status_queue_evm: TestRedisStorage<Job<TransactionStatusCheck>>,
350 pub transaction_status_queue_stellar: TestRedisStorage<Job<TransactionStatusCheck>>,
351 pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
352 pub token_swap_request_queue: TestRedisStorage<Job<TokenSwapRequest>>,
353 pub relayer_health_check_queue: TestRedisStorage<Job<RelayerHealthCheck>>,
354 }
355
356 impl TestQueue {
357 fn new() -> Self {
358 Self {
359 transaction_request_queue: TestRedisStorage::new(),
360 transaction_submission_queue: TestRedisStorage::new(),
361 transaction_status_queue: TestRedisStorage::new(),
362 transaction_status_queue_evm: TestRedisStorage::new(),
363 transaction_status_queue_stellar: TestRedisStorage::new(),
364 notification_queue: TestRedisStorage::new(),
365 token_swap_request_queue: TestRedisStorage::new(),
366 relayer_health_check_queue: TestRedisStorage::new(),
367 }
368 }
369 }
370
371 struct TestJobProducer {
373 queue: Mutex<TestQueue>,
374 }
375
376 impl Clone for TestJobProducer {
377 fn clone(&self) -> Self {
378 let queue = self
379 .queue
380 .try_lock()
381 .expect("Failed to lock queue for cloning")
382 .clone();
383 Self {
384 queue: Mutex::new(queue),
385 }
386 }
387 }
388
389 impl TestJobProducer {
390 fn new() -> Self {
391 Self {
392 queue: Mutex::new(TestQueue::new()),
393 }
394 }
395
396 async fn get_queue(&self) -> TestQueue {
397 self.queue.lock().await.clone()
398 }
399 }
400
401 #[async_trait]
402 impl JobProducerTrait for TestJobProducer {
403 fn get_queue_backend(&self) -> Option<Arc<QueueBackendStorage>> {
404 None
405 }
406
407 async fn produce_transaction_request_job(
408 &self,
409 transaction_process_job: TransactionRequest,
410 scheduled_on: Option<i64>,
411 ) -> Result<(), JobProducerError> {
412 let mut queue = self.queue.lock().await;
413 let job = Job::new(JobType::TransactionRequest, transaction_process_job);
414
415 match scheduled_on {
416 Some(scheduled_on) => {
417 queue
418 .transaction_request_queue
419 .schedule(job, scheduled_on)
420 .await?;
421 }
422 None => {
423 queue.transaction_request_queue.push(job).await?;
424 }
425 }
426
427 Ok(())
428 }
429
430 async fn produce_submit_transaction_job(
431 &self,
432 transaction_submit_job: TransactionSend,
433 scheduled_on: Option<i64>,
434 ) -> Result<(), JobProducerError> {
435 let mut queue = self.queue.lock().await;
436 let job = Job::new(JobType::TransactionSend, transaction_submit_job);
437
438 match scheduled_on {
439 Some(on) => {
440 queue.transaction_submission_queue.schedule(job, on).await?;
441 }
442 None => {
443 queue.transaction_submission_queue.push(job).await?;
444 }
445 }
446
447 Ok(())
448 }
449
450 async fn produce_check_transaction_status_job(
451 &self,
452 transaction_status_check_job: TransactionStatusCheck,
453 scheduled_on: Option<i64>,
454 ) -> Result<(), JobProducerError> {
455 let mut queue = self.queue.lock().await;
456 let job = Job::new(
457 JobType::TransactionStatusCheck,
458 transaction_status_check_job.clone(),
459 );
460
461 use crate::models::NetworkType;
463 let status_queue = match transaction_status_check_job.network_type {
464 Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
465 Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
466 Some(NetworkType::Solana) => &mut queue.transaction_status_queue, None => &mut queue.transaction_status_queue, };
469
470 match scheduled_on {
471 Some(on) => {
472 status_queue.schedule(job, on).await?;
473 }
474 None => {
475 status_queue.push(job).await?;
476 }
477 }
478
479 Ok(())
480 }
481
482 async fn produce_send_notification_job(
483 &self,
484 notification_send_job: NotificationSend,
485 scheduled_on: Option<i64>,
486 ) -> Result<(), JobProducerError> {
487 let mut queue = self.queue.lock().await;
488 let job = Job::new(JobType::NotificationSend, notification_send_job);
489
490 match scheduled_on {
491 Some(on) => {
492 queue.notification_queue.schedule(job, on).await?;
493 }
494 None => {
495 queue.notification_queue.push(job).await?;
496 }
497 }
498
499 Ok(())
500 }
501
502 async fn produce_token_swap_request_job(
503 &self,
504 swap_request_job: TokenSwapRequest,
505 scheduled_on: Option<i64>,
506 ) -> Result<(), JobProducerError> {
507 let mut queue = self.queue.lock().await;
508 let job = Job::new(JobType::TokenSwapRequest, swap_request_job);
509
510 match scheduled_on {
511 Some(on) => {
512 queue.token_swap_request_queue.schedule(job, on).await?;
513 }
514 None => {
515 queue.token_swap_request_queue.push(job).await?;
516 }
517 }
518
519 Ok(())
520 }
521
522 async fn produce_relayer_health_check_job(
523 &self,
524 relayer_health_check_job: RelayerHealthCheck,
525 scheduled_on: Option<i64>,
526 ) -> Result<(), JobProducerError> {
527 let mut queue = self.queue.lock().await;
528 let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job);
529
530 match scheduled_on {
531 Some(scheduled_on) => {
532 queue
533 .relayer_health_check_queue
534 .schedule(job, scheduled_on)
535 .await?;
536 }
537 None => {
538 queue.relayer_health_check_queue.push(job).await?;
539 }
540 }
541
542 Ok(())
543 }
544 }
545
546 #[tokio::test]
547 async fn test_job_producer_operations() {
548 let producer = TestJobProducer::new();
549
550 let request = TransactionRequest::new("tx123", "relayer-1");
552 let result = producer
553 .produce_transaction_request_job(request, None)
554 .await;
555 assert!(result.is_ok());
556
557 let queue = producer.get_queue().await;
558 assert!(queue.transaction_request_queue.push_called);
559
560 let producer = TestJobProducer::new();
562 let request = TransactionRequest::new("tx123", "relayer-1");
563 let scheduled_timestamp = calculate_scheduled_timestamp(10); let result = producer
565 .produce_transaction_request_job(request, Some(scheduled_timestamp))
566 .await;
567 assert!(result.is_ok());
568
569 let queue = producer.get_queue().await;
570 assert!(queue.transaction_request_queue.schedule_called);
571 }
572
573 #[tokio::test]
574 async fn test_submit_transaction_job() {
575 let producer = TestJobProducer::new();
576
577 let submit_job = TransactionSend::submit("tx123", "relayer-1");
579 let result = producer
580 .produce_submit_transaction_job(submit_job, None)
581 .await;
582 assert!(result.is_ok());
583
584 let queue = producer.get_queue().await;
585 assert!(queue.transaction_submission_queue.push_called);
586 }
587
588 #[tokio::test]
589 async fn test_check_status_job() {
590 use crate::models::NetworkType;
591 let producer = TestJobProducer::new();
592
593 let status_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
595 let result = producer
596 .produce_check_transaction_status_job(status_job, None)
597 .await;
598 assert!(result.is_ok());
599
600 let queue = producer.get_queue().await;
601 assert!(queue.transaction_status_queue_evm.push_called);
602 }
603
604 #[tokio::test]
605 async fn test_notification_job() {
606 let producer = TestJobProducer::new();
607
608 let notification = WebhookNotification::new(
610 "test_event".to_string(),
611 WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
612 EvmTransactionResponse {
613 id: "tx123".to_string(),
614 hash: Some("0x123".to_string()),
615 status: TransactionStatus::Confirmed,
616 status_reason: None,
617 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
618 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
619 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
620 gas_price: Some(1000000000),
621 gas_limit: Some(21000),
622 nonce: Some(1),
623 value: U256::from(1000000000000000000_u64),
624 from: "0xabc".to_string(),
625 to: Some("0xdef".to_string()),
626 relayer_id: "relayer-1".to_string(),
627 data: None,
628 max_fee_per_gas: None,
629 max_priority_fee_per_gas: None,
630 signature: None,
631 speed: None,
632 },
633 ))),
634 );
635 let job = NotificationSend::new("notification-1".to_string(), notification);
636
637 let result = producer.produce_send_notification_job(job, None).await;
638 assert!(result.is_ok());
639
640 let queue = producer.get_queue().await;
641 assert!(queue.notification_queue.push_called);
642 }
643
644 #[tokio::test]
645 async fn test_relayer_health_check_job() {
646 let producer = TestJobProducer::new();
647
648 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
650 let result = producer
651 .produce_relayer_health_check_job(health_check, None)
652 .await;
653 assert!(result.is_ok());
654
655 let queue = producer.get_queue().await;
656 assert!(queue.relayer_health_check_queue.push_called);
657
658 let producer = TestJobProducer::new();
660 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
661 let scheduled_timestamp = calculate_scheduled_timestamp(60);
662 let result = producer
663 .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
664 .await;
665 assert!(result.is_ok());
666
667 let queue = producer.get_queue().await;
668 assert!(queue.relayer_health_check_queue.schedule_called);
669 }
670
671 #[test]
672 fn test_job_producer_error_conversion() {
673 let job_error = JobProducerError::QueueError("Test error".to_string());
675 let relayer_error: RelayerError = job_error.into();
676
677 match relayer_error {
678 RelayerError::QueueError(msg) => {
679 assert_eq!(msg, "Queue error: Test error");
680 }
681 _ => panic!("Unexpected error type"),
682 }
683 }
684
685 #[tokio::test]
686 async fn test_get_queue() {
687 let producer = TestJobProducer::new();
688
689 let queue = producer.get_queue().await;
691
692 assert!(!queue.transaction_request_queue.push_called);
694 assert!(!queue.transaction_request_queue.schedule_called);
695 assert!(!queue.transaction_submission_queue.push_called);
696 assert!(!queue.notification_queue.push_called);
697 assert!(!queue.token_swap_request_queue.push_called);
698 assert!(!queue.relayer_health_check_queue.push_called);
699 }
700
701 #[tokio::test]
702 async fn test_produce_relayer_health_check_job_immediate() {
703 let producer = TestJobProducer::new();
704
705 let health_check = RelayerHealthCheck::new("relayer-1".to_string());
707 let result = producer
708 .produce_relayer_health_check_job(health_check, None)
709 .await;
710
711 assert!(result.is_ok());
713
714 let queue = producer.get_queue().await;
716 assert!(queue.relayer_health_check_queue.push_called);
717 assert!(!queue.relayer_health_check_queue.schedule_called);
718
719 assert!(!queue.transaction_request_queue.push_called);
721 assert!(!queue.transaction_submission_queue.push_called);
722 assert!(!queue.transaction_status_queue.push_called);
723 assert!(!queue.notification_queue.push_called);
724 assert!(!queue.token_swap_request_queue.push_called);
725 }
726
727 #[tokio::test]
728 async fn test_produce_relayer_health_check_job_scheduled() {
729 let producer = TestJobProducer::new();
730
731 let health_check = RelayerHealthCheck::new("relayer-2".to_string());
733 let scheduled_timestamp = calculate_scheduled_timestamp(300); let result = producer
735 .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
736 .await;
737
738 assert!(result.is_ok());
740
741 let queue = producer.get_queue().await;
743 assert!(queue.relayer_health_check_queue.schedule_called);
744 assert!(!queue.relayer_health_check_queue.push_called);
745
746 assert!(!queue.transaction_request_queue.push_called);
748 assert!(!queue.transaction_submission_queue.push_called);
749 assert!(!queue.transaction_status_queue.push_called);
750 assert!(!queue.notification_queue.push_called);
751 assert!(!queue.token_swap_request_queue.push_called);
752 }
753
754 #[tokio::test]
755 async fn test_produce_relayer_health_check_job_multiple_relayers() {
756 let producer = TestJobProducer::new();
757
758 let relayer_ids = vec!["relayer-1", "relayer-2", "relayer-3"];
760
761 for relayer_id in &relayer_ids {
762 let health_check = RelayerHealthCheck::new(relayer_id.to_string());
763 let result = producer
764 .produce_relayer_health_check_job(health_check, None)
765 .await;
766 assert!(result.is_ok());
767 }
768
769 let queue = producer.get_queue().await;
771 assert!(queue.relayer_health_check_queue.push_called);
772 }
773
774 #[tokio::test]
775 async fn test_status_check_routes_to_evm_queue() {
776 use crate::models::NetworkType;
777 let producer = TestJobProducer::new();
778
779 let status_job = TransactionStatusCheck::new("tx-evm", "relayer-1", NetworkType::Evm);
780 let result = producer
781 .produce_check_transaction_status_job(status_job, None)
782 .await;
783
784 assert!(result.is_ok());
785 let queue = producer.get_queue().await;
786 assert!(queue.transaction_status_queue_evm.push_called);
787 assert!(!queue.transaction_status_queue_stellar.push_called);
788 assert!(!queue.transaction_status_queue.push_called);
789 }
790
791 #[tokio::test]
792 async fn test_status_check_routes_to_stellar_queue() {
793 use crate::models::NetworkType;
794 let producer = TestJobProducer::new();
795
796 let status_job =
797 TransactionStatusCheck::new("tx-stellar", "relayer-2", NetworkType::Stellar);
798 let result = producer
799 .produce_check_transaction_status_job(status_job, None)
800 .await;
801
802 assert!(result.is_ok());
803 let queue = producer.get_queue().await;
804 assert!(queue.transaction_status_queue_stellar.push_called);
805 assert!(!queue.transaction_status_queue_evm.push_called);
806 assert!(!queue.transaction_status_queue.push_called);
807 }
808
809 #[tokio::test]
810 async fn test_status_check_routes_to_default_queue_for_solana() {
811 use crate::models::NetworkType;
812 let producer = TestJobProducer::new();
813
814 let status_job = TransactionStatusCheck::new("tx-solana", "relayer-3", NetworkType::Solana);
815 let result = producer
816 .produce_check_transaction_status_job(status_job, None)
817 .await;
818
819 assert!(result.is_ok());
820 let queue = producer.get_queue().await;
821 assert!(queue.transaction_status_queue.push_called);
822 assert!(!queue.transaction_status_queue_evm.push_called);
823 assert!(!queue.transaction_status_queue_stellar.push_called);
824 }
825
826 #[tokio::test]
827 async fn test_status_check_scheduled_evm() {
828 use crate::models::NetworkType;
829 let producer = TestJobProducer::new();
830
831 let status_job =
832 TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
833 let scheduled_timestamp = calculate_scheduled_timestamp(30);
834 let result = producer
835 .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp))
836 .await;
837
838 assert!(result.is_ok());
839 let queue = producer.get_queue().await;
840 assert!(queue.transaction_status_queue_evm.schedule_called);
841 assert!(!queue.transaction_status_queue_evm.push_called);
842 }
843
844 #[tokio::test]
845 async fn test_submit_transaction_scheduled() {
846 let producer = TestJobProducer::new();
847
848 let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1");
849 let scheduled_timestamp = calculate_scheduled_timestamp(15);
850 let result = producer
851 .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp))
852 .await;
853
854 assert!(result.is_ok());
855 let queue = producer.get_queue().await;
856 assert!(queue.transaction_submission_queue.schedule_called);
857 assert!(!queue.transaction_submission_queue.push_called);
858 }
859
860 #[tokio::test]
861 async fn test_notification_job_scheduled() {
862 let producer = TestJobProducer::new();
863
864 let notification = WebhookNotification::new(
865 "test_scheduled_event".to_string(),
866 WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
867 EvmTransactionResponse {
868 id: "tx-notify-scheduled".to_string(),
869 hash: Some("0xabc123".to_string()),
870 status: TransactionStatus::Confirmed,
871 status_reason: None,
872 created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
873 sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
874 confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
875 gas_price: Some(1000000000),
876 gas_limit: Some(21000),
877 nonce: Some(1),
878 value: U256::from(1000000000000000000_u64),
879 from: "0xabc".to_string(),
880 to: Some("0xdef".to_string()),
881 relayer_id: "relayer-1".to_string(),
882 data: None,
883 max_fee_per_gas: None,
884 max_priority_fee_per_gas: None,
885 signature: None,
886 speed: None,
887 },
888 ))),
889 );
890 let job = NotificationSend::new("notification-scheduled".to_string(), notification);
891
892 let scheduled_timestamp = calculate_scheduled_timestamp(5);
893 let result = producer
894 .produce_send_notification_job(job, Some(scheduled_timestamp))
895 .await;
896
897 assert!(result.is_ok());
898 let queue = producer.get_queue().await;
899 assert!(queue.notification_queue.schedule_called);
900 assert!(!queue.notification_queue.push_called);
901 }
902
903 #[tokio::test]
904 async fn test_solana_swap_job_immediate() {
905 let producer = TestJobProducer::new();
906
907 let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
908 let result = producer
909 .produce_token_swap_request_job(swap_job, None)
910 .await;
911
912 assert!(result.is_ok());
913 let queue = producer.get_queue().await;
914 assert!(queue.token_swap_request_queue.push_called);
915 assert!(!queue.token_swap_request_queue.schedule_called);
916 }
917
918 #[tokio::test]
919 async fn test_token_swap_job_scheduled() {
920 let producer = TestJobProducer::new();
921
922 let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
923 let scheduled_timestamp = calculate_scheduled_timestamp(20);
924 let result = producer
925 .produce_token_swap_request_job(swap_job, Some(scheduled_timestamp))
926 .await;
927
928 assert!(result.is_ok());
929 let queue = producer.get_queue().await;
930 assert!(queue.token_swap_request_queue.schedule_called);
931 assert!(!queue.token_swap_request_queue.push_called);
932 }
933
934 #[tokio::test]
935 async fn test_transaction_send_cancel_job() {
936 let producer = TestJobProducer::new();
937
938 let cancel_job = TransactionSend::cancel("tx-cancel", "relayer-1", "user requested");
939 let result = producer
940 .produce_submit_transaction_job(cancel_job, None)
941 .await;
942
943 assert!(result.is_ok());
944 let queue = producer.get_queue().await;
945 assert!(queue.transaction_submission_queue.push_called);
946 }
947
948 #[tokio::test]
949 async fn test_transaction_send_resubmit_job() {
950 let producer = TestJobProducer::new();
951
952 let resubmit_job = TransactionSend::resubmit("tx-resubmit", "relayer-1");
953 let result = producer
954 .produce_submit_transaction_job(resubmit_job, None)
955 .await;
956
957 assert!(result.is_ok());
958 let queue = producer.get_queue().await;
959 assert!(queue.transaction_submission_queue.push_called);
960 }
961
962 #[tokio::test]
963 async fn test_transaction_send_resend_job() {
964 let producer = TestJobProducer::new();
965
966 let resend_job = TransactionSend::resend("tx-resend", "relayer-1");
967 let result = producer
968 .produce_submit_transaction_job(resend_job, None)
969 .await;
970
971 assert!(result.is_ok());
972 let queue = producer.get_queue().await;
973 assert!(queue.transaction_submission_queue.push_called);
974 }
975
976 #[tokio::test]
977 async fn test_multiple_jobs_different_queues() {
978 let producer = TestJobProducer::new();
979
980 let request = TransactionRequest::new("tx1", "relayer-1");
982 producer
983 .produce_transaction_request_job(request, None)
984 .await
985 .unwrap();
986
987 let submit = TransactionSend::submit("tx2", "relayer-1");
988 producer
989 .produce_submit_transaction_job(submit, None)
990 .await
991 .unwrap();
992
993 use crate::models::NetworkType;
994 let status = TransactionStatusCheck::new("tx3", "relayer-1", NetworkType::Evm);
995 producer
996 .produce_check_transaction_status_job(status, None)
997 .await
998 .unwrap();
999
1000 let queue = producer.get_queue().await;
1002 assert!(queue.transaction_request_queue.push_called);
1003 assert!(queue.transaction_submission_queue.push_called);
1004 assert!(queue.transaction_status_queue_evm.push_called);
1005 }
1006
1007 #[test]
1008 fn test_job_producer_clone() {
1009 let producer = TestJobProducer::new();
1010 let cloned_producer = producer.clone();
1011
1012 assert!(std::ptr::addr_of!(producer) != std::ptr::addr_of!(cloned_producer));
1015 }
1016
1017 #[tokio::test]
1018 async fn test_transaction_request_with_metadata() {
1019 let producer = TestJobProducer::new();
1020
1021 let mut metadata = std::collections::HashMap::new();
1022 metadata.insert("retry_count".to_string(), "3".to_string());
1023
1024 let request = TransactionRequest::new("tx-meta", "relayer-1").with_metadata(metadata);
1025
1026 let result = producer
1027 .produce_transaction_request_job(request, None)
1028 .await;
1029
1030 assert!(result.is_ok());
1031 let queue = producer.get_queue().await;
1032 assert!(queue.transaction_request_queue.push_called);
1033 }
1034
1035 #[tokio::test]
1036 async fn test_status_check_with_metadata() {
1037 use crate::models::NetworkType;
1038 let producer = TestJobProducer::new();
1039
1040 let mut metadata = std::collections::HashMap::new();
1041 metadata.insert("attempt".to_string(), "2".to_string());
1042
1043 let status =
1044 TransactionStatusCheck::new("tx-status-meta", "relayer-1", NetworkType::Stellar)
1045 .with_metadata(metadata);
1046
1047 let result = producer
1048 .produce_check_transaction_status_job(status, None)
1049 .await;
1050
1051 assert!(result.is_ok());
1052 let queue = producer.get_queue().await;
1053 assert!(queue.transaction_status_queue_stellar.push_called);
1054 }
1055
1056 #[tokio::test]
1057 async fn test_scheduled_jobs_with_different_delays() {
1058 let producer = TestJobProducer::new();
1059
1060 let delays = [1, 10, 60, 300, 3600]; for (idx, delay) in delays.iter().enumerate() {
1064 let request = TransactionRequest::new(format!("tx-delay-{idx}"), "relayer-1");
1065 let timestamp = calculate_scheduled_timestamp(*delay);
1066
1067 let result = producer
1068 .produce_transaction_request_job(request, Some(timestamp))
1069 .await;
1070
1071 assert!(result.is_ok(), "Failed to schedule job with delay {delay}");
1072 }
1073 }
1074
1075 #[test]
1076 fn test_job_producer_error_display() {
1077 let error = JobProducerError::QueueError("Test queue error".to_string());
1078 let error_string = error.to_string();
1079
1080 assert!(error_string.contains("Queue error"));
1081 assert!(error_string.contains("Test queue error"));
1082 }
1083
1084 #[test]
1085 fn test_job_producer_error_to_relayer_error() {
1086 let job_error = JobProducerError::QueueError("Connection failed".to_string());
1088 let relayer_error: RelayerError = job_error.into();
1089
1090 match relayer_error {
1091 RelayerError::QueueError(msg) => {
1092 assert_eq!(msg, "Queue error: Connection failed");
1093 }
1094 _ => panic!("Expected QueueError variant"),
1095 }
1096 }
1097}