openzeppelin_relayer/jobs/handlers/
token_swap_request_handler.rs

//! Unified swap request handling worker implementation.
//!
//! This module implements the token swap request handling worker that processes
//! swap jobs from the queue for all supported networks (Solana and Stellar).
5
6use actix_web::web::ThinData;
7use eyre::Result as EyreResult;
8use tracing::{debug, info, instrument};
9
10use crate::{
11    constants::WORKER_TOKEN_SWAP_REQUEST_RETRIES,
12    domain::get_network_relayer,
13    jobs::{handle_result, Job, TokenSwapRequest},
14    models::DefaultAppState,
15    observability::request_id::set_request_id,
16    queues::{HandlerError, WorkerContext},
17};
18
19/// Handles incoming swap jobs from the queue.
20///
21/// # Arguments
22/// * `job` - The swap job containing relayer ID
23/// * `context` - Application state containing services
24///
25/// # Returns
26/// * `Result<(), HandlerError>` - Success or failure of swap processing
27#[instrument(
28    level = "debug",
29    skip(job, context),
30    fields(
31        request_id = ?job.request_id,
32        job_id = %job.message_id,
33        job_type = %job.job_type.to_string(),
34        attempt = %ctx.attempt,
35        relayer_id = %job.data.relayer_id,
36        task_id = %ctx.task_id,
37    )
38)]
39pub async fn token_swap_request_handler(
40    job: Job<TokenSwapRequest>,
41    context: ThinData<DefaultAppState>,
42    ctx: WorkerContext,
43) -> std::result::Result<(), HandlerError> {
44    if let Some(request_id) = job.request_id.clone() {
45        set_request_id(request_id);
46    }
47
48    debug!(
49        relayer_id = %job.data.relayer_id,
50        "handling token swap request"
51    );
52
53    let result = handle_request(job.data, &context).await;
54
55    handle_result(
56        result,
57        &ctx,
58        "TokenSwapRequest",
59        WORKER_TOKEN_SWAP_REQUEST_RETRIES,
60    )
61}
62
/// Payload-free marker type received by `token_swap_cron_handler` as its job argument.
///
/// It carries no data; the handler ignores the value itself.
#[derive(Default, Debug, Clone)]
pub struct TokenSwapCronReminder();
65
66/// Handles incoming swap jobs from the cron queue.
67#[instrument(
68    level = "info",
69    skip(_job, data, relayer_id),
70    fields(
71        job_type = "token_swap_cron",
72        attempt = %ctx.attempt,
73    ),
74    err
75)]
76pub async fn token_swap_cron_handler(
77    _job: TokenSwapCronReminder,
78    relayer_id: String,
79    data: ThinData<DefaultAppState>,
80    ctx: WorkerContext,
81) -> std::result::Result<(), HandlerError> {
82    info!(
83        relayer_id = %relayer_id,
84        "handling token swap cron request"
85    );
86
87    let result = handle_request(
88        TokenSwapRequest {
89            relayer_id: relayer_id.to_string(),
90        },
91        &data,
92    )
93    .await;
94
95    handle_result(
96        result,
97        &ctx,
98        "TokenSwapRequest",
99        WORKER_TOKEN_SWAP_REQUEST_RETRIES,
100    )
101}
102
103async fn handle_request(
104    request: TokenSwapRequest,
105    context: &ThinData<DefaultAppState>,
106) -> EyreResult<()> {
107    debug!(
108        relayer_id = %request.relayer_id,
109        "processing token swap"
110    );
111
112    let relayer = get_network_relayer(request.relayer_id.clone(), context).await?;
113
114    relayer
115        .handle_token_swap_request(request.relayer_id.clone())
116        .await
117        .map_err(|e| eyre::eyre!("Failed to handle token swap request: {}", e))?;
118
119    debug!(
120        relayer_id = %request.relayer_id,
121        "token swap request completed"
122    );
123
124    Ok(())
125}
126
// Test module placeholder: currently contains no test cases.
#[cfg(test)]
mod tests {}