openzeppelin_relayer/jobs/handlers/
notification_handler.rs1use actix_web::web::ThinData;
7use eyre::Result;
8use tracing::{debug, instrument};
9
10use crate::{
11 constants::WORKER_NOTIFICATION_SENDER_RETRIES,
12 jobs::{handle_result, Job, NotificationSend},
13 models::DefaultAppState,
14 observability::request_id::set_request_id,
15 queues::{HandlerError, WorkerContext},
16 repositories::Repository,
17 services::WebhookNotificationService,
18};
19
20#[instrument(
29 level = "debug",
30 skip(job, context),
31 fields(
32 request_id = ?job.request_id,
33 job_id = %job.message_id,
34 job_type = %job.job_type.to_string(),
35 attempt = %ctx.attempt,
36 task_id = %ctx.task_id,
37 notification_id = %job.data.notification_id,
38 )
39)]
40pub async fn notification_handler(
41 job: Job<NotificationSend>,
42 context: ThinData<DefaultAppState>,
43 ctx: WorkerContext,
44) -> Result<(), HandlerError> {
45 if let Some(request_id) = job.request_id.clone() {
46 set_request_id(request_id);
47 }
48
49 debug!(
50 notification_id = %job.data.notification_id,
51 "handling notification"
52 );
53
54 let result = handle_request(job.data, &context).await;
55
56 handle_result(
57 result,
58 &ctx,
59 "Notification",
60 WORKER_NOTIFICATION_SENDER_RETRIES,
61 )
62}
63
64async fn handle_request(
65 request: NotificationSend,
66 context: &ThinData<DefaultAppState>,
67) -> Result<()> {
68 debug!(
69 notification_id = %request.notification_id,
70 "sending notification"
71 );
72 let notification = context
73 .notification_repository
74 .get_by_id(request.notification_id.clone())
75 .await?;
76
77 let notification_service =
78 WebhookNotificationService::new(notification.url, notification.signing_key);
79
80 notification_service
81 .send_notification(request.notification)
82 .await?;
83
84 debug!(
85 notification_id = %request.notification_id,
86 "notification sent successfully"
87 );
88
89 Ok(())
90}
91
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        EvmTransactionResponse, NetworkType, RelayerDisabledPayload, RelayerEvmPolicy,
        RelayerNetworkPolicyResponse, RelayerResponse, TransactionResponse, TransactionStatus,
        WebhookNotification, WebhookPayload, U256,
    };

    /// Builds the confirmed EVM transaction payload shared by the tests below,
    /// so the fixture is defined in exactly one place.
    fn confirmed_evm_transaction_payload() -> WebhookPayload {
        let timestamp = "2025-01-27T15:31:10.777083+00:00";

        WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
            EvmTransactionResponse {
                id: "tx123".to_string(),
                hash: Some("0x123".to_string()),
                status: TransactionStatus::Confirmed,
                status_reason: None,
                created_at: timestamp.to_string(),
                sent_at: Some(timestamp.to_string()),
                confirmed_at: Some(timestamp.to_string()),
                gas_price: Some(1000000000),
                gas_limit: Some(21000),
                nonce: Some(1),
                value: U256::from(1000000000000000000_u64),
                from: "0xabc".to_string(),
                to: Some("0xdef".to_string()),
                relayer_id: "relayer-1".to_string(),
                data: None,
                max_fee_per_gas: None,
                max_priority_fee_per_gas: None,
                signature: None,
                speed: None,
            },
        )))
    }

    /// Builds a relayer-disabled payload for an unpaused EVM relayer.
    fn relayer_disabled_payload() -> WebhookPayload {
        WebhookPayload::RelayerDisabled(Box::new(RelayerDisabledPayload {
            relayer: RelayerResponse {
                id: "relayer-1".to_string(),
                name: "relayer-1".to_string(),
                network: "ethereum".to_string(),
                network_type: NetworkType::Evm,
                paused: false,
                policies: Some(RelayerNetworkPolicyResponse::Evm(
                    RelayerEvmPolicy {
                        gas_price_cap: None,
                        whitelist_receivers: None,
                        eip1559_pricing: None,
                        private_transactions: Some(false),
                        min_balance: Some(0),
                        gas_limit_estimation: None,
                    }
                    .into(),
                )),
                signer_id: "signer-1".to_string(),
                notification_id: None,
                custom_rpc_urls: None,
                address: Some("0xabc".to_string()),
                system_disabled: Some(false),
                ..Default::default()
            },
            disable_reason: "test".to_string(),
        }))
    }

    /// Wrapping a `NotificationSend` in a `Job` keeps the notification id and
    /// the event name accessible through `job.data`.
    #[tokio::test]
    async fn test_notification_job_creation() {
        let notification = WebhookNotification::new(
            "test_event".to_string(),
            confirmed_evm_transaction_payload(),
        );
        // The notification is moved into the job; it is not used afterwards,
        // so no clone is needed.
        let notification_job = NotificationSend::new("notification-1".to_string(), notification);

        let job = Job::new(crate::jobs::JobType::NotificationSend, notification_job);

        assert_eq!(job.data.notification_id, "notification-1");
        assert_eq!(job.data.notification.event, "test_event");
    }

    /// `NotificationSend` preserves the event name for both transaction and
    /// relayer-disabled payload variants.
    #[tokio::test]
    async fn test_notification_job_with_different_payloads() {
        let transaction_notification = WebhookNotification::new(
            "transaction_payload".to_string(),
            confirmed_evm_transaction_payload(),
        );
        let transaction_job = NotificationSend::new(
            "notification-string".to_string(),
            transaction_notification,
        );
        assert_eq!(transaction_job.notification.event, "transaction_payload");

        let relayer_notification =
            WebhookNotification::new("object_event".to_string(), relayer_disabled_payload());
        let relayer_job =
            NotificationSend::new("notification-object".to_string(), relayer_notification);
        assert_eq!(relayer_job.notification.event, "object_event");
    }
}