openzeppelin_relayer/jobs/
job.rs

1//! Job processing module for handling asynchronous tasks.
2//!
3//! Provides generic job structure for different types of operations:
4//! - Transaction processing
5//! - Status monitoring
6//! - Notifications
7use crate::models::{NetworkType, WebhookNotification};
8use chrono::Utc;
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use strum::Display;
12use uuid::Uuid;
13
14// Common message structure
15#[derive(Debug, Serialize, Deserialize, Clone)]
16pub struct Job<T> {
17    pub message_id: String,
18    pub version: String,
19    pub timestamp: String,
20    pub job_type: JobType,
21    pub data: T,
22    #[serde(skip_serializing_if = "Option::is_none")]
23    pub request_id: Option<String>,
24}
25
26impl<T> Job<T> {
27    pub fn new(job_type: JobType, data: T) -> Self {
28        Self {
29            message_id: Uuid::new_v4().to_string(),
30            version: "1.0".to_string(),
31            timestamp: Utc::now().timestamp().to_string(),
32            job_type,
33            data,
34            request_id: None,
35        }
36    }
37    pub fn with_request_id(mut self, id: Option<String>) -> Self {
38        self.request_id = id;
39        self
40    }
41}
42
43// Enum to represent different message types
44#[derive(Debug, Serialize, Deserialize, Display, Clone)]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum JobType {
47    TransactionRequest,
48    TransactionSend,
49    TransactionStatusCheck,
50    NotificationSend,
51    TokenSwapRequest,
52    RelayerHealthCheck,
53}
54
55// Example message data for transaction request
56#[derive(Debug, Serialize, Deserialize, Clone)]
57pub struct TransactionRequest {
58    pub transaction_id: String,
59    pub relayer_id: String,
60    /// Network type for this transaction request.
61    /// Used by SQS backend to choose the FIFO message group strategy:
62    /// EVM uses relayer_id (nonce ordering), others use transaction_id (parallelism).
63    /// Optional for backward compatibility with older queued messages.
64    #[serde(default)]
65    pub network_type: Option<NetworkType>,
66    pub metadata: Option<HashMap<String, String>>,
67}
68
69impl TransactionRequest {
70    pub fn new(transaction_id: impl Into<String>, relayer_id: impl Into<String>) -> Self {
71        Self {
72            transaction_id: transaction_id.into(),
73            relayer_id: relayer_id.into(),
74            network_type: None,
75            metadata: None,
76        }
77    }
78
79    pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
80        self.network_type = Some(network_type);
81        self
82    }
83
84    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
85        self.metadata = Some(metadata);
86        self
87    }
88}
89
90#[derive(Debug, Serialize, Deserialize, Clone)]
91pub enum TransactionCommand {
92    Submit,
93    Cancel { reason: String },
94    Resubmit,
95    Resend,
96}
97
// Message data for sending a transaction (submit, cancel, resubmit or resend)
99#[derive(Debug, Serialize, Deserialize, Clone)]
100pub struct TransactionSend {
101    pub transaction_id: String,
102    pub relayer_id: String,
103    pub command: TransactionCommand,
104    /// Network type for this transaction submission.
105    /// Used by SQS backend to choose the FIFO message group strategy:
106    /// EVM uses relayer_id (nonce ordering), others use transaction_id (parallelism).
107    /// Optional for backward compatibility with older queued messages.
108    #[serde(default)]
109    pub network_type: Option<NetworkType>,
110    pub metadata: Option<HashMap<String, String>>,
111}
112
113impl TransactionSend {
114    // Submit a transaction to the relayer
115    pub fn submit(transaction_id: impl Into<String>, relayer_id: impl Into<String>) -> Self {
116        Self {
117            transaction_id: transaction_id.into(),
118            relayer_id: relayer_id.into(),
119            command: TransactionCommand::Submit,
120            network_type: None,
121            metadata: None,
122        }
123    }
124
125    // Cancel a transaction
126    pub fn cancel(
127        transaction_id: impl Into<String>,
128        relayer_id: impl Into<String>,
129        reason: impl Into<String>,
130    ) -> Self {
131        Self {
132            transaction_id: transaction_id.into(),
133            relayer_id: relayer_id.into(),
134            command: TransactionCommand::Cancel {
135                reason: reason.into(),
136            },
137            network_type: None,
138            metadata: None,
139        }
140    }
141
142    // Resubmit a transaction
143    pub fn resubmit(transaction_id: impl Into<String>, relayer_id: impl Into<String>) -> Self {
144        Self {
145            transaction_id: transaction_id.into(),
146            relayer_id: relayer_id.into(),
147            command: TransactionCommand::Resubmit,
148            network_type: None,
149            metadata: None,
150        }
151    }
152
153    // Resend a transaction
154    pub fn resend(transaction_id: impl Into<String>, relayer_id: impl Into<String>) -> Self {
155        Self {
156            transaction_id: transaction_id.into(),
157            relayer_id: relayer_id.into(),
158            command: TransactionCommand::Resend,
159            network_type: None,
160            metadata: None,
161        }
162    }
163
164    // Set the network type for this transaction submission
165    pub fn with_network_type(mut self, network_type: NetworkType) -> Self {
166        self.network_type = Some(network_type);
167        self
168    }
169
170    // Set the metadata for this transaction submission
171    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
172        self.metadata = Some(metadata);
173        self
174    }
175}
176
// Message data for checking the status of a transaction
178#[derive(Debug, Serialize, Deserialize, Clone)]
179pub struct TransactionStatusCheck {
180    pub transaction_id: String,
181    pub relayer_id: String,
182    /// Network type for this transaction status check.
183    /// Optional for backward compatibility with older queued messages.
184    #[serde(default)]
185    pub network_type: Option<NetworkType>,
186    pub metadata: Option<HashMap<String, String>>,
187}
188
189impl TransactionStatusCheck {
190    // Create a new transaction status check
191    pub fn new(
192        transaction_id: impl Into<String>,
193        relayer_id: impl Into<String>,
194        network_type: NetworkType,
195    ) -> Self {
196        Self {
197            transaction_id: transaction_id.into(),
198            relayer_id: relayer_id.into(),
199            network_type: Some(network_type),
200            metadata: None,
201        }
202    }
203
204    // Set the metadata for this transaction status check
205    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
206        self.metadata = Some(metadata);
207        self
208    }
209}
210
211#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
212pub struct NotificationSend {
213    pub notification_id: String,
214    pub notification: WebhookNotification,
215}
216
217impl NotificationSend {
218    pub fn new(notification_id: String, notification: WebhookNotification) -> Self {
219        Self {
220            notification_id,
221            notification,
222        }
223    }
224}
225
226#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
227pub struct TokenSwapRequest {
228    pub relayer_id: String,
229}
230
231impl TokenSwapRequest {
232    pub fn new(relayer_id: String) -> Self {
233        Self { relayer_id }
234    }
235}
236
237#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
238pub struct RelayerHealthCheck {
239    pub relayer_id: String,
240    pub retry_count: u32,
241}
242
243impl RelayerHealthCheck {
244    pub fn new(relayer_id: String) -> Self {
245        Self {
246            relayer_id,
247            retry_count: 0,
248        }
249    }
250
251    pub fn with_retry_count(relayer_id: String, retry_count: u32) -> Self {
252        Self {
253            relayer_id,
254            retry_count,
255        }
256    }
257}
258
/// Unit tests for the job envelope and the payload types of this module:
/// constructors, builders, and serde round trips.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::str::FromStr;

    use crate::models::{
        evm::Speed, EvmTransactionDataSignature, EvmTransactionResponse, TransactionResponse,
        TransactionStatus, WebhookNotification, WebhookPayload, U256,
    };

    use super::*;

    #[test]
    fn test_job_creation() {
        let job_data = TransactionRequest::new("tx123", "relayer-1");
        let job = Job::new(JobType::TransactionRequest, job_data);

        assert_eq!(job.job_type.to_string(), "TransactionRequest");
        assert_eq!(job.version, "1.0");
        assert_eq!(job.data.transaction_id, "tx123");
        assert_eq!(job.data.relayer_id, "relayer-1");
        assert!(job.data.metadata.is_none());
    }

    #[test]
    fn test_transaction_request_with_metadata() {
        let mut metadata = HashMap::new();
        metadata.insert("chain_id".to_string(), "1".to_string());
        metadata.insert("gas_price".to_string(), "20000000000".to_string());

        let tx_request =
            TransactionRequest::new("tx123", "relayer-1").with_metadata(metadata.clone());

        assert_eq!(tx_request.transaction_id, "tx123");
        assert_eq!(tx_request.relayer_id, "relayer-1");
        assert!(tx_request.metadata.is_some());
        assert_eq!(tx_request.metadata.unwrap(), metadata);
    }

    #[test]
    fn test_transaction_send_methods() {
        // NOTE: every `matches!` below is wrapped in `assert!`. A bare
        // `matches!(..)` only evaluates to a bool and checks nothing.

        // Test submit
        let tx_submit = TransactionSend::submit("tx123", "relayer-1");
        assert_eq!(tx_submit.transaction_id, "tx123");
        assert_eq!(tx_submit.relayer_id, "relayer-1");
        assert!(matches!(tx_submit.command, TransactionCommand::Submit));

        // Test cancel (match through a reference so `reason` is only borrowed)
        let tx_cancel = TransactionSend::cancel("tx123", "relayer-1", "user requested");
        assert!(matches!(
            &tx_cancel.command,
            TransactionCommand::Cancel { reason } if reason == "user requested"
        ));

        // Test resubmit
        let tx_resubmit = TransactionSend::resubmit("tx123", "relayer-1");
        assert!(matches!(
            tx_resubmit.command,
            TransactionCommand::Resubmit
        ));

        // Test resend
        let tx_resend = TransactionSend::resend("tx123", "relayer-1");
        assert!(matches!(tx_resend.command, TransactionCommand::Resend));

        // Test with_metadata
        let mut metadata = HashMap::new();
        metadata.insert("nonce".to_string(), "5".to_string());

        let tx_with_metadata =
            TransactionSend::submit("tx123", "relayer-1").with_metadata(metadata.clone());

        assert!(tx_with_metadata.metadata.is_some());
        assert_eq!(tx_with_metadata.metadata.unwrap(), metadata);
    }

    #[test]
    fn test_transaction_status_check() {
        let tx_status = TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Evm);
        assert_eq!(tx_status.transaction_id, "tx123");
        assert_eq!(tx_status.relayer_id, "relayer-1");
        assert_eq!(tx_status.network_type, Some(NetworkType::Evm));
        assert!(tx_status.metadata.is_none());

        let mut metadata = HashMap::new();
        metadata.insert("retries".to_string(), "3".to_string());

        let tx_status_with_metadata =
            TransactionStatusCheck::new("tx123", "relayer-1", NetworkType::Stellar)
                .with_metadata(metadata.clone());

        assert!(tx_status_with_metadata.metadata.is_some());
        assert_eq!(tx_status_with_metadata.metadata.unwrap(), metadata);
    }

    #[test]
    fn test_transaction_status_check_backward_compatibility() {
        // Simulate an old message without network_type field
        let old_json = r#"{
            "transaction_id": "tx456",
            "relayer_id": "relayer-2",
            "metadata": null
        }"#;

        // Should deserialize successfully with network_type defaulting to None
        let deserialized: TransactionStatusCheck = serde_json::from_str(old_json).unwrap();
        assert_eq!(deserialized.transaction_id, "tx456");
        assert_eq!(deserialized.relayer_id, "relayer-2");
        assert_eq!(deserialized.network_type, None);
        assert!(deserialized.metadata.is_none());

        // New messages should include network_type
        let new_status = TransactionStatusCheck::new("tx789", "relayer-3", NetworkType::Solana);
        assert_eq!(new_status.network_type, Some(NetworkType::Solana));
    }

    #[test]
    fn test_job_serialization() {
        let tx_request = TransactionRequest::new("tx123", "relayer-1");
        let job = Job::new(JobType::TransactionRequest, tx_request);

        let serialized = serde_json::to_string(&job).unwrap();
        let deserialized: Job<TransactionRequest> = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.job_type.to_string(), "TransactionRequest");
        assert_eq!(deserialized.data.transaction_id, "tx123");
        assert_eq!(deserialized.data.relayer_id, "relayer-1");
    }

    #[test]
    fn test_notification_send_serialization() {
        let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
            EvmTransactionResponse {
                id: "tx123".to_string(),
                hash: Some("0x123".to_string()),
                status: TransactionStatus::Confirmed,
                status_reason: None,
                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
                sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
                confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
                gas_price: Some(1000000000),
                gas_limit: Some(21000),
                nonce: Some(1),
                value: U256::from_str("1000000000000000000").unwrap(),
                from: "0xabc".to_string(),
                to: Some("0xdef".to_string()),
                relayer_id: "relayer-1".to_string(),
                data: Some("0x123".to_string()),
                max_fee_per_gas: Some(1000000000),
                max_priority_fee_per_gas: Some(1000000000),
                signature: Some(EvmTransactionDataSignature {
                    r: "0x123".to_string(),
                    s: "0x123".to_string(),
                    v: 1,
                    sig: "0x123".to_string(),
                }),
                speed: Some(Speed::Fast),
            },
        )));

        let notification = WebhookNotification::new("transaction".to_string(), payload);
        let notification_send =
            NotificationSend::new("notification-test".to_string(), notification);

        let serialized = serde_json::to_string(&notification_send).unwrap();

        match serde_json::from_str::<NotificationSend>(&serialized) {
            Ok(deserialized) => {
                assert_eq!(notification_send, deserialized);
            }
            Err(e) => {
                panic!("Deserialization error: {e}");
            }
        }
    }

    #[test]
    fn test_notification_send_serialization_none_values() {
        let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
            EvmTransactionResponse {
                id: "tx123".to_string(),
                hash: None,
                status: TransactionStatus::Confirmed,
                status_reason: None,
                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
                sent_at: None,
                confirmed_at: None,
                gas_price: None,
                gas_limit: Some(21000),
                nonce: None,
                value: U256::from_str("1000000000000000000").unwrap(),
                from: "0xabc".to_string(),
                to: None,
                relayer_id: "relayer-1".to_string(),
                data: None,
                max_fee_per_gas: None,
                max_priority_fee_per_gas: None,
                signature: None,
                speed: None,
            },
        )));

        let notification = WebhookNotification::new("transaction".to_string(), payload);
        let notification_send =
            NotificationSend::new("notification-test".to_string(), notification);

        let serialized = serde_json::to_string(&notification_send).unwrap();

        match serde_json::from_str::<NotificationSend>(&serialized) {
            Ok(deserialized) => {
                assert_eq!(notification_send, deserialized);
            }
            Err(e) => {
                panic!("Deserialization error: {e}");
            }
        }
    }

    #[test]
    fn test_relayer_health_check_new() {
        let health_check = RelayerHealthCheck::new("relayer-1".to_string());

        assert_eq!(health_check.relayer_id, "relayer-1");
        assert_eq!(health_check.retry_count, 0);
    }

    #[test]
    fn test_relayer_health_check_with_retry_count() {
        let health_check = RelayerHealthCheck::with_retry_count("relayer-1".to_string(), 5);

        assert_eq!(health_check.relayer_id, "relayer-1");
        assert_eq!(health_check.retry_count, 5);
    }

    #[test]
    fn test_relayer_health_check_correct_field_values() {
        // Test with zero retry count
        let health_check_zero = RelayerHealthCheck::new("relayer-test-123".to_string());
        assert_eq!(health_check_zero.relayer_id, "relayer-test-123");
        assert_eq!(health_check_zero.retry_count, 0);

        // Test with specific retry count
        let health_check_custom =
            RelayerHealthCheck::with_retry_count("relayer-abc".to_string(), 10);
        assert_eq!(health_check_custom.relayer_id, "relayer-abc");
        assert_eq!(health_check_custom.retry_count, 10);

        // Test with large retry count
        let health_check_large =
            RelayerHealthCheck::with_retry_count("relayer-xyz".to_string(), 999);
        assert_eq!(health_check_large.relayer_id, "relayer-xyz");
        assert_eq!(health_check_large.retry_count, 999);
    }

    #[test]
    fn test_relayer_health_check_job_serialization() {
        let health_check = RelayerHealthCheck::new("relayer-1".to_string());
        let job = Job::new(JobType::RelayerHealthCheck, health_check);

        let serialized = serde_json::to_string(&job).unwrap();
        let deserialized: Job<RelayerHealthCheck> = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.job_type.to_string(), "RelayerHealthCheck");
        assert_eq!(deserialized.data.relayer_id, "relayer-1");
        assert_eq!(deserialized.data.retry_count, 0);
    }

    #[test]
    fn test_relayer_health_check_job_serialization_with_retry_count() {
        let health_check = RelayerHealthCheck::with_retry_count("relayer-2".to_string(), 3);
        let job = Job::new(JobType::RelayerHealthCheck, health_check.clone());

        let serialized = serde_json::to_string(&job).unwrap();
        let deserialized: Job<RelayerHealthCheck> = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.job_type.to_string(), "RelayerHealthCheck");
        assert_eq!(deserialized.data.relayer_id, health_check.relayer_id);
        assert_eq!(deserialized.data.retry_count, health_check.retry_count);
        assert_eq!(deserialized.data, health_check);
    }

    #[test]
    fn test_relayer_health_check_equality_after_deserialization() {
        let original_health_check =
            RelayerHealthCheck::with_retry_count("relayer-test".to_string(), 7);
        let job = Job::new(JobType::RelayerHealthCheck, original_health_check.clone());

        let serialized = serde_json::to_string(&job).unwrap();
        let deserialized: Job<RelayerHealthCheck> = serde_json::from_str(&serialized).unwrap();

        // Assert job type string
        assert_eq!(deserialized.job_type.to_string(), "RelayerHealthCheck");

        // Assert data equality
        assert_eq!(deserialized.data, original_health_check);
        assert_eq!(
            deserialized.data.relayer_id,
            original_health_check.relayer_id
        );
        assert_eq!(
            deserialized.data.retry_count,
            original_health_check.retry_count
        );
    }
}