openzeppelin_relayer/jobs/handlers/
notification_handler.rs

1//! Notification handling worker implementation.
2//!
3//! This module implements the notification handling worker that processes
4//! notification jobs from the queue.
5
6use actix_web::web::ThinData;
7use eyre::Result;
8use tracing::{debug, instrument};
9
10use crate::{
11    constants::WORKER_NOTIFICATION_SENDER_RETRIES,
12    jobs::{handle_result, Job, NotificationSend},
13    models::DefaultAppState,
14    observability::request_id::set_request_id,
15    queues::{HandlerError, WorkerContext},
16    repositories::Repository,
17    services::WebhookNotificationService,
18};
19
20/// Handles incoming notification jobs from the queue.
21///
22/// # Arguments
23/// * `job` - The notification job containing recipient and message details
24/// * `context` - Application state containing notification services
25///
26/// # Returns
27/// * `Result<(), HandlerError>` - Success or failure of notification processing
28#[instrument(
29    level = "debug",
30    skip(job, context),
31    fields(
32        request_id = ?job.request_id,
33        job_id = %job.message_id,
34        job_type = %job.job_type.to_string(),
35        attempt = %ctx.attempt,
36        task_id = %ctx.task_id,
37        notification_id = %job.data.notification_id,
38    )
39)]
40pub async fn notification_handler(
41    job: Job<NotificationSend>,
42    context: ThinData<DefaultAppState>,
43    ctx: WorkerContext,
44) -> Result<(), HandlerError> {
45    if let Some(request_id) = job.request_id.clone() {
46        set_request_id(request_id);
47    }
48
49    debug!(
50        notification_id = %job.data.notification_id,
51        "handling notification"
52    );
53
54    let result = handle_request(job.data, &context).await;
55
56    handle_result(
57        result,
58        &ctx,
59        "Notification",
60        WORKER_NOTIFICATION_SENDER_RETRIES,
61    )
62}
63
64async fn handle_request(
65    request: NotificationSend,
66    context: &ThinData<DefaultAppState>,
67) -> Result<()> {
68    debug!(
69        notification_id = %request.notification_id,
70        "sending notification"
71    );
72    let notification = context
73        .notification_repository
74        .get_by_id(request.notification_id.clone())
75        .await?;
76
77    let notification_service =
78        WebhookNotificationService::new(notification.url, notification.signing_key);
79
80    notification_service
81        .send_notification(request.notification)
82        .await?;
83
84    debug!(
85        notification_id = %request.notification_id,
86        "notification sent successfully"
87    );
88
89    Ok(())
90}
91
92#[cfg(test)]
93mod tests {
94    use super::*;
95    use crate::models::{
96        EvmTransactionResponse, NetworkType, RelayerDisabledPayload, RelayerEvmPolicy,
97        RelayerNetworkPolicyResponse, RelayerResponse, TransactionResponse, TransactionStatus,
98        WebhookNotification, WebhookPayload, U256,
99    };
100
101    #[tokio::test]
102    async fn test_notification_job_creation() {
103        // Create a basic notification webhook payload
104        let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
105            EvmTransactionResponse {
106                id: "tx123".to_string(),
107                hash: Some("0x123".to_string()),
108                status: TransactionStatus::Confirmed,
109                status_reason: None,
110                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
111                sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
112                confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
113                gas_price: Some(1000000000),
114                gas_limit: Some(21000),
115                nonce: Some(1),
116                value: U256::from(1000000000000000000_u64),
117                from: "0xabc".to_string(),
118                to: Some("0xdef".to_string()),
119                relayer_id: "relayer-1".to_string(),
120                data: None,
121                max_fee_per_gas: None,
122                max_priority_fee_per_gas: None,
123                signature: None,
124                speed: None,
125            },
126        )));
127
128        // Create a notification
129        let notification = WebhookNotification::new("test_event".to_string(), payload);
130        let notification_job =
131            NotificationSend::new("notification-1".to_string(), notification.clone());
132
133        // Create the job
134        let job = Job::new(crate::jobs::JobType::NotificationSend, notification_job);
135
136        // Test the job structure
137        assert_eq!(job.data.notification_id, "notification-1");
138        assert_eq!(job.data.notification.event, "test_event");
139    }
140
141    #[tokio::test]
142    async fn test_notification_job_with_different_payloads() {
143        // Test with different payload types
144
145        let transaction_payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
146            EvmTransactionResponse {
147                id: "tx123".to_string(),
148                hash: Some("0x123".to_string()),
149                status: TransactionStatus::Confirmed,
150                status_reason: None,
151                created_at: "2025-01-27T15:31:10.777083+00:00".to_string(),
152                sent_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
153                confirmed_at: Some("2025-01-27T15:31:10.777083+00:00".to_string()),
154                gas_price: Some(1000000000),
155                gas_limit: Some(21000),
156                nonce: Some(1),
157                value: U256::from(1000000000000000000_u64),
158                from: "0xabc".to_string(),
159                to: Some("0xdef".to_string()),
160                relayer_id: "relayer-1".to_string(),
161                data: None,
162                max_fee_per_gas: None,
163                max_priority_fee_per_gas: None,
164                signature: None,
165                speed: None,
166            },
167        )));
168
169        let string_notification =
170            WebhookNotification::new("transaction_payload".to_string(), transaction_payload);
171        let job = NotificationSend::new("notification-string".to_string(), string_notification);
172        assert_eq!(job.notification.event, "transaction_payload");
173
174        let relayer_disabled = WebhookPayload::RelayerDisabled(Box::new(RelayerDisabledPayload {
175            relayer: RelayerResponse {
176                id: "relayer-1".to_string(),
177                name: "relayer-1".to_string(),
178                network: "ethereum".to_string(),
179                network_type: NetworkType::Evm,
180                paused: false,
181                policies: Some(RelayerNetworkPolicyResponse::Evm(
182                    RelayerEvmPolicy {
183                        gas_price_cap: None,
184                        whitelist_receivers: None,
185                        eip1559_pricing: None,
186                        private_transactions: Some(false),
187                        min_balance: Some(0),
188                        gas_limit_estimation: None,
189                    }
190                    .into(),
191                )),
192                signer_id: "signer-1".to_string(),
193                notification_id: None,
194                custom_rpc_urls: None,
195                address: Some("0xabc".to_string()),
196                system_disabled: Some(false),
197                ..Default::default()
198            },
199            disable_reason: "test".to_string(),
200        }));
201        let object_notification =
202            WebhookNotification::new("object_event".to_string(), relayer_disabled);
203        let job = NotificationSend::new("notification-object".to_string(), object_notification);
204        assert_eq!(job.notification.event, "object_event");
205    }
206}