openzeppelin_relayer/jobs/
job_producer.rs

//! Job producer module for enqueueing jobs via the configured queue backend.
//!
//! Provides functionality for producing various types of jobs:
//! - Transaction processing jobs
//! - Transaction submission jobs
//! - Status monitoring jobs
//! - Notification jobs
//! - Token swap request jobs
//! - Relayer health check jobs
8
9use crate::{
10    jobs::{
11        Job, NotificationSend, RelayerHealthCheck, TransactionRequest, TransactionSend,
12        TransactionStatusCheck,
13    },
14    models::RelayerError,
15    observability::request_id::get_request_id,
16    queues::{QueueBackend, QueueBackendStorage, QueueBackendType},
17};
18use async_trait::async_trait;
19use serde::Serialize;
20use std::sync::Arc;
21use thiserror::Error;
22use tracing::{debug, error};
23
24use super::{JobType, TokenSwapRequest};
25
26#[cfg(test)]
27use mockall::automock;
28
29#[derive(Debug, Error, Serialize, Clone)]
30pub enum JobProducerError {
31    #[error("Queue error: {0}")]
32    QueueError(String),
33}
34
35impl From<JobProducerError> for RelayerError {
36    fn from(err: JobProducerError) -> Self {
37        RelayerError::QueueError(err.to_string())
38    }
39}
40
41/// Job producer that enqueues jobs via the configured queue backend.
42#[derive(Debug, Clone)]
43pub struct JobProducer {
44    queue_backend: Arc<QueueBackendStorage>,
45}
46
47#[async_trait]
48#[cfg_attr(test, automock)]
49pub trait JobProducerTrait: Send + Sync {
50    async fn produce_transaction_request_job(
51        &self,
52        transaction_process_job: TransactionRequest,
53        scheduled_on: Option<i64>,
54    ) -> Result<(), JobProducerError>;
55
56    async fn produce_submit_transaction_job(
57        &self,
58        transaction_submit_job: TransactionSend,
59        scheduled_on: Option<i64>,
60    ) -> Result<(), JobProducerError>;
61
62    async fn produce_check_transaction_status_job(
63        &self,
64        transaction_status_check_job: TransactionStatusCheck,
65        scheduled_on: Option<i64>,
66    ) -> Result<(), JobProducerError>;
67
68    async fn produce_send_notification_job(
69        &self,
70        notification_send_job: NotificationSend,
71        scheduled_on: Option<i64>,
72    ) -> Result<(), JobProducerError>;
73
74    async fn produce_token_swap_request_job(
75        &self,
76        swap_request_job: TokenSwapRequest,
77        scheduled_on: Option<i64>,
78    ) -> Result<(), JobProducerError>;
79
80    async fn produce_relayer_health_check_job(
81        &self,
82        relayer_health_check_job: RelayerHealthCheck,
83        scheduled_on: Option<i64>,
84    ) -> Result<(), JobProducerError>;
85
86    /// Returns active queue backend storage when available.
87    fn get_queue_backend(&self) -> Option<Arc<QueueBackendStorage>> {
88        None
89    }
90
91    /// Returns active queue backend type.
92    fn backend_type(&self) -> QueueBackendType {
93        QueueBackendType::Redis
94    }
95}
96
97impl JobProducer {
98    pub fn new(queue_backend: Arc<QueueBackendStorage>) -> Self {
99        Self { queue_backend }
100    }
101
102    pub fn queue_backend(&self) -> Arc<QueueBackendStorage> {
103        self.queue_backend.clone()
104    }
105}
106
107#[async_trait]
108impl JobProducerTrait for JobProducer {
109    fn get_queue_backend(&self) -> Option<Arc<QueueBackendStorage>> {
110        Some(self.queue_backend())
111    }
112
113    fn backend_type(&self) -> QueueBackendType {
114        self.queue_backend.backend_type()
115    }
116
117    async fn produce_transaction_request_job(
118        &self,
119        transaction_process_job: TransactionRequest,
120        scheduled_on: Option<i64>,
121    ) -> Result<(), JobProducerError> {
122        debug!(
123            "Producing transaction request job: {:?}",
124            transaction_process_job
125        );
126        let job = Job::new(JobType::TransactionRequest, transaction_process_job)
127            .with_request_id(get_request_id());
128        let request_id = job.request_id.clone();
129        let tx_id = job.data.transaction_id.clone();
130        let relayer_id = job.data.relayer_id.clone();
131
132        let backend = self.queue_backend();
133        let job_id = backend
134            .produce_transaction_request(job, scheduled_on)
135            .await
136            .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
137
138        debug!(
139            job_type = %JobType::TransactionRequest,
140            backend = %backend.backend_type(),
141            job_id = %job_id,
142            request_id = ?request_id,
143            tx_id = %tx_id,
144            relayer_id = %relayer_id,
145            scheduled_on = ?scheduled_on,
146            "transaction request job produced"
147        );
148
149        Ok(())
150    }
151
152    async fn produce_submit_transaction_job(
153        &self,
154        transaction_submit_job: TransactionSend,
155        scheduled_on: Option<i64>,
156    ) -> Result<(), JobProducerError> {
157        let job = Job::new(JobType::TransactionSend, transaction_submit_job)
158            .with_request_id(get_request_id());
159        let request_id = job.request_id.clone();
160        let tx_id = job.data.transaction_id.clone();
161        let relayer_id = job.data.relayer_id.clone();
162        let command = job.data.command.clone();
163
164        let backend = self.queue_backend();
165        let job_id = backend
166            .produce_transaction_submission(job, scheduled_on)
167            .await
168            .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
169
170        debug!(
171            job_type = %JobType::TransactionSend,
172            backend = %backend.backend_type(),
173            job_id = %job_id,
174            request_id = ?request_id,
175            tx_id = %tx_id,
176            relayer_id = %relayer_id,
177            command = ?command,
178            scheduled_on = ?scheduled_on,
179            "transaction submission job produced"
180        );
181
182        Ok(())
183    }
184
185    async fn produce_check_transaction_status_job(
186        &self,
187        transaction_status_check_job: TransactionStatusCheck,
188        scheduled_on: Option<i64>,
189    ) -> Result<(), JobProducerError> {
190        let job = Job::new(
191            JobType::TransactionStatusCheck,
192            transaction_status_check_job.clone(),
193        )
194        .with_request_id(get_request_id());
195        let request_id = job.request_id.clone();
196        let tx_id = job.data.transaction_id.clone();
197        let relayer_id = job.data.relayer_id.clone();
198
199        let backend = self.queue_backend();
200        let job_id = backend
201            .produce_transaction_status_check(job, scheduled_on)
202            .await
203            .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
204
205        debug!(
206            job_type = %JobType::TransactionStatusCheck,
207            backend = %backend.backend_type(),
208            job_id = %job_id,
209            request_id = ?request_id,
210            tx_id = %tx_id,
211            relayer_id = %relayer_id,
212            network_type = ?transaction_status_check_job.network_type,
213            scheduled_on = ?scheduled_on,
214            "Transaction Status Check job produced successfully"
215        );
216        Ok(())
217    }
218
219    async fn produce_send_notification_job(
220        &self,
221        notification_send_job: NotificationSend,
222        scheduled_on: Option<i64>,
223    ) -> Result<(), JobProducerError> {
224        let job = Job::new(JobType::NotificationSend, notification_send_job)
225            .with_request_id(get_request_id());
226        let request_id = job.request_id.clone();
227        let notification_id = job.data.notification_id.clone();
228
229        let backend = self.queue_backend();
230        let job_id = backend
231            .produce_notification(job, scheduled_on)
232            .await
233            .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
234
235        debug!(
236            job_type = %JobType::NotificationSend,
237            backend = %backend.backend_type(),
238            job_id = %job_id,
239            request_id = ?request_id,
240            notification_id = %notification_id,
241            scheduled_on = ?scheduled_on,
242            "notification send job produced"
243        );
244        Ok(())
245    }
246
247    async fn produce_token_swap_request_job(
248        &self,
249        swap_request_job: TokenSwapRequest,
250        scheduled_on: Option<i64>,
251    ) -> Result<(), JobProducerError> {
252        let job =
253            Job::new(JobType::TokenSwapRequest, swap_request_job).with_request_id(get_request_id());
254        let request_id = job.request_id.clone();
255        let relayer_id = job.data.relayer_id.clone();
256        let backend = self.queue_backend();
257        let job_id = backend
258            .produce_token_swap_request(job, scheduled_on)
259            .await
260            .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
261
262        debug!(
263            job_type = %JobType::TokenSwapRequest,
264            backend = %backend.backend_type(),
265            job_id = %job_id,
266            request_id = ?request_id,
267            relayer_id = %relayer_id,
268            scheduled_on = ?scheduled_on,
269            "token swap job produced"
270        );
271        Ok(())
272    }
273
274    async fn produce_relayer_health_check_job(
275        &self,
276        relayer_health_check_job: RelayerHealthCheck,
277        scheduled_on: Option<i64>,
278    ) -> Result<(), JobProducerError> {
279        let job = Job::new(
280            JobType::RelayerHealthCheck,
281            relayer_health_check_job.clone(),
282        )
283        .with_request_id(get_request_id());
284        let request_id = job.request_id.clone();
285        let relayer_id = job.data.relayer_id.clone();
286        let backend = self.queue_backend();
287        let job_id = backend
288            .produce_relayer_health_check(job, scheduled_on)
289            .await
290            .map_err(|e| JobProducerError::QueueError(e.to_string()))?;
291
292        debug!(
293            job_type = %JobType::RelayerHealthCheck,
294            backend = %backend.backend_type(),
295            job_id = %job_id,
296            request_id = ?request_id,
297            relayer_id = %relayer_id,
298            scheduled_on = ?scheduled_on,
299            "relayer health check job produced"
300        );
301        Ok(())
302    }
303}
304
305#[cfg(test)]
306mod tests {
307    use super::*;
308    use crate::models::{
309        EvmTransactionResponse, TransactionResponse, TransactionStatus, WebhookNotification,
310        WebhookPayload, U256,
311    };
312    use crate::utils::calculate_scheduled_timestamp;
313    use tokio::sync::Mutex;
314
/// Simplified queue storage for testing, used instead of complex mocks.
///
/// It stores no jobs; it only records which operation was invoked.
#[derive(Clone, Debug)]
struct TestRedisStorage<T> {
    /// Set to `true` once `push` has been called.
    pub push_called: bool,
    /// Set to `true` once `schedule` has been called.
    pub schedule_called: bool,
    /// Ties the storage to its job type without holding a job.
    _phantom: std::marker::PhantomData<T>,
}
322
323    impl<T> TestRedisStorage<T> {
324        fn new() -> Self {
325            Self {
326                push_called: false,
327                schedule_called: false,
328                _phantom: std::marker::PhantomData,
329            }
330        }
331
332        async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
333            self.push_called = true;
334            Ok(())
335        }
336
337        async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
338            self.schedule_called = true;
339            Ok(())
340        }
341    }
342
343    // A test version of the Queue
344    #[derive(Clone, Debug)]
345    struct TestQueue {
346        pub transaction_request_queue: TestRedisStorage<Job<TransactionRequest>>,
347        pub transaction_submission_queue: TestRedisStorage<Job<TransactionSend>>,
348        pub transaction_status_queue: TestRedisStorage<Job<TransactionStatusCheck>>,
349        pub transaction_status_queue_evm: TestRedisStorage<Job<TransactionStatusCheck>>,
350        pub transaction_status_queue_stellar: TestRedisStorage<Job<TransactionStatusCheck>>,
351        pub notification_queue: TestRedisStorage<Job<NotificationSend>>,
352        pub token_swap_request_queue: TestRedisStorage<Job<TokenSwapRequest>>,
353        pub relayer_health_check_queue: TestRedisStorage<Job<RelayerHealthCheck>>,
354    }
355
356    impl TestQueue {
357        fn new() -> Self {
358            Self {
359                transaction_request_queue: TestRedisStorage::new(),
360                transaction_submission_queue: TestRedisStorage::new(),
361                transaction_status_queue: TestRedisStorage::new(),
362                transaction_status_queue_evm: TestRedisStorage::new(),
363                transaction_status_queue_stellar: TestRedisStorage::new(),
364                notification_queue: TestRedisStorage::new(),
365                token_swap_request_queue: TestRedisStorage::new(),
366                relayer_health_check_queue: TestRedisStorage::new(),
367            }
368        }
369    }
370
371    // A test version of JobProducer
372    struct TestJobProducer {
373        queue: Mutex<TestQueue>,
374    }
375
376    impl Clone for TestJobProducer {
377        fn clone(&self) -> Self {
378            let queue = self
379                .queue
380                .try_lock()
381                .expect("Failed to lock queue for cloning")
382                .clone();
383            Self {
384                queue: Mutex::new(queue),
385            }
386        }
387    }
388
389    impl TestJobProducer {
390        fn new() -> Self {
391            Self {
392                queue: Mutex::new(TestQueue::new()),
393            }
394        }
395
396        async fn get_queue(&self) -> TestQueue {
397            self.queue.lock().await.clone()
398        }
399    }
400
401    #[async_trait]
402    impl JobProducerTrait for TestJobProducer {
403        fn get_queue_backend(&self) -> Option<Arc<QueueBackendStorage>> {
404            None
405        }
406
407        async fn produce_transaction_request_job(
408            &self,
409            transaction_process_job: TransactionRequest,
410            scheduled_on: Option<i64>,
411        ) -> Result<(), JobProducerError> {
412            let mut queue = self.queue.lock().await;
413            let job = Job::new(JobType::TransactionRequest, transaction_process_job);
414
415            match scheduled_on {
416                Some(scheduled_on) => {
417                    queue
418                        .transaction_request_queue
419                        .schedule(job, scheduled_on)
420                        .await?;
421                }
422                None => {
423                    queue.transaction_request_queue.push(job).await?;
424                }
425            }
426
427            Ok(())
428        }
429
430        async fn produce_submit_transaction_job(
431            &self,
432            transaction_submit_job: TransactionSend,
433            scheduled_on: Option<i64>,
434        ) -> Result<(), JobProducerError> {
435            let mut queue = self.queue.lock().await;
436            let job = Job::new(JobType::TransactionSend, transaction_submit_job);
437
438            match scheduled_on {
439                Some(on) => {
440                    queue.transaction_submission_queue.schedule(job, on).await?;
441                }
442                None => {
443                    queue.transaction_submission_queue.push(job).await?;
444                }
445            }
446
447            Ok(())
448        }
449
450        async fn produce_check_transaction_status_job(
451            &self,
452            transaction_status_check_job: TransactionStatusCheck,
453            scheduled_on: Option<i64>,
454        ) -> Result<(), JobProducerError> {
455            let mut queue = self.queue.lock().await;
456            let job = Job::new(
457                JobType::TransactionStatusCheck,
458                transaction_status_check_job.clone(),
459            );
460
461            // Route to the appropriate queue based on network type
462            use crate::models::NetworkType;
463            let status_queue = match transaction_status_check_job.network_type {
464                Some(NetworkType::Evm) => &mut queue.transaction_status_queue_evm,
465                Some(NetworkType::Stellar) => &mut queue.transaction_status_queue_stellar,
466                Some(NetworkType::Solana) => &mut queue.transaction_status_queue, // Use default queue
467                None => &mut queue.transaction_status_queue, // Legacy messages without network_type
468            };
469
470            match scheduled_on {
471                Some(on) => {
472                    status_queue.schedule(job, on).await?;
473                }
474                None => {
475                    status_queue.push(job).await?;
476                }
477            }
478
479            Ok(())
480        }
481
482        async fn produce_send_notification_job(
483            &self,
484            notification_send_job: NotificationSend,
485            scheduled_on: Option<i64>,
486        ) -> Result<(), JobProducerError> {
487            let mut queue = self.queue.lock().await;
488            let job = Job::new(JobType::NotificationSend, notification_send_job);
489
490            match scheduled_on {
491                Some(on) => {
492                    queue.notification_queue.schedule(job, on).await?;
493                }
494                None => {
495                    queue.notification_queue.push(job).await?;
496                }
497            }
498
499            Ok(())
500        }
501
502        async fn produce_token_swap_request_job(
503            &self,
504            swap_request_job: TokenSwapRequest,
505            scheduled_on: Option<i64>,
506        ) -> Result<(), JobProducerError> {
507            let mut queue = self.queue.lock().await;
508            let job = Job::new(JobType::TokenSwapRequest, swap_request_job);
509
510            match scheduled_on {
511                Some(on) => {
512                    queue.token_swap_request_queue.schedule(job, on).await?;
513                }
514                None => {
515                    queue.token_swap_request_queue.push(job).await?;
516                }
517            }
518
519            Ok(())
520        }
521
522        async fn produce_relayer_health_check_job(
523            &self,
524            relayer_health_check_job: RelayerHealthCheck,
525            scheduled_on: Option<i64>,
526        ) -> Result<(), JobProducerError> {
527            let mut queue = self.queue.lock().await;
528            let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job);
529
530            match scheduled_on {
531                Some(scheduled_on) => {
532                    queue
533                        .relayer_health_check_queue
534                        .schedule(job, scheduled_on)
535                        .await?;
536                }
537                None => {
538                    queue.relayer_health_check_queue.push(job).await?;
539                }
540            }
541
542            Ok(())
543        }
544    }
545
546    #[tokio::test]
547    async fn test_job_producer_operations() {
548        let producer = TestJobProducer::new();
549
550        // Test transaction request job
551        let request = TransactionRequest::new("tx123", "relayer-1");
552        let result = producer
553            .produce_transaction_request_job(request, None)
554            .await;
555        assert!(result.is_ok());
556
557        let queue = producer.get_queue().await;
558        assert!(queue.transaction_request_queue.push_called);
559
560        // Test scheduled job
561        let producer = TestJobProducer::new();
562        let request = TransactionRequest::new("tx123", "relayer-1");
563        let scheduled_timestamp = calculate_scheduled_timestamp(10); // Schedule for 10 seconds from now
564        let result = producer
565            .produce_transaction_request_job(request, Some(scheduled_timestamp))
566            .await;
567        assert!(result.is_ok());
568
569        let queue = producer.get_queue().await;
570        assert!(queue.transaction_request_queue.schedule_called);
571    }
572
573    #[tokio::test]
574    async fn test_submit_transaction_job() {
575        let producer = TestJobProducer::new();
576
577        // Test submit transaction job
578        let submit_job = TransactionSend::submit("tx123", "relayer-1");
579        let result = producer
580            .produce_submit_transaction_job(submit_job, None)
581            .await;
582        assert!(result.is_ok());
583
584        let queue = producer.get_queue().await;
585        assert!(queue.transaction_submission_queue.push_called);
586    }
587
588    #[tokio::test]
589    async fn test_check_status_job() {
590        use crate::models::NetworkType;
591        let producer = TestJobProducer::new();
592
593        // Test status check job for EVM
594        let status_job = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
595        let result = producer
596            .produce_check_transaction_status_job(status_job, None)
597            .await;
598        assert!(result.is_ok());
599
600        let queue = producer.get_queue().await;
601        assert!(queue.transaction_status_queue_evm.push_called);
602    }
603
604    #[tokio::test]
605    async fn test_notification_job() {
606        let producer = TestJobProducer::new();
607
608        // Create a simple notification for testing
609        let notification = WebhookNotification::new(
610            "test_event".to_string(),
611            WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
612                EvmTransactionResponse {
613                    id: "tx123".to_string(),
614                    hash: Some("0x123".to_string()),
615                    status: TransactionStatus::Confirmed,
616                    status_reason: None,
617                    created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
618                    sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
619                    confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
620                    gas_price: Some(1000000000),
621                    gas_limit: Some(21000),
622                    nonce: Some(1),
623                    value: U256::from(1000000000000000000_u64),
624                    from: "0xabc".to_string(),
625                    to: Some("0xdef".to_string()),
626                    relayer_id: "relayer-1".to_string(),
627                    data: None,
628                    max_fee_per_gas: None,
629                    max_priority_fee_per_gas: None,
630                    signature: None,
631                    speed: None,
632                },
633            ))),
634        );
635        let job = NotificationSend::new("notification-1".to_string(), notification);
636
637        let result = producer.produce_send_notification_job(job, None).await;
638        assert!(result.is_ok());
639
640        let queue = producer.get_queue().await;
641        assert!(queue.notification_queue.push_called);
642    }
643
644    #[tokio::test]
645    async fn test_relayer_health_check_job() {
646        let producer = TestJobProducer::new();
647
648        // Test immediate health check job
649        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
650        let result = producer
651            .produce_relayer_health_check_job(health_check, None)
652            .await;
653        assert!(result.is_ok());
654
655        let queue = producer.get_queue().await;
656        assert!(queue.relayer_health_check_queue.push_called);
657
658        // Test scheduled health check job
659        let producer = TestJobProducer::new();
660        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
661        let scheduled_timestamp = calculate_scheduled_timestamp(60);
662        let result = producer
663            .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
664            .await;
665        assert!(result.is_ok());
666
667        let queue = producer.get_queue().await;
668        assert!(queue.relayer_health_check_queue.schedule_called);
669    }
670
/// Converting into `RelayerError` keeps the producer error's display text.
#[test]
fn test_job_producer_error_conversion() {
    // Test error conversion preserves original error message
    let converted = RelayerError::from(JobProducerError::QueueError("Test error".to_string()));

    if let RelayerError::QueueError(msg) = converted {
        assert_eq!(msg, "Queue error: Test error");
    } else {
        panic!("Unexpected error type");
    }
}
684
685    #[tokio::test]
686    async fn test_get_queue() {
687        let producer = TestJobProducer::new();
688
689        // Get the queue
690        let queue = producer.get_queue().await;
691
692        // Verify the queue is valid and has the expected structure
693        assert!(!queue.transaction_request_queue.push_called);
694        assert!(!queue.transaction_request_queue.schedule_called);
695        assert!(!queue.transaction_submission_queue.push_called);
696        assert!(!queue.notification_queue.push_called);
697        assert!(!queue.token_swap_request_queue.push_called);
698        assert!(!queue.relayer_health_check_queue.push_called);
699    }
700
701    #[tokio::test]
702    async fn test_produce_relayer_health_check_job_immediate() {
703        let producer = TestJobProducer::new();
704
705        // Test immediate health check job (no scheduling)
706        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
707        let result = producer
708            .produce_relayer_health_check_job(health_check, None)
709            .await;
710
711        // Should succeed
712        assert!(result.is_ok());
713
714        // Verify the job was pushed (not scheduled)
715        let queue = producer.get_queue().await;
716        assert!(queue.relayer_health_check_queue.push_called);
717        assert!(!queue.relayer_health_check_queue.schedule_called);
718
719        // Other queues should not be affected
720        assert!(!queue.transaction_request_queue.push_called);
721        assert!(!queue.transaction_submission_queue.push_called);
722        assert!(!queue.transaction_status_queue.push_called);
723        assert!(!queue.notification_queue.push_called);
724        assert!(!queue.token_swap_request_queue.push_called);
725    }
726
727    #[tokio::test]
728    async fn test_produce_relayer_health_check_job_scheduled() {
729        let producer = TestJobProducer::new();
730
731        // Test scheduled health check job
732        let health_check = RelayerHealthCheck::new("relayer-2".to_string());
733        let scheduled_timestamp = calculate_scheduled_timestamp(300); // 5 minutes from now
734        let result = producer
735            .produce_relayer_health_check_job(health_check, Some(scheduled_timestamp))
736            .await;
737
738        // Should succeed
739        assert!(result.is_ok());
740
741        // Verify the job was scheduled (not pushed)
742        let queue = producer.get_queue().await;
743        assert!(queue.relayer_health_check_queue.schedule_called);
744        assert!(!queue.relayer_health_check_queue.push_called);
745
746        // Other queues should not be affected
747        assert!(!queue.transaction_request_queue.push_called);
748        assert!(!queue.transaction_submission_queue.push_called);
749        assert!(!queue.transaction_status_queue.push_called);
750        assert!(!queue.notification_queue.push_called);
751        assert!(!queue.token_swap_request_queue.push_called);
752    }
753
754    #[tokio::test]
755    async fn test_produce_relayer_health_check_job_multiple_relayers() {
756        let producer = TestJobProducer::new();
757
758        // Produce health check jobs for multiple relayers
759        let relayer_ids = vec!["relayer-1", "relayer-2", "relayer-3"];
760
761        for relayer_id in &relayer_ids {
762            let health_check = RelayerHealthCheck::new(relayer_id.to_string());
763            let result = producer
764                .produce_relayer_health_check_job(health_check, None)
765                .await;
766            assert!(result.is_ok());
767        }
768
769        // Verify jobs were produced
770        let queue = producer.get_queue().await;
771        assert!(queue.relayer_health_check_queue.push_called);
772    }
773
774    #[tokio::test]
775    async fn test_status_check_routes_to_evm_queue() {
776        use crate::models::NetworkType;
777        let producer = TestJobProducer::new();
778
779        let status_job = TransactionStatusCheck::new("tx-evm", "relayer-1", NetworkType::Evm);
780        let result = producer
781            .produce_check_transaction_status_job(status_job, None)
782            .await;
783
784        assert!(result.is_ok());
785        let queue = producer.get_queue().await;
786        assert!(queue.transaction_status_queue_evm.push_called);
787        assert!(!queue.transaction_status_queue_stellar.push_called);
788        assert!(!queue.transaction_status_queue.push_called);
789    }
790
791    #[tokio::test]
792    async fn test_status_check_routes_to_stellar_queue() {
793        use crate::models::NetworkType;
794        let producer = TestJobProducer::new();
795
796        let status_job =
797            TransactionStatusCheck::new("tx-stellar", "relayer-2", NetworkType::Stellar);
798        let result = producer
799            .produce_check_transaction_status_job(status_job, None)
800            .await;
801
802        assert!(result.is_ok());
803        let queue = producer.get_queue().await;
804        assert!(queue.transaction_status_queue_stellar.push_called);
805        assert!(!queue.transaction_status_queue_evm.push_called);
806        assert!(!queue.transaction_status_queue.push_called);
807    }
808
809    #[tokio::test]
810    async fn test_status_check_routes_to_default_queue_for_solana() {
811        use crate::models::NetworkType;
812        let producer = TestJobProducer::new();
813
814        let status_job = TransactionStatusCheck::new("tx-solana", "relayer-3", NetworkType::Solana);
815        let result = producer
816            .produce_check_transaction_status_job(status_job, None)
817            .await;
818
819        assert!(result.is_ok());
820        let queue = producer.get_queue().await;
821        assert!(queue.transaction_status_queue.push_called);
822        assert!(!queue.transaction_status_queue_evm.push_called);
823        assert!(!queue.transaction_status_queue_stellar.push_called);
824    }
825
826    #[tokio::test]
827    async fn test_status_check_scheduled_evm() {
828        use crate::models::NetworkType;
829        let producer = TestJobProducer::new();
830
831        let status_job =
832            TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
833        let scheduled_timestamp = calculate_scheduled_timestamp(30);
834        let result = producer
835            .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp))
836            .await;
837
838        assert!(result.is_ok());
839        let queue = producer.get_queue().await;
840        assert!(queue.transaction_status_queue_evm.schedule_called);
841        assert!(!queue.transaction_status_queue_evm.push_called);
842    }
843
844    #[tokio::test]
845    async fn test_submit_transaction_scheduled() {
846        let producer = TestJobProducer::new();
847
848        let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1");
849        let scheduled_timestamp = calculate_scheduled_timestamp(15);
850        let result = producer
851            .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp))
852            .await;
853
854        assert!(result.is_ok());
855        let queue = producer.get_queue().await;
856        assert!(queue.transaction_submission_queue.schedule_called);
857        assert!(!queue.transaction_submission_queue.push_called);
858    }
859
860    #[tokio::test]
861    async fn test_notification_job_scheduled() {
862        let producer = TestJobProducer::new();
863
864        let notification = WebhookNotification::new(
865            "test_scheduled_event".to_string(),
866            WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
867                EvmTransactionResponse {
868                    id: "tx-notify-scheduled".to_string(),
869                    hash: Some("0xabc123".to_string()),
870                    status: TransactionStatus::Confirmed,
871                    status_reason: None,
872                    created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
873                    sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
874                    confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
875                    gas_price: Some(1000000000),
876                    gas_limit: Some(21000),
877                    nonce: Some(1),
878                    value: U256::from(1000000000000000000_u64),
879                    from: "0xabc".to_string(),
880                    to: Some("0xdef".to_string()),
881                    relayer_id: "relayer-1".to_string(),
882                    data: None,
883                    max_fee_per_gas: None,
884                    max_priority_fee_per_gas: None,
885                    signature: None,
886                    speed: None,
887                },
888            ))),
889        );
890        let job = NotificationSend::new("notification-scheduled".to_string(), notification);
891
892        let scheduled_timestamp = calculate_scheduled_timestamp(5);
893        let result = producer
894            .produce_send_notification_job(job, Some(scheduled_timestamp))
895            .await;
896
897        assert!(result.is_ok());
898        let queue = producer.get_queue().await;
899        assert!(queue.notification_queue.schedule_called);
900        assert!(!queue.notification_queue.push_called);
901    }
902
903    #[tokio::test]
904    async fn test_solana_swap_job_immediate() {
905        let producer = TestJobProducer::new();
906
907        let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
908        let result = producer
909            .produce_token_swap_request_job(swap_job, None)
910            .await;
911
912        assert!(result.is_ok());
913        let queue = producer.get_queue().await;
914        assert!(queue.token_swap_request_queue.push_called);
915        assert!(!queue.token_swap_request_queue.schedule_called);
916    }
917
918    #[tokio::test]
919    async fn test_token_swap_job_scheduled() {
920        let producer = TestJobProducer::new();
921
922        let swap_job = TokenSwapRequest::new("relayer-solana".to_string());
923        let scheduled_timestamp = calculate_scheduled_timestamp(20);
924        let result = producer
925            .produce_token_swap_request_job(swap_job, Some(scheduled_timestamp))
926            .await;
927
928        assert!(result.is_ok());
929        let queue = producer.get_queue().await;
930        assert!(queue.token_swap_request_queue.schedule_called);
931        assert!(!queue.token_swap_request_queue.push_called);
932    }
933
934    #[tokio::test]
935    async fn test_transaction_send_cancel_job() {
936        let producer = TestJobProducer::new();
937
938        let cancel_job = TransactionSend::cancel("tx-cancel", "relayer-1", "user requested");
939        let result = producer
940            .produce_submit_transaction_job(cancel_job, None)
941            .await;
942
943        assert!(result.is_ok());
944        let queue = producer.get_queue().await;
945        assert!(queue.transaction_submission_queue.push_called);
946    }
947
948    #[tokio::test]
949    async fn test_transaction_send_resubmit_job() {
950        let producer = TestJobProducer::new();
951
952        let resubmit_job = TransactionSend::resubmit("tx-resubmit", "relayer-1");
953        let result = producer
954            .produce_submit_transaction_job(resubmit_job, None)
955            .await;
956
957        assert!(result.is_ok());
958        let queue = producer.get_queue().await;
959        assert!(queue.transaction_submission_queue.push_called);
960    }
961
962    #[tokio::test]
963    async fn test_transaction_send_resend_job() {
964        let producer = TestJobProducer::new();
965
966        let resend_job = TransactionSend::resend("tx-resend", "relayer-1");
967        let result = producer
968            .produce_submit_transaction_job(resend_job, None)
969            .await;
970
971        assert!(result.is_ok());
972        let queue = producer.get_queue().await;
973        assert!(queue.transaction_submission_queue.push_called);
974    }
975
976    #[tokio::test]
977    async fn test_multiple_jobs_different_queues() {
978        let producer = TestJobProducer::new();
979
980        // Produce different types of jobs
981        let request = TransactionRequest::new("tx1", "relayer-1");
982        producer
983            .produce_transaction_request_job(request, None)
984            .await
985            .unwrap();
986
987        let submit = TransactionSend::submit("tx2", "relayer-1");
988        producer
989            .produce_submit_transaction_job(submit, None)
990            .await
991            .unwrap();
992
993        use crate::models::NetworkType;
994        let status = TransactionStatusCheck::new("tx3", "relayer-1", NetworkType::Evm);
995        producer
996            .produce_check_transaction_status_job(status, None)
997            .await
998            .unwrap();
999
1000        // Verify all queues were used
1001        let queue = producer.get_queue().await;
1002        assert!(queue.transaction_request_queue.push_called);
1003        assert!(queue.transaction_submission_queue.push_called);
1004        assert!(queue.transaction_status_queue_evm.push_called);
1005    }
1006
1007    #[test]
1008    fn test_job_producer_clone() {
1009        let producer = TestJobProducer::new();
1010        let cloned_producer = producer.clone();
1011
1012        // Both should be valid instances
1013        // The clone creates a new Mutex with a cloned Queue
1014        assert!(std::ptr::addr_of!(producer) != std::ptr::addr_of!(cloned_producer));
1015    }
1016
1017    #[tokio::test]
1018    async fn test_transaction_request_with_metadata() {
1019        let producer = TestJobProducer::new();
1020
1021        let mut metadata = std::collections::HashMap::new();
1022        metadata.insert("retry_count".to_string(), "3".to_string());
1023
1024        let request = TransactionRequest::new("tx-meta", "relayer-1").with_metadata(metadata);
1025
1026        let result = producer
1027            .produce_transaction_request_job(request, None)
1028            .await;
1029
1030        assert!(result.is_ok());
1031        let queue = producer.get_queue().await;
1032        assert!(queue.transaction_request_queue.push_called);
1033    }
1034
1035    #[tokio::test]
1036    async fn test_status_check_with_metadata() {
1037        use crate::models::NetworkType;
1038        let producer = TestJobProducer::new();
1039
1040        let mut metadata = std::collections::HashMap::new();
1041        metadata.insert("attempt".to_string(), "2".to_string());
1042
1043        let status =
1044            TransactionStatusCheck::new("tx-status-meta", "relayer-1", NetworkType::Stellar)
1045                .with_metadata(metadata);
1046
1047        let result = producer
1048            .produce_check_transaction_status_job(status, None)
1049            .await;
1050
1051        assert!(result.is_ok());
1052        let queue = producer.get_queue().await;
1053        assert!(queue.transaction_status_queue_stellar.push_called);
1054    }
1055
1056    #[tokio::test]
1057    async fn test_scheduled_jobs_with_different_delays() {
1058        let producer = TestJobProducer::new();
1059
1060        // Test with various scheduling delays
1061        let delays = [1, 10, 60, 300, 3600]; // 1s, 10s, 1m, 5m, 1h
1062
1063        for (idx, delay) in delays.iter().enumerate() {
1064            let request = TransactionRequest::new(format!("tx-delay-{idx}"), "relayer-1");
1065            let timestamp = calculate_scheduled_timestamp(*delay);
1066
1067            let result = producer
1068                .produce_transaction_request_job(request, Some(timestamp))
1069                .await;
1070
1071            assert!(result.is_ok(), "Failed to schedule job with delay {delay}");
1072        }
1073    }
1074
1075    #[test]
1076    fn test_job_producer_error_display() {
1077        let error = JobProducerError::QueueError("Test queue error".to_string());
1078        let error_string = error.to_string();
1079
1080        assert!(error_string.contains("Queue error"));
1081        assert!(error_string.contains("Test queue error"));
1082    }
1083
1084    #[test]
1085    fn test_job_producer_error_to_relayer_error() {
1086        // Test error conversion preserves original error message
1087        let job_error = JobProducerError::QueueError("Connection failed".to_string());
1088        let relayer_error: RelayerError = job_error.into();
1089
1090        match relayer_error {
1091            RelayerError::QueueError(msg) => {
1092                assert_eq!(msg, "Queue error: Connection failed");
1093            }
1094            _ => panic!("Expected QueueError variant"),
1095        }
1096    }
1097}