// openzeppelin_relayer/queues/worker_types.rs
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;

use crate::queues::QueueType;

8#[derive(Debug)]
10pub enum WorkerHandle {
11 Apalis(Box<dyn std::any::Any + Send>),
12 Tokio(tokio::task::JoinHandle<()>),
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct QueueHealth {
18 pub queue_type: QueueType,
19 pub messages_visible: u64,
20 pub messages_in_flight: u64,
21 pub messages_dlq: u64,
22 pub backend: String,
23 pub is_healthy: bool,
24}
25
/// Per-invocation metadata for a worker task.
///
/// Implements `PartialEq`/`Eq` so contexts can be compared directly, for
/// example with `assert_eq!` in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkerContext {
    /// Attempt counter for this task.
    // NOTE(review): whether the first attempt is 0 or 1 is decided by the
    // caller — confirm before relying on it.
    pub attempt: usize,
    /// Identifier of the task being processed.
    pub task_id: String,
}

impl WorkerContext {
    /// Builds a context from an attempt counter and a task identifier.
    ///
    /// Both values are stored unchanged; no validation is performed.
    pub fn new(attempt: usize, task_id: String) -> Self {
        Self { attempt, task_id }
    }
}

/// Error returned by a job handler, classified by what should happen next.
///
/// Implements `Clone`/`PartialEq`/`Eq` so errors can be duplicated and
/// compared directly (e.g. `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum HandlerError {
    /// Failure carrying a human-readable message; the `From` impl in this
    /// file converts it to `apalis::prelude::Error::Failed`.
    Retry(String),
    /// Failure carrying a human-readable message; the `From` impl in this
    /// file converts it to `apalis::prelude::Error::Abort`.
    Abort(String),
}

impl fmt::Display for HandlerError {
    /// Writes `"Retry: <msg>"` or `"Abort: <msg>"` depending on the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Retry(msg) => write!(f, "Retry: {msg}"),
            Self::Abort(msg) => write!(f, "Abort: {msg}"),
        }
    }
}

// No `source()` override: the variants carry only a message, not an inner error.
impl std::error::Error for HandlerError {}

57impl From<HandlerError> for apalis::prelude::Error {
58 fn from(err: HandlerError) -> Self {
59 match err {
60 HandlerError::Retry(msg) => apalis::prelude::Error::Failed(Arc::new(msg.into())),
61 HandlerError::Abort(msg) => apalis::prelude::Error::Abort(Arc::new(msg.into())),
62 }
63 }
64}
65
#[cfg(test)]
mod tests {
    use super::*;

    /// `WorkerContext::new` stores both arguments unchanged.
    #[test]
    fn test_worker_context_new() {
        let ctx = WorkerContext::new(3, "task-abc".to_string());
        assert_eq!(ctx.attempt, 3);
        assert_eq!(ctx.task_id, "task-abc");
    }

    /// `Display` for `Retry` prefixes the message with `"Retry: "`.
    #[test]
    fn test_handler_error_retry_display() {
        let err = HandlerError::Retry("connection timeout".to_string());
        assert_eq!(err.to_string(), "Retry: connection timeout");
    }

    /// `Display` for `Abort` prefixes the message with `"Abort: "`.
    #[test]
    fn test_handler_error_abort_display() {
        let err = HandlerError::Abort("invalid payload".to_string());
        assert_eq!(err.to_string(), "Abort: invalid payload");
    }

    /// `Retry` converts to the Apalis `Failed` variant.
    #[test]
    fn test_handler_error_retry_into_apalis_failed() {
        let err = HandlerError::Retry("temp failure".to_string());
        let apalis_err: apalis::prelude::Error = err.into();
        assert!(
            matches!(apalis_err, apalis::prelude::Error::Failed(_)),
            "Retry should map to Failed"
        );
    }

    /// `Abort` converts to the Apalis `Abort` variant.
    #[test]
    fn test_handler_error_abort_into_apalis_abort() {
        let err = HandlerError::Abort("permanent failure".to_string());
        let apalis_err: apalis::prelude::Error = err.into();
        assert!(
            matches!(apalis_err, apalis::prelude::Error::Abort(_)),
            "Abort should map to Abort"
        );
    }
}