openzeppelin_relayer/queues/
queue_type.rs1use std::fmt;
2
3use serde::{Deserialize, Serialize};
4
5use crate::constants::{
6 DEFAULT_CONCURRENCY_STATUS_CHECKER, DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
7 WORKER_NOTIFICATION_SENDER_RETRIES, WORKER_RELAYER_HEALTH_CHECK_RETRIES,
8 WORKER_TOKEN_SWAP_REQUEST_RETRIES, WORKER_TRANSACTION_REQUEST_RETRIES,
9 WORKER_TRANSACTION_STATUS_CHECKER_RETRIES, WORKER_TRANSACTION_SUBMIT_RETRIES,
10};
11
/// Identifies one of the relayer's job queues.
///
/// The inherent methods on this type map every variant to its fixed
/// settings: queue name, Redis namespace, retry limit, visibility timeout,
/// concurrency key and default, and polling interval.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum QueueType {
    /// Queue named `transaction-request`.
    TransactionRequest,
    /// Queue named `transaction-submission`.
    TransactionSubmission,
    /// Queue named `status-check`.
    StatusCheck,
    /// Queue named `status-check-evm`.
    StatusCheckEvm,
    /// Queue named `status-check-stellar`.
    StatusCheckStellar,
    /// Queue named `notification`.
    Notification,
    /// Queue named `token-swap-request`.
    TokenSwapRequest,
    /// Queue named `relayer-health-check`.
    RelayerHealthCheck,
}
24
25impl QueueType {
26 pub fn queue_name(&self) -> &'static str {
28 match self {
29 Self::TransactionRequest => "transaction-request",
30 Self::TransactionSubmission => "transaction-submission",
31 Self::StatusCheck => "status-check",
32 Self::StatusCheckEvm => "status-check-evm",
33 Self::StatusCheckStellar => "status-check-stellar",
34 Self::Notification => "notification",
35 Self::TokenSwapRequest => "token-swap-request",
36 Self::RelayerHealthCheck => "relayer-health-check",
37 }
38 }
39
40 pub fn redis_namespace(&self) -> &'static str {
42 match self {
43 Self::TransactionRequest => "relayer:transaction_request",
44 Self::TransactionSubmission => "relayer:transaction_submission",
45 Self::StatusCheck => "relayer:transaction_status",
46 Self::StatusCheckEvm => "relayer:transaction_status_evm",
47 Self::StatusCheckStellar => "relayer:transaction_status_stellar",
48 Self::Notification => "relayer:notification",
49 Self::TokenSwapRequest => "relayer:token_swap_request",
50 Self::RelayerHealthCheck => "relayer:relayer_health_check",
51 }
52 }
53
54 pub fn max_retries(&self) -> usize {
56 match self {
57 Self::TransactionRequest => WORKER_TRANSACTION_REQUEST_RETRIES,
58 Self::TransactionSubmission => WORKER_TRANSACTION_SUBMIT_RETRIES,
59 Self::StatusCheck | Self::StatusCheckEvm | Self::StatusCheckStellar => {
60 WORKER_TRANSACTION_STATUS_CHECKER_RETRIES
61 }
62 Self::Notification => WORKER_NOTIFICATION_SENDER_RETRIES,
63 Self::TokenSwapRequest => WORKER_TOKEN_SWAP_REQUEST_RETRIES,
64 Self::RelayerHealthCheck => WORKER_RELAYER_HEALTH_CHECK_RETRIES,
65 }
66 }
67
68 pub fn visibility_timeout_secs(&self) -> u32 {
71 match self {
72 Self::TransactionRequest => 30,
73 Self::TransactionSubmission => 30,
74 Self::StatusCheck | Self::StatusCheckEvm => 30,
75 Self::StatusCheckStellar => 20,
76 Self::Notification => 60,
77 Self::TokenSwapRequest => 60,
78 Self::RelayerHealthCheck => 60,
79 }
80 }
81
82 pub fn concurrency_env_key(&self) -> &'static str {
84 match self {
85 Self::TransactionRequest => "transaction_request",
86 Self::TransactionSubmission => "transaction_sender",
87 Self::StatusCheck => "transaction_status_checker",
88 Self::StatusCheckEvm => "transaction_status_checker_evm",
89 Self::StatusCheckStellar => "transaction_status_checker_stellar",
90 Self::Notification => "notification_sender",
91 Self::TokenSwapRequest => "token_swap_request",
92 Self::RelayerHealthCheck => "relayer_health_check",
93 }
94 }
95
96 pub fn default_concurrency(&self) -> usize {
98 match self {
99 Self::TransactionRequest => 50,
100 Self::TransactionSubmission => 75,
101 Self::StatusCheck => DEFAULT_CONCURRENCY_STATUS_CHECKER,
102 Self::StatusCheckEvm => DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM,
103 Self::StatusCheckStellar => DEFAULT_CONCURRENCY_STATUS_CHECKER,
104 Self::Notification => 30,
105 Self::TokenSwapRequest => 10,
106 Self::RelayerHealthCheck => 10,
107 }
108 }
109
110 pub fn polling_interval_secs(&self) -> u64 {
113 match self {
114 Self::TransactionRequest => 15,
115 Self::TransactionSubmission => 15,
116 Self::StatusCheck | Self::StatusCheckEvm => 5,
117 Self::StatusCheckStellar => 3,
118 Self::Notification => 20,
119 Self::TokenSwapRequest => 20,
120 Self::RelayerHealthCheck => 20,
121 }
122 }
123
124 pub fn is_status_check(&self) -> bool {
126 matches!(
127 self,
128 Self::StatusCheck | Self::StatusCheckEvm | Self::StatusCheckStellar
129 )
130 }
131}
132
133impl fmt::Display for QueueType {
134 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135 write!(f, "{}", self.queue_name())
136 }
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    /// Every `QueueType` variant, shared by the checks that must hold for all queues.
    const ALL_QUEUE_TYPES: [QueueType; 8] = [
        QueueType::TransactionRequest,
        QueueType::TransactionSubmission,
        QueueType::StatusCheck,
        QueueType::StatusCheckEvm,
        QueueType::StatusCheckStellar,
        QueueType::Notification,
        QueueType::TokenSwapRequest,
        QueueType::RelayerHealthCheck,
    ];

    #[test]
    fn test_polling_interval_defaults() {
        let expected: [(QueueType, u64); 4] = [
            (QueueType::TransactionRequest, 15),
            (QueueType::TransactionSubmission, 15),
            (QueueType::StatusCheck, 5),
            (QueueType::StatusCheckStellar, 3),
        ];
        for &(queue, secs) in expected.iter() {
            assert_eq!(queue.polling_interval_secs(), secs);
        }
    }

    #[test]
    fn test_is_status_check() {
        let status_queues = [
            QueueType::StatusCheck,
            QueueType::StatusCheckEvm,
            QueueType::StatusCheckStellar,
        ];
        for queue in status_queues.iter() {
            assert!(queue.is_status_check());
        }

        let other_queues = [QueueType::TransactionRequest, QueueType::Notification];
        for queue in other_queues.iter() {
            assert!(!queue.is_status_check());
        }
    }

    #[test]
    fn test_status_check_evm_concurrency() {
        let actual = QueueType::StatusCheckEvm.default_concurrency();
        assert_eq!(actual, DEFAULT_CONCURRENCY_STATUS_CHECKER_EVM);
    }

    #[test]
    fn test_all_variants_have_nonempty_queue_name() {
        for qt in ALL_QUEUE_TYPES.iter() {
            assert!(!qt.queue_name().is_empty(), "{qt:?} has empty queue_name");
            assert!(
                !qt.redis_namespace().is_empty(),
                "{qt:?} has empty redis_namespace"
            );
            assert!(
                !qt.concurrency_env_key().is_empty(),
                "{qt:?} has empty concurrency_env_key"
            );
        }
    }

    #[test]
    fn test_display_matches_queue_name() {
        for qt in ALL_QUEUE_TYPES.iter() {
            let rendered = qt.to_string();
            assert_eq!(rendered, qt.queue_name());
        }
    }

    #[test]
    fn test_max_retries_status_checkers_use_infinite() {
        // All three status-check queues are expected to report `usize::MAX`.
        let status_queues = [
            QueueType::StatusCheck,
            QueueType::StatusCheckEvm,
            QueueType::StatusCheckStellar,
        ];
        for queue in status_queues.iter() {
            assert_eq!(queue.max_retries(), usize::MAX);
        }
    }

    #[test]
    fn test_max_retries_bounded_queues() {
        // Every non-status-check queue must have a finite retry limit.
        let bounded_queues = [
            QueueType::TransactionRequest,
            QueueType::TransactionSubmission,
            QueueType::Notification,
            QueueType::TokenSwapRequest,
            QueueType::RelayerHealthCheck,
        ];
        for queue in bounded_queues.iter() {
            assert!(queue.max_retries() < usize::MAX);
        }
    }

    #[test]
    fn test_polling_intervals_within_sqs_limit() {
        for qt in ALL_QUEUE_TYPES.iter() {
            let interval = qt.polling_interval_secs();
            assert!(
                interval <= 20,
                "{qt:?} polling interval {} exceeds SQS max of 20s",
                interval
            );
        }
    }

    #[test]
    fn test_visibility_timeout_within_sqs_range() {
        for qt in ALL_QUEUE_TYPES.iter() {
            let vt = qt.visibility_timeout_secs();
            assert!(
                vt <= 43200,
                "{qt:?} visibility timeout {vt} exceeds SQS max"
            );
            assert!(vt > 0, "{qt:?} visibility timeout should be positive");
        }
    }

    #[test]
    fn test_default_concurrency_positive() {
        for qt in ALL_QUEUE_TYPES.iter() {
            assert!(qt.default_concurrency() > 0, "{qt:?} has zero concurrency");
        }
    }
}