openzeppelin_relayer/queues/
worker_types.rs

1//! Worker types for the queue abstraction.
2use serde::{Deserialize, Serialize};
3use std::fmt;
4use std::sync::Arc;
5
6use crate::queues::QueueType;
7
8/// Handle to a running worker task.
9#[derive(Debug)]
10pub enum WorkerHandle {
11    Apalis(Box<dyn std::any::Any + Send>),
12    Tokio(tokio::task::JoinHandle<()>),
13}
14
15/// Queue health status information.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct QueueHealth {
18    pub queue_type: QueueType,
19    pub messages_visible: u64,
20    pub messages_in_flight: u64,
21    pub messages_dlq: u64,
22    pub backend: String,
23    pub is_healthy: bool,
24}
25
26/// Backend-neutral context passed to all job handlers.
27#[derive(Debug, Clone)]
28pub struct WorkerContext {
29    pub attempt: usize,
30    pub task_id: String,
31}
32
33impl WorkerContext {
34    pub fn new(attempt: usize, task_id: String) -> Self {
35        Self { attempt, task_id }
36    }
37}
38
39/// Backend-neutral handler error for retry control.
40#[derive(Debug)]
41pub enum HandlerError {
42    Retry(String),
43    Abort(String),
44}
45
46impl fmt::Display for HandlerError {
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        match self {
49            Self::Retry(msg) => write!(f, "Retry: {msg}"),
50            Self::Abort(msg) => write!(f, "Abort: {msg}"),
51        }
52    }
53}
54
55impl std::error::Error for HandlerError {}
56
57impl From<HandlerError> for apalis::prelude::Error {
58    fn from(err: HandlerError) -> Self {
59        match err {
60            HandlerError::Retry(msg) => apalis::prelude::Error::Failed(Arc::new(msg.into())),
61            HandlerError::Abort(msg) => apalis::prelude::Error::Abort(Arc::new(msg.into())),
62        }
63    }
64}
65
66#[cfg(test)]
67mod tests {
68    use super::*;
69
70    #[test]
71    fn test_worker_context_new() {
72        let ctx = WorkerContext::new(3, "task-abc".to_string());
73        assert_eq!(ctx.attempt, 3);
74        assert_eq!(ctx.task_id, "task-abc");
75    }
76
77    #[test]
78    fn test_handler_error_retry_display() {
79        let err = HandlerError::Retry("connection timeout".to_string());
80        assert_eq!(err.to_string(), "Retry: connection timeout");
81    }
82
83    #[test]
84    fn test_handler_error_abort_display() {
85        let err = HandlerError::Abort("invalid payload".to_string());
86        assert_eq!(err.to_string(), "Abort: invalid payload");
87    }
88
89    #[test]
90    fn test_handler_error_retry_into_apalis_failed() {
91        let err = HandlerError::Retry("temp failure".to_string());
92        let apalis_err: apalis::prelude::Error = err.into();
93        assert!(
94            matches!(apalis_err, apalis::prelude::Error::Failed(_)),
95            "Retry should map to Failed"
96        );
97    }
98
99    #[test]
100    fn test_handler_error_abort_into_apalis_abort() {
101        let err = HandlerError::Abort("permanent failure".to_string());
102        let apalis_err: apalis::prelude::Error = err.into();
103        assert!(
104            matches!(apalis_err, apalis::prelude::Error::Abort(_)),
105            "Abort should map to Abort"
106        );
107    }
108}