openzeppelin_relayer/queues/
queue_type.rs

1use std::fmt;
2
3use serde::{Deserialize, Serialize};
4
5use crate::constants::{
6    DEFAULT_CONCURRENCY_STATUS_CHECKER, DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
7    WORKER_NOTIFICATION_SENDER_RETRIES, WORKER_RELAYER_HEALTH_CHECK_RETRIES,
8    WORKER_TOKEN_SWAP_REQUEST_RETRIES, WORKER_TRANSACTION_REQUEST_RETRIES,
9    WORKER_TRANSACTION_STATUS_CHECKER_RETRIES, WORKER_TRANSACTION_SUBMIT_RETRIES,
10};
11
12/// Queue types for relayer operations.
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
14pub enum QueueType {
15    TransactionRequest,
16    TransactionSubmission,
17    StatusCheck,
18    StatusCheckEvm,
19    StatusCheckStellar,
20    Notification,
21    TokenSwapRequest,
22    RelayerHealthCheck,
23}
24
25impl QueueType {
26    /// Returns the queue name for logging and identification purposes.
27    pub fn queue_name(&self) -> &'static str {
28        match self {
29            Self::TransactionRequest => "transaction-request",
30            Self::TransactionSubmission => "transaction-submission",
31            Self::StatusCheck => "status-check",
32            Self::StatusCheckEvm => "status-check-evm",
33            Self::StatusCheckStellar => "status-check-stellar",
34            Self::Notification => "notification",
35            Self::TokenSwapRequest => "token-swap-request",
36            Self::RelayerHealthCheck => "relayer-health-check",
37        }
38    }
39
40    /// Returns the Redis namespace for this queue type (Apalis format).
41    pub fn redis_namespace(&self) -> &'static str {
42        match self {
43            Self::TransactionRequest => "relayer:transaction_request",
44            Self::TransactionSubmission => "relayer:transaction_submission",
45            Self::StatusCheck => "relayer:transaction_status",
46            Self::StatusCheckEvm => "relayer:transaction_status_evm",
47            Self::StatusCheckStellar => "relayer:transaction_status_stellar",
48            Self::Notification => "relayer:notification",
49            Self::TokenSwapRequest => "relayer:token_swap_request",
50            Self::RelayerHealthCheck => "relayer:relayer_health_check",
51        }
52    }
53
54    /// Returns the maximum number of retries for this queue type.
55    pub fn max_retries(&self) -> usize {
56        match self {
57            Self::TransactionRequest => WORKER_TRANSACTION_REQUEST_RETRIES,
58            Self::TransactionSubmission => WORKER_TRANSACTION_SUBMIT_RETRIES,
59            Self::StatusCheck | Self::StatusCheckEvm | Self::StatusCheckStellar => {
60                WORKER_TRANSACTION_STATUS_CHECKER_RETRIES
61            }
62            Self::Notification => WORKER_NOTIFICATION_SENDER_RETRIES,
63            Self::TokenSwapRequest => WORKER_TOKEN_SWAP_REQUEST_RETRIES,
64            Self::RelayerHealthCheck => WORKER_RELAYER_HEALTH_CHECK_RETRIES,
65        }
66    }
67
68    /// Returns the visibility timeout in seconds for SQS (how long a worker has to finish
69    /// processing before the message becomes visible again).
70    pub fn visibility_timeout_secs(&self) -> u32 {
71        match self {
72            Self::TransactionRequest => 30,
73            Self::TransactionSubmission => 30,
74            Self::StatusCheck | Self::StatusCheckEvm => 30,
75            Self::StatusCheckStellar => 20,
76            Self::Notification => 60,
77            Self::TokenSwapRequest => 60,
78            Self::RelayerHealthCheck => 60,
79        }
80    }
81
82    /// Returns the worker name used for the concurrency environment variable.
83    pub fn concurrency_env_key(&self) -> &'static str {
84        match self {
85            Self::TransactionRequest => "transaction_request",
86            Self::TransactionSubmission => "transaction_sender",
87            Self::StatusCheck => "transaction_status_checker",
88            Self::StatusCheckEvm => "transaction_status_checker_evm",
89            Self::StatusCheckStellar => "transaction_status_checker_stellar",
90            Self::Notification => "notification_sender",
91            Self::TokenSwapRequest => "token_swap_request",
92            Self::RelayerHealthCheck => "relayer_health_check",
93        }
94    }
95
96    /// Returns the default concurrency for this queue type.
97    pub fn default_concurrency(&self) -> usize {
98        match self {
99            Self::TransactionRequest => 50,
100            Self::TransactionSubmission => 75,
101            Self::StatusCheck => DEFAULT_CONCURRENCY_STATUS_CHECKER,
102            Self::StatusCheckEvm => DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
103            Self::StatusCheckStellar => DEFAULT_CONCURRENCY_STATUS_CHECKER,
104            Self::Notification => 30,
105            Self::TokenSwapRequest => 10,
106            Self::RelayerHealthCheck => 10,
107        }
108    }
109
110    /// Returns the SQS long-poll wait time in seconds.
111    /// Note: SQS `WaitTimeSeconds` maximum is 20 seconds.
112    pub fn polling_interval_secs(&self) -> u64 {
113        match self {
114            Self::TransactionRequest => 15,
115            Self::TransactionSubmission => 15,
116            Self::StatusCheck | Self::StatusCheckEvm => 5,
117            Self::StatusCheckStellar => 3,
118            Self::Notification => 20,
119            Self::TokenSwapRequest => 20,
120            Self::RelayerHealthCheck => 20,
121        }
122    }
123
124    /// Returns true if this is any variant of status check queue.
125    pub fn is_status_check(&self) -> bool {
126        matches!(
127            self,
128            Self::StatusCheck | Self::StatusCheckEvm | Self::StatusCheckStellar
129        )
130    }
131}
132
133impl fmt::Display for QueueType {
134    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135        write!(f, "{}", self.queue_name())
136    }
137}
138
139#[cfg(test)]
140mod tests {
141    use super::*;
142
143    #[test]
144    fn test_polling_interval_defaults() {
145        assert_eq!(QueueType::TransactionRequest.polling_interval_secs(), 15);
146        assert_eq!(QueueType::TransactionSubmission.polling_interval_secs(), 15);
147        assert_eq!(QueueType::StatusCheck.polling_interval_secs(), 5);
148        assert_eq!(QueueType::StatusCheckStellar.polling_interval_secs(), 3);
149    }
150
151    #[test]
152    fn test_is_status_check() {
153        assert!(QueueType::StatusCheck.is_status_check());
154        assert!(QueueType::StatusCheckEvm.is_status_check());
155        assert!(QueueType::StatusCheckStellar.is_status_check());
156        assert!(!QueueType::TransactionRequest.is_status_check());
157        assert!(!QueueType::Notification.is_status_check());
158    }
159
160    #[test]
161    fn test_status_check_evm_concurrency() {
162        assert_eq!(
163            QueueType::StatusCheckEvm.default_concurrency(),
164            DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM
165        );
166    }
167
168    #[test]
169    fn test_all_variants_have_nonempty_queue_name() {
170        let all = [
171            QueueType::TransactionRequest,
172            QueueType::TransactionSubmission,
173            QueueType::StatusCheck,
174            QueueType::StatusCheckEvm,
175            QueueType::StatusCheckStellar,
176            QueueType::Notification,
177            QueueType::TokenSwapRequest,
178            QueueType::RelayerHealthCheck,
179        ];
180        for qt in &all {
181            assert!(!qt.queue_name().is_empty(), "{qt:?} has empty queue_name");
182            assert!(
183                !qt.redis_namespace().is_empty(),
184                "{qt:?} has empty redis_namespace"
185            );
186            assert!(
187                !qt.concurrency_env_key().is_empty(),
188                "{qt:?} has empty concurrency_env_key"
189            );
190        }
191    }
192
193    #[test]
194    fn test_display_matches_queue_name() {
195        let all = [
196            QueueType::TransactionRequest,
197            QueueType::TransactionSubmission,
198            QueueType::StatusCheck,
199            QueueType::StatusCheckEvm,
200            QueueType::StatusCheckStellar,
201            QueueType::Notification,
202            QueueType::TokenSwapRequest,
203            QueueType::RelayerHealthCheck,
204        ];
205        for qt in &all {
206            assert_eq!(qt.to_string(), qt.queue_name());
207        }
208    }
209
210    #[test]
211    fn test_max_retries_status_checkers_use_infinite() {
212        // Status checkers retry until tx reaches final state
213        assert_eq!(QueueType::StatusCheck.max_retries(), usize::MAX);
214        assert_eq!(QueueType::StatusCheckEvm.max_retries(), usize::MAX);
215        assert_eq!(QueueType::StatusCheckStellar.max_retries(), usize::MAX);
216    }
217
218    #[test]
219    fn test_max_retries_bounded_queues() {
220        // Non-status queues should have finite retries
221        assert!(QueueType::TransactionRequest.max_retries() < usize::MAX);
222        assert!(QueueType::TransactionSubmission.max_retries() < usize::MAX);
223        assert!(QueueType::Notification.max_retries() < usize::MAX);
224        assert!(QueueType::TokenSwapRequest.max_retries() < usize::MAX);
225        assert!(QueueType::RelayerHealthCheck.max_retries() < usize::MAX);
226    }
227
228    #[test]
229    fn test_polling_intervals_within_sqs_limit() {
230        let all = [
231            QueueType::TransactionRequest,
232            QueueType::TransactionSubmission,
233            QueueType::StatusCheck,
234            QueueType::StatusCheckEvm,
235            QueueType::StatusCheckStellar,
236            QueueType::Notification,
237            QueueType::TokenSwapRequest,
238            QueueType::RelayerHealthCheck,
239        ];
240        for qt in &all {
241            assert!(
242                qt.polling_interval_secs() <= 20,
243                "{qt:?} polling interval {} exceeds SQS max of 20s",
244                qt.polling_interval_secs()
245            );
246        }
247    }
248
249    #[test]
250    fn test_visibility_timeout_within_sqs_range() {
251        // SQS allows 0..=43200 (12 hours)
252        let all = [
253            QueueType::TransactionRequest,
254            QueueType::TransactionSubmission,
255            QueueType::StatusCheck,
256            QueueType::StatusCheckEvm,
257            QueueType::StatusCheckStellar,
258            QueueType::Notification,
259            QueueType::TokenSwapRequest,
260            QueueType::RelayerHealthCheck,
261        ];
262        for qt in &all {
263            let vt = qt.visibility_timeout_secs();
264            assert!(
265                vt <= 43200,
266                "{qt:?} visibility timeout {vt} exceeds SQS max"
267            );
268            assert!(vt > 0, "{qt:?} visibility timeout should be positive");
269        }
270    }
271
272    #[test]
273    fn test_default_concurrency_positive() {
274        let all = [
275            QueueType::TransactionRequest,
276            QueueType::TransactionSubmission,
277            QueueType::StatusCheck,
278            QueueType::StatusCheckEvm,
279            QueueType::StatusCheckStellar,
280            QueueType::Notification,
281            QueueType::TokenSwapRequest,
282            QueueType::RelayerHealthCheck,
283        ];
284        for qt in &all {
285            assert!(qt.default_concurrency() > 0, "{qt:?} has zero concurrency");
286        }
287    }
288}