1use std::sync::Arc;
28
29use crate::{
30 constants::{
31 transactions::PENDING_TRANSACTION_STATUSES, EVM_SMALLEST_UNIT_NAME,
32 EVM_STATUS_CHECK_INITIAL_DELAY_SECONDS,
33 },
34 domain::{
35 relayer::{Relayer, RelayerError},
36 BalanceResponse, SignDataRequest, SignDataResponse, SignTransactionExternalResponse,
37 SignTransactionRequest, SignTypedDataRequest,
38 },
39 jobs::{
40 JobProducerTrait, RelayerHealthCheck, TransactionRequest, TransactionSend,
41 TransactionStatusCheck,
42 },
43 models::{
44 produce_relayer_disabled_payload, DeletePendingTransactionsResponse, DisabledReason,
45 EvmNetwork, HealthCheckFailure, JsonRpcRequest, JsonRpcResponse, NetworkRepoModel,
46 NetworkRpcRequest, NetworkRpcResult, NetworkTransactionRequest, NetworkType,
47 PaginationQuery, RelayerRepoModel, RelayerStatus, RepositoryError, RpcErrorCodes,
48 TransactionRepoModel, TransactionStatus,
49 },
50 repositories::{NetworkRepository, RelayerRepository, Repository, TransactionRepository},
51 services::{
52 provider::{EvmProvider, EvmProviderTrait},
53 signer::{DataSignerTrait, EvmSigner},
54 TransactionCounterService, TransactionCounterServiceTrait,
55 },
56 utils::calculate_scheduled_timestamp,
57};
58use async_trait::async_trait;
59use eyre::Result;
60use tracing::{debug, info, instrument, warn};
61
62use super::{create_error_response, create_success_response, EvmTransactionValidator};
63use crate::utils::{map_provider_error, sanitize_error_description};
64
65#[allow(dead_code)]
66pub struct EvmRelayer<P, RR, NR, TR, J, S, TCS>
67where
68 P: EvmProviderTrait + Send + Sync,
69 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
70 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
71 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
72 J: JobProducerTrait + Send + Sync + 'static,
73 S: DataSignerTrait + Send + Sync + 'static,
74{
75 relayer: RelayerRepoModel,
76 signer: S,
77 network: EvmNetwork,
78 provider: P,
79 relayer_repository: Arc<RR>,
80 network_repository: Arc<NR>,
81 transaction_repository: Arc<TR>,
82 job_producer: Arc<J>,
83 transaction_counter_service: Arc<TCS>,
84}
85
86#[allow(clippy::too_many_arguments)]
87impl<P, RR, NR, TR, J, S, TCS> EvmRelayer<P, RR, NR, TR, J, S, TCS>
88where
89 P: EvmProviderTrait + Send + Sync,
90 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
91 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
92 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
93 J: JobProducerTrait + Send + Sync + 'static,
94 S: DataSignerTrait + Send + Sync + 'static,
95 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
96{
97 pub fn new(
114 relayer: RelayerRepoModel,
115 signer: S,
116 provider: P,
117 network: EvmNetwork,
118 relayer_repository: Arc<RR>,
119 network_repository: Arc<NR>,
120 transaction_repository: Arc<TR>,
121 transaction_counter_service: Arc<TCS>,
122 job_producer: Arc<J>,
123 ) -> Result<Self, RelayerError> {
124 Ok(Self {
125 relayer,
126 signer,
127 network,
128 provider,
129 relayer_repository,
130 network_repository,
131 transaction_repository,
132 transaction_counter_service,
133 job_producer,
134 })
135 }
136
137 #[instrument(
143 level = "debug",
144 skip(self),
145 fields(
146 request_id = ?crate::observability::request_id::get_request_id(),
147 relayer_id = %self.relayer.id,
148 )
149 )]
150 async fn sync_nonce(&self) -> Result<(), RelayerError> {
151 let on_chain_nonce = self
152 .provider
153 .get_transaction_count(&self.relayer.address)
154 .await
155 .map_err(|e| RelayerError::ProviderError(e.to_string()))?;
156
157 let transaction_counter_nonce = self
158 .transaction_counter_service
159 .get()
160 .await
161 .ok()
162 .flatten()
163 .unwrap_or(0);
164
165 let nonce = std::cmp::max(on_chain_nonce, transaction_counter_nonce);
166
167 debug!(
168 relayer_id = %self.relayer.id,
169 on_chain_nonce = %on_chain_nonce,
170 transaction_counter_nonce = %transaction_counter_nonce,
171 "syncing nonce"
172 );
173
174 debug!(nonce = %nonce, "setting nonce for relayer");
175
176 self.transaction_counter_service.set(nonce).await?;
177
178 Ok(())
179 }
180
181 #[instrument(
187 level = "debug",
188 skip(self),
189 fields(
190 request_id = ?crate::observability::request_id::get_request_id(),
191 relayer_id = %self.relayer.id,
192 )
193 )]
194 async fn validate_rpc(&self) -> Result<(), RelayerError> {
195 self.provider
196 .health_check()
197 .await
198 .map_err(|e| RelayerError::ProviderError(e.to_string()))?;
199
200 Ok(())
201 }
202
203 #[instrument(
213 level = "debug",
214 skip(self, transaction),
215 fields(
216 request_id = ?crate::observability::request_id::get_request_id(),
217 relayer_id = %self.relayer.id,
218 tx_id = %transaction.id,
219 )
220 )]
221 async fn cancel_transaction_via_job(
222 &self,
223 transaction: TransactionRepoModel,
224 ) -> Result<(), RelayerError> {
225 let cancel_job = TransactionSend::cancel(
226 transaction.id.clone(),
227 transaction.relayer_id.clone(),
228 "Cancelled via delete_pending_transactions".to_string(),
229 );
230
231 self.job_producer
232 .produce_submit_transaction_job(cancel_job, None)
233 .await
234 .map_err(RelayerError::from)?;
235
236 Ok(())
237 }
238}
239
240pub type DefaultEvmRelayer<J, T, RR, NR, TCR> =
242 EvmRelayer<EvmProvider, RR, NR, T, J, EvmSigner, TransactionCounterService<TCR>>;
243
244#[async_trait]
245impl<P, RR, NR, TR, J, S, TCS> Relayer for EvmRelayer<P, RR, NR, TR, J, S, TCS>
246where
247 P: EvmProviderTrait + Send + Sync,
248 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
249 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
250 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
251 J: JobProducerTrait + Send + Sync + 'static,
252 S: DataSignerTrait + Send + Sync + 'static,
253 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
254{
255 #[instrument(
265 level = "debug",
266 skip(self, network_transaction),
267 fields(
268 request_id = ?crate::observability::request_id::get_request_id(),
269 relayer_id = %self.relayer.id,
270 network_type = ?self.relayer.network_type,
271 )
272 )]
273 async fn process_transaction_request(
274 &self,
275 network_transaction: NetworkTransactionRequest,
276 ) -> Result<TransactionRepoModel, RelayerError> {
277 let network_model = self
278 .network_repository
279 .get_by_name(NetworkType::Evm, &self.relayer.network)
280 .await?
281 .ok_or_else(|| {
282 RelayerError::NetworkConfiguration(format!(
283 "Network {} not found",
284 self.relayer.network
285 ))
286 })?;
287 let transaction =
288 TransactionRepoModel::try_from((&network_transaction, &self.relayer, &network_model))?;
289
290 self.transaction_repository
291 .create(transaction.clone())
292 .await
293 .map_err(|e| RepositoryError::TransactionFailure(e.to_string()))?;
294
295 self.job_producer
297 .produce_transaction_request_job(
298 TransactionRequest::new(transaction.id.clone(), transaction.relayer_id.clone()),
299 None,
300 )
301 .await?;
302
303 self.job_producer
305 .produce_check_transaction_status_job(
306 TransactionStatusCheck::new(
307 transaction.id.clone(),
308 transaction.relayer_id.clone(),
309 crate::models::NetworkType::Evm,
310 ),
311 Some(calculate_scheduled_timestamp(
312 EVM_STATUS_CHECK_INITIAL_DELAY_SECONDS,
313 )),
314 )
315 .await?;
316
317 Ok(transaction)
318 }
319
320 #[instrument(
326 level = "debug",
327 skip(self),
328 fields(
329 request_id = ?crate::observability::request_id::get_request_id(),
330 relayer_id = %self.relayer.id,
331 )
332 )]
333 async fn get_balance(&self) -> Result<BalanceResponse, RelayerError> {
334 let balance: u128 = self
335 .provider
336 .get_balance(&self.relayer.address)
337 .await
338 .map_err(|e| RelayerError::ProviderError(e.to_string()))?
339 .try_into()
340 .map_err(|_| {
341 RelayerError::ProviderError("Failed to convert balance to u128".to_string())
342 })?;
343
344 Ok(BalanceResponse {
345 balance,
346 unit: EVM_SMALLEST_UNIT_NAME.to_string(),
347 })
348 }
349
350 #[instrument(
356 level = "debug",
357 skip(self),
358 fields(
359 request_id = ?crate::observability::request_id::get_request_id(),
360 relayer_id = %self.relayer.id,
361 )
362 )]
363 async fn get_status(&self) -> Result<RelayerStatus, RelayerError> {
364 let relayer_model = &self.relayer;
365
366 let nonce = self
368 .transaction_counter_service
369 .get()
370 .await
371 .ok()
372 .flatten()
373 .unwrap_or(0);
374 let nonce_str = nonce.to_string();
375
376 let balance_response = self.get_balance().await?;
377
378 let pending_transactions_count = self
380 .transaction_repository
381 .count_by_status(&relayer_model.id, PENDING_TRANSACTION_STATUSES)
382 .await
383 .map_err(RelayerError::from)?;
384
385 let last_confirmed_transaction_timestamp = self
387 .transaction_repository
388 .find_by_status_paginated(
389 &relayer_model.id,
390 &[TransactionStatus::Confirmed],
391 PaginationQuery {
392 page: 1,
393 per_page: 1,
394 },
395 false, )
397 .await
398 .map_err(RelayerError::from)?
399 .items
400 .into_iter()
401 .next()
402 .and_then(|tx| tx.confirmed_at);
403
404 Ok(RelayerStatus::Evm {
405 balance: balance_response.balance.to_string(),
406 pending_transactions_count,
407 last_confirmed_transaction_timestamp,
408 system_disabled: relayer_model.system_disabled,
409 paused: relayer_model.paused,
410 nonce: nonce_str,
411 })
412 }
413
414 #[instrument(
421 level = "debug",
422 skip(self),
423 fields(
424 request_id = ?crate::observability::request_id::get_request_id(),
425 relayer_id = %self.relayer.id,
426 )
427 )]
428 async fn delete_pending_transactions(
429 &self,
430 ) -> Result<DeletePendingTransactionsResponse, RelayerError> {
431 let pending_statuses = [
432 TransactionStatus::Pending,
433 TransactionStatus::Sent,
434 TransactionStatus::Submitted,
435 ];
436
437 let pending_transactions = self
439 .transaction_repository
440 .find_by_status(&self.relayer.id, &pending_statuses[..])
441 .await
442 .map_err(RelayerError::from)?;
443
444 let transaction_count = pending_transactions.len();
445
446 if transaction_count == 0 {
447 info!(
448 relayer_id = %self.relayer.id,
449 "no pending transactions found for relayer"
450 );
451 return Ok(DeletePendingTransactionsResponse {
452 queued_for_cancellation_transaction_ids: vec![],
453 failed_to_queue_transaction_ids: vec![],
454 total_processed: 0,
455 });
456 }
457
458 info!(
459 relayer_id = %self.relayer.id,
460 transaction_count = %transaction_count,
461 "processing pending transactions for relayer"
462 );
463
464 let mut cancelled_transaction_ids = Vec::new();
465 let mut failed_transaction_ids = Vec::new();
466
467 for transaction in pending_transactions {
469 match self.cancel_transaction_via_job(transaction.clone()).await {
470 Ok(_) => {
471 cancelled_transaction_ids.push(transaction.id.clone());
472 info!(
473 tx_id = %transaction.id,
474 relayer_id = %self.relayer.id,
475 status = ?transaction.status,
476 "initiated cancellation for transaction"
477 );
478 }
479 Err(e) => {
480 failed_transaction_ids.push(transaction.id.clone());
481 warn!(
482 tx_id = %transaction.id,
483 relayer_id = %self.relayer.id,
484 error = %e,
485 "failed to cancel transaction"
486 );
487 }
488 }
489 }
490
491 let total_processed = cancelled_transaction_ids.len() + failed_transaction_ids.len();
492
493 debug!(
494 queued_for_cancellation = %cancelled_transaction_ids.len(),
495 failed_to_queue = %failed_transaction_ids.len(),
496 "completed processing pending transactions for relayer"
497 );
498
499 Ok(DeletePendingTransactionsResponse {
500 queued_for_cancellation_transaction_ids: cancelled_transaction_ids,
501 failed_to_queue_transaction_ids: failed_transaction_ids,
502 total_processed: total_processed as u32,
503 })
504 }
505
506 #[instrument(
516 level = "debug",
517 skip(self, request),
518 fields(
519 request_id = ?crate::observability::request_id::get_request_id(),
520 relayer_id = %self.relayer.id,
521 )
522 )]
523 async fn sign_data(&self, request: SignDataRequest) -> Result<SignDataResponse, RelayerError> {
524 let result = self.signer.sign_data(request).await?;
525
526 Ok(result)
527 }
528
529 #[instrument(
539 level = "debug",
540 skip(self, request),
541 fields(
542 request_id = ?crate::observability::request_id::get_request_id(),
543 relayer_id = %self.relayer.id,
544 )
545 )]
546 async fn sign_typed_data(
547 &self,
548 request: SignTypedDataRequest,
549 ) -> Result<SignDataResponse, RelayerError> {
550 let result = self.signer.sign_typed_data(request).await?;
551
552 Ok(result)
553 }
554
555 #[instrument(
565 level = "debug",
566 skip(self, request),
567 fields(
568 request_id = ?crate::observability::request_id::get_request_id(),
569 relayer_id = %self.relayer.id,
570 )
571 )]
572 async fn rpc(
573 &self,
574 request: JsonRpcRequest<NetworkRpcRequest>,
575 ) -> Result<JsonRpcResponse<NetworkRpcResult>, RelayerError> {
576 let evm_request = match request.params {
577 NetworkRpcRequest::Evm(evm_req) => evm_req,
578 _ => {
579 return Ok(create_error_response(
580 request.id,
581 RpcErrorCodes::INVALID_PARAMS,
582 "Invalid params",
583 "Expected EVM network request",
584 ))
585 }
586 };
587
588 let (method, params_json) = match evm_request {
590 crate::models::EvmRpcRequest::RawRpcRequest { method, params } => (method, params),
591 };
592
593 match self.provider.raw_request_dyn(&method, params_json).await {
595 Ok(result_value) => Ok(create_success_response(request.id, result_value)),
596 Err(provider_error) => {
597 tracing::error!(
599 error = %provider_error,
600 "RPC provider error occurred"
601 );
602 let (error_code, error_message) = map_provider_error(&provider_error);
603 let sanitized_description = sanitize_error_description(&provider_error);
604 Ok(create_error_response(
605 request.id,
606 error_code,
607 error_message,
608 &sanitized_description,
609 ))
610 }
611 }
612 }
613
614 #[instrument(
620 level = "debug",
621 skip(self),
622 fields(
623 request_id = ?crate::observability::request_id::get_request_id(),
624 relayer_id = %self.relayer.id,
625 )
626 )]
627 async fn validate_min_balance(&self) -> Result<(), RelayerError> {
628 let policy = self.relayer.policies.get_evm_policy();
629 EvmTransactionValidator::init_balance_validation(
630 &self.relayer.address,
631 &policy,
632 &self.provider,
633 )
634 .await
635 .map_err(|e| RelayerError::InsufficientBalanceError(e.to_string()))?;
636
637 Ok(())
638 }
639
640 #[instrument(
646 level = "debug",
647 skip(self),
648 fields(
649 request_id = ?crate::observability::request_id::get_request_id(),
650 relayer_id = %self.relayer.id,
651 )
652 )]
653 async fn check_health(&self) -> Result<(), Vec<HealthCheckFailure>> {
654 debug!("running health checks");
655
656 let nonce_sync_result = self.sync_nonce().await;
657 let validate_rpc_result = self.validate_rpc().await;
658 let validate_min_balance_result = self.validate_min_balance().await;
659
660 let failures: Vec<HealthCheckFailure> = vec![
662 nonce_sync_result
663 .err()
664 .map(|e| HealthCheckFailure::NonceSyncFailed(e.to_string())),
665 validate_rpc_result
666 .err()
667 .map(|e| HealthCheckFailure::RpcValidationFailed(e.to_string())),
668 validate_min_balance_result
669 .err()
670 .map(|e| HealthCheckFailure::BalanceCheckFailed(e.to_string())),
671 ]
672 .into_iter()
673 .flatten()
674 .collect();
675
676 if failures.is_empty() {
677 info!("all health checks passed");
678 Ok(())
679 } else {
680 warn!("health checks failed: {:?}", failures);
681 Err(failures)
682 }
683 }
684
685 #[instrument(
686 level = "debug",
687 skip(self),
688 fields(
689 request_id = ?crate::observability::request_id::get_request_id(),
690 relayer_id = %self.relayer.id,
691 )
692 )]
693 async fn initialize_relayer(&self) -> Result<(), RelayerError> {
694 debug!("initializing EVM relayer");
695
696 match self.check_health().await {
697 Ok(_) => {
698 if self.relayer.system_disabled {
700 self.relayer_repository
702 .enable_relayer(self.relayer.id.clone())
703 .await?;
704 }
705 Ok(())
706 }
707 Err(failures) => {
708 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
710 DisabledReason::RpcValidationFailed("Unknown error".to_string())
711 });
712
713 warn!(reason = %reason, "disabling relayer");
714 let updated_relayer = self
715 .relayer_repository
716 .disable_relayer(self.relayer.id.clone(), reason.clone())
717 .await?;
718
719 if let Some(notification_id) = &self.relayer.notification_id {
721 self.job_producer
722 .produce_send_notification_job(
723 produce_relayer_disabled_payload(
724 notification_id,
725 &updated_relayer,
726 &reason.safe_description(),
727 ),
728 None,
729 )
730 .await?;
731 }
732
733 self.job_producer
735 .produce_relayer_health_check_job(
736 RelayerHealthCheck::new(self.relayer.id.clone()),
737 Some(calculate_scheduled_timestamp(10)),
738 )
739 .await?;
740
741 Ok(())
742 }
743 }
744 }
745
746 #[instrument(
747 level = "debug",
748 skip(self, _request),
749 fields(
750 request_id = ?crate::observability::request_id::get_request_id(),
751 relayer_id = %self.relayer.id,
752 )
753 )]
754 async fn sign_transaction(
755 &self,
756 _request: &SignTransactionRequest,
757 ) -> Result<SignTransactionExternalResponse, RelayerError> {
758 Err(RelayerError::NotSupported(
759 "Transaction signing not supported for EVM".to_string(),
760 ))
761 }
762}
763
764#[cfg(test)]
765mod tests {
766 use super::*;
767 use crate::models::RpcConfig;
768 use crate::{
769 config::{EvmNetworkConfig, NetworkConfigCommon},
770 jobs::MockJobProducerTrait,
771 models::{
772 EvmRpcRequest, EvmRpcResult, JsonRpcId, NetworkRepoModel, NetworkType,
773 RelayerEvmPolicy, RelayerNetworkPolicy, RepositoryError, SignerError,
774 TransactionStatus, U256,
775 },
776 repositories::{MockNetworkRepository, MockRelayerRepository, MockTransactionRepository},
777 services::{
778 provider::{MockEvmProviderTrait, ProviderError},
779 MockTransactionCounterServiceTrait,
780 },
781 };
782 use mockall::predicate::*;
783 use std::future::ready;
784
785 mockall::mock! {
786 pub DataSigner {}
787
788 #[async_trait]
789 impl DataSignerTrait for DataSigner {
790 async fn sign_data(&self, request: SignDataRequest) -> Result<SignDataResponse, SignerError>;
791 async fn sign_typed_data(&self, request: SignTypedDataRequest) -> Result<SignDataResponse, SignerError>;
792 }
793 }
794
795 fn create_test_evm_network() -> EvmNetwork {
796 EvmNetwork {
797 network: "mainnet".to_string(),
798 rpc_urls: vec![RpcConfig::new(
799 "https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY".to_string(),
800 )],
801 explorer_urls: None,
802 average_blocktime_ms: 12000,
803 is_testnet: false,
804 tags: vec!["mainnet".to_string()],
805 chain_id: 1,
806 required_confirmations: 1,
807 features: vec!["eip1559".to_string()],
808 symbol: "ETH".to_string(),
809 gas_price_cache: None,
810 }
811 }
812
813 fn create_test_network_repo_model() -> NetworkRepoModel {
814 let config = EvmNetworkConfig {
815 common: NetworkConfigCommon {
816 network: "mainnet".to_string(),
817 from: None,
818 rpc_urls: Some(vec![crate::models::RpcConfig::new(
819 "https://mainnet.infura.io/v3/YOUR_INFURA_API_KEY".to_string(),
820 )]),
821 explorer_urls: None,
822 average_blocktime_ms: Some(12000),
823 is_testnet: Some(false),
824 tags: Some(vec!["mainnet".to_string()]),
825 },
826 chain_id: Some(1),
827 required_confirmations: Some(1),
828 features: Some(vec!["eip1559".to_string()]),
829 symbol: Some("ETH".to_string()),
830 gas_price_cache: None,
831 };
832
833 NetworkRepoModel::new_evm(config)
834 }
835
836 fn create_test_relayer() -> RelayerRepoModel {
837 RelayerRepoModel {
838 id: "test-relayer-id".to_string(),
839 name: "Test Relayer".to_string(),
840 network: "mainnet".to_string(), address: "0xSender".to_string(),
842 paused: false,
843 system_disabled: false,
844 signer_id: "test-signer-id".to_string(),
845 notification_id: Some("test-notification-id".to_string()),
846 policies: RelayerNetworkPolicy::Evm(RelayerEvmPolicy {
847 min_balance: Some(100000000000000000u128), whitelist_receivers: Some(vec!["0xRecipient".to_string()]),
849 gas_price_cap: Some(100000000000), eip1559_pricing: Some(true),
851 private_transactions: Some(false),
852 gas_limit_estimation: Some(true),
853 }),
854 network_type: NetworkType::Evm,
855 custom_rpc_urls: None,
856 ..Default::default()
857 }
858 }
859
860 fn setup_mocks() -> (
861 MockEvmProviderTrait,
862 MockRelayerRepository,
863 MockNetworkRepository,
864 MockTransactionRepository,
865 MockJobProducerTrait,
866 MockDataSigner,
867 MockTransactionCounterServiceTrait,
868 ) {
869 (
870 MockEvmProviderTrait::new(),
871 MockRelayerRepository::new(),
872 MockNetworkRepository::new(),
873 MockTransactionRepository::new(),
874 MockJobProducerTrait::new(),
875 MockDataSigner::new(),
876 MockTransactionCounterServiceTrait::new(),
877 )
878 }
879
880 #[tokio::test]
881 async fn test_get_balance() {
882 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
883 setup_mocks();
884 let relayer_model = create_test_relayer();
885
886 provider
887 .expect_get_balance()
888 .with(eq("0xSender"))
889 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64))))); let relayer = EvmRelayer::new(
892 relayer_model,
893 signer,
894 provider,
895 create_test_evm_network(),
896 Arc::new(relayer_repo),
897 Arc::new(network_repo),
898 Arc::new(tx_repo),
899 Arc::new(counter),
900 Arc::new(job_producer),
901 )
902 .unwrap();
903
904 let balance = relayer.get_balance().await.unwrap();
905 assert_eq!(balance.balance, 1000000000000000000u128);
906 assert_eq!(balance.unit, EVM_SMALLEST_UNIT_NAME);
907 }
908
909 #[tokio::test]
910 async fn test_process_transaction_request() {
911 let (
912 provider,
913 relayer_repo,
914 mut network_repo,
915 mut tx_repo,
916 mut job_producer,
917 signer,
918 counter,
919 ) = setup_mocks();
920 let relayer_model = create_test_relayer();
921
922 let network_tx = NetworkTransactionRequest::Evm(crate::models::EvmTransactionRequest {
923 to: Some("0xRecipient".to_string()),
924 value: U256::from(1000000000000000000u64),
925 data: Some("0xData".to_string()),
926 gas_limit: Some(21000),
927 gas_price: Some(20000000000),
928 max_fee_per_gas: None,
929 max_priority_fee_per_gas: None,
930 speed: None,
931 valid_until: None,
932 });
933
934 network_repo
935 .expect_get_by_name()
936 .with(eq(NetworkType::Evm), eq("mainnet"))
937 .returning(|_, _| Ok(Some(create_test_network_repo_model())));
938
939 tx_repo.expect_create().returning(Ok);
940 job_producer
941 .expect_produce_transaction_request_job()
942 .returning(|_, _| Box::pin(ready(Ok(()))));
943 job_producer
944 .expect_produce_check_transaction_status_job()
945 .returning(|_, _| Box::pin(ready(Ok(()))));
946
947 let relayer = EvmRelayer::new(
948 relayer_model,
949 signer,
950 provider,
951 create_test_evm_network(),
952 Arc::new(relayer_repo),
953 Arc::new(network_repo),
954 Arc::new(tx_repo),
955 Arc::new(counter),
956 Arc::new(job_producer),
957 )
958 .unwrap();
959
960 let result = relayer.process_transaction_request(network_tx).await;
961 assert!(result.is_ok());
962 }
963
964 #[tokio::test]
965 async fn test_validate_min_balance_sufficient() {
966 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
967 setup_mocks();
968 let relayer_model = create_test_relayer();
969
970 provider
971 .expect_get_balance()
972 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); let relayer = EvmRelayer::new(
975 relayer_model,
976 signer,
977 provider,
978 create_test_evm_network(),
979 Arc::new(relayer_repo),
980 Arc::new(network_repo),
981 Arc::new(tx_repo),
982 Arc::new(counter),
983 Arc::new(job_producer),
984 )
985 .unwrap();
986
987 let result = relayer.validate_min_balance().await;
988 assert!(result.is_ok());
989 }
990
991 #[tokio::test]
992 async fn test_validate_min_balance_insufficient() {
993 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
994 setup_mocks();
995 let relayer_model = create_test_relayer();
996
997 provider
998 .expect_get_balance()
999 .returning(|_| Box::pin(ready(Ok(U256::from(50000000000000000u64))))); let relayer = EvmRelayer::new(
1002 relayer_model,
1003 signer,
1004 provider,
1005 create_test_evm_network(),
1006 Arc::new(relayer_repo),
1007 Arc::new(network_repo),
1008 Arc::new(tx_repo),
1009 Arc::new(counter),
1010 Arc::new(job_producer),
1011 )
1012 .unwrap();
1013
1014 let result = relayer.validate_min_balance().await;
1015 assert!(matches!(
1016 result,
1017 Err(RelayerError::InsufficientBalanceError(_))
1018 ));
1019 }
1020
1021 #[tokio::test]
1022 async fn test_sync_nonce() {
1023 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1024 setup_mocks();
1025 let relayer_model = create_test_relayer();
1026
1027 provider
1028 .expect_get_transaction_count()
1029 .returning(|_| Box::pin(ready(Ok(42u64))));
1030
1031 counter
1032 .expect_set()
1033 .returning(|_nonce| Box::pin(ready(Ok(()))));
1034
1035 counter
1036 .expect_get()
1037 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
1038
1039 let relayer = EvmRelayer::new(
1040 relayer_model,
1041 signer,
1042 provider,
1043 create_test_evm_network(),
1044 Arc::new(relayer_repo),
1045 Arc::new(network_repo),
1046 Arc::new(tx_repo),
1047 Arc::new(counter),
1048 Arc::new(job_producer),
1049 )
1050 .unwrap();
1051
1052 let result = relayer.sync_nonce().await;
1053 assert!(result.is_ok());
1054 }
1055
1056 #[tokio::test]
1057 async fn test_sync_nonce_lower_on_chain_nonce() {
1058 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1059 setup_mocks();
1060 let relayer_model = create_test_relayer();
1061
1062 provider
1063 .expect_get_transaction_count()
1064 .returning(|_| Box::pin(ready(Ok(40u64))));
1065
1066 counter
1067 .expect_set()
1068 .with(eq(42u64))
1069 .returning(|_nonce| Box::pin(ready(Ok(()))));
1070
1071 counter
1072 .expect_get()
1073 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
1074
1075 let relayer = EvmRelayer::new(
1076 relayer_model,
1077 signer,
1078 provider,
1079 create_test_evm_network(),
1080 Arc::new(relayer_repo),
1081 Arc::new(network_repo),
1082 Arc::new(tx_repo),
1083 Arc::new(counter),
1084 Arc::new(job_producer),
1085 )
1086 .unwrap();
1087
1088 let result = relayer.sync_nonce().await;
1089 assert!(result.is_ok());
1090 }
1091
1092 #[tokio::test]
1093 async fn test_sync_nonce_lower_transaction_counter_nonce() {
1094 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
1095 setup_mocks();
1096 let relayer_model = create_test_relayer();
1097
1098 provider
1099 .expect_get_transaction_count()
1100 .returning(|_| Box::pin(ready(Ok(42u64))));
1101
1102 counter
1103 .expect_set()
1104 .with(eq(42u64))
1105 .returning(|_nonce| Box::pin(ready(Ok(()))));
1106
1107 counter
1108 .expect_get()
1109 .returning(|| Box::pin(ready(Ok(Some(40u64)))));
1110
1111 let relayer = EvmRelayer::new(
1112 relayer_model,
1113 signer,
1114 provider,
1115 create_test_evm_network(),
1116 Arc::new(relayer_repo),
1117 Arc::new(network_repo),
1118 Arc::new(tx_repo),
1119 Arc::new(counter),
1120 Arc::new(job_producer),
1121 )
1122 .unwrap();
1123
1124 let result = relayer.sync_nonce().await;
1125 assert!(result.is_ok());
1126 }
1127
1128 #[tokio::test]
1129 async fn test_validate_rpc() {
1130 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1131 setup_mocks();
1132 let relayer_model = create_test_relayer();
1133
1134 provider
1135 .expect_health_check()
1136 .returning(|| Box::pin(ready(Ok(true))));
1137
1138 let relayer = EvmRelayer::new(
1139 relayer_model,
1140 signer,
1141 provider,
1142 create_test_evm_network(),
1143 Arc::new(relayer_repo),
1144 Arc::new(network_repo),
1145 Arc::new(tx_repo),
1146 Arc::new(counter),
1147 Arc::new(job_producer),
1148 )
1149 .unwrap();
1150
1151 let result = relayer.validate_rpc().await;
1152 assert!(result.is_ok());
1153 }
1154
1155 #[tokio::test]
1156 async fn test_get_status_success() {
1157 let (
1158 mut provider,
1159 relayer_repo,
1160 network_repo,
1161 mut tx_repo,
1162 job_producer,
1163 signer,
1164 mut counter,
1165 ) = setup_mocks();
1166 let relayer_model = create_test_relayer();
1167
1168 counter
1170 .expect_get()
1171 .returning(|| Box::pin(ready(Ok(Some(10u64)))))
1172 .once();
1173 provider
1174 .expect_get_balance()
1175 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))))
1176 .once();
1177
1178 tx_repo
1180 .expect_count_by_status()
1181 .withf(|relayer_id, statuses| {
1182 relayer_id == "test-relayer-id"
1183 && statuses
1184 == [
1185 TransactionStatus::Pending,
1186 TransactionStatus::Sent,
1187 TransactionStatus::Submitted,
1188 ]
1189 })
1190 .returning(|_, _| Ok(0u64))
1191 .once();
1192
1193 let latest_confirmed_tx = TransactionRepoModel {
1195 id: "tx1".to_string(),
1196 relayer_id: relayer_model.id.clone(),
1197 status: TransactionStatus::Confirmed,
1198 confirmed_at: Some("2023-01-01T12:00:00Z".to_string()),
1199 ..TransactionRepoModel::default()
1200 };
1201 let relayer_id_clone = relayer_model.id.clone();
1202 tx_repo
1203 .expect_find_by_status_paginated()
1204 .withf(move |relayer_id, statuses, query, oldest_first| {
1205 *relayer_id == relayer_id_clone
1206 && statuses == [TransactionStatus::Confirmed]
1207 && query.page == 1
1208 && query.per_page == 1
1209 && !(*oldest_first)
1210 })
1211 .returning(move |_, _, _, _| {
1212 Ok(crate::repositories::PaginatedResult {
1213 items: vec![latest_confirmed_tx.clone()],
1214 total: 1,
1215 page: 1,
1216 per_page: 1,
1217 })
1218 })
1219 .once();
1220
1221 let relayer = EvmRelayer::new(
1222 relayer_model.clone(),
1223 signer,
1224 provider,
1225 create_test_evm_network(),
1226 Arc::new(relayer_repo),
1227 Arc::new(network_repo),
1228 Arc::new(tx_repo),
1229 Arc::new(counter),
1230 Arc::new(job_producer),
1231 )
1232 .unwrap();
1233
1234 let status = relayer.get_status().await.unwrap();
1235
1236 match status {
1237 RelayerStatus::Evm {
1238 balance,
1239 pending_transactions_count,
1240 last_confirmed_transaction_timestamp,
1241 system_disabled,
1242 paused,
1243 nonce,
1244 } => {
1245 assert_eq!(balance, "1000000000000000000");
1246 assert_eq!(pending_transactions_count, 0);
1247 assert_eq!(
1248 last_confirmed_transaction_timestamp,
1249 Some("2023-01-01T12:00:00Z".to_string())
1250 );
1251 assert_eq!(system_disabled, relayer_model.system_disabled);
1252 assert_eq!(paused, relayer_model.paused);
1253 assert_eq!(nonce, "10");
1254 }
1255 _ => panic!("Expected EVM RelayerStatus"),
1256 }
1257 }
1258
1259 #[tokio::test]
1260 async fn test_get_status_provider_nonce_error() {
1261 let (
1262 mut provider,
1263 relayer_repo,
1264 network_repo,
1265 mut tx_repo,
1266 job_producer,
1267 signer,
1268 mut counter,
1269 ) = setup_mocks();
1270 let relayer_model = create_test_relayer();
1271
1272 counter
1274 .expect_get()
1275 .returning(|| Box::pin(ready(Ok(None))))
1276 .once();
1277 provider
1278 .expect_get_balance()
1279 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))))
1280 .once();
1281
1282 tx_repo
1284 .expect_count_by_status()
1285 .returning(|_, _| Ok(0u64))
1286 .once();
1287
1288 tx_repo
1290 .expect_find_by_status_paginated()
1291 .withf(|_relayer_id, statuses, query, oldest_first| {
1292 statuses == [TransactionStatus::Confirmed]
1293 && query.page == 1
1294 && query.per_page == 1
1295 && !(*oldest_first)
1296 })
1297 .returning(|_, _, _, _| {
1298 Ok(crate::repositories::PaginatedResult {
1299 items: vec![],
1300 total: 0,
1301 page: 1,
1302 per_page: 1,
1303 })
1304 })
1305 .once();
1306
1307 let relayer = EvmRelayer::new(
1308 relayer_model.clone(),
1309 signer,
1310 provider,
1311 create_test_evm_network(),
1312 Arc::new(relayer_repo),
1313 Arc::new(network_repo),
1314 Arc::new(tx_repo),
1315 Arc::new(counter),
1316 Arc::new(job_producer),
1317 )
1318 .unwrap();
1319
1320 let status = relayer.get_status().await.unwrap();
1322 match status {
1323 RelayerStatus::Evm { nonce, .. } => {
1324 assert_eq!(nonce, "0");
1325 }
1326 _ => panic!("Expected Evm status"),
1327 }
1328 }
1329
1330 #[tokio::test]
1331 async fn test_get_status_repository_pending_error() {
1332 let (
1333 mut provider,
1334 relayer_repo,
1335 network_repo,
1336 mut tx_repo,
1337 job_producer,
1338 signer,
1339 mut counter,
1340 ) = setup_mocks();
1341 let relayer_model = create_test_relayer();
1342
1343 counter
1345 .expect_get()
1346 .returning(|| Box::pin(ready(Ok(Some(10u64)))))
1347 .once();
1348 provider
1349 .expect_get_balance()
1350 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))));
1351
1352 tx_repo
1353 .expect_count_by_status()
1354 .withf(|relayer_id, statuses| {
1355 relayer_id == "test-relayer-id"
1356 && statuses
1357 == [
1358 TransactionStatus::Pending,
1359 TransactionStatus::Sent,
1360 TransactionStatus::Submitted,
1361 ]
1362 })
1363 .returning(|_, _| Err(RepositoryError::Unknown("DB down".to_string())))
1364 .once();
1365
1366 let relayer = EvmRelayer::new(
1367 relayer_model.clone(),
1368 signer,
1369 provider,
1370 create_test_evm_network(),
1371 Arc::new(relayer_repo),
1372 Arc::new(network_repo),
1373 Arc::new(tx_repo),
1374 Arc::new(counter),
1375 Arc::new(job_producer),
1376 )
1377 .unwrap();
1378
1379 let result = relayer.get_status().await;
1380 assert!(result.is_err());
1381 match result.err().unwrap() {
1382 RelayerError::NetworkConfiguration(msg) => assert!(msg.contains("DB down")),
1384 _ => panic!("Expected NetworkConfiguration error for repo failure"),
1385 }
1386 }
1387
1388 #[tokio::test]
1389 async fn test_get_status_no_confirmed_transactions() {
1390 let (
1391 mut provider,
1392 relayer_repo,
1393 network_repo,
1394 mut tx_repo,
1395 job_producer,
1396 signer,
1397 mut counter,
1398 ) = setup_mocks();
1399 let relayer_model = create_test_relayer();
1400
1401 counter
1403 .expect_get()
1404 .returning(|| Box::pin(ready(Ok(Some(10u64)))));
1405 provider
1406 .expect_get_balance()
1407 .returning(|_| Box::pin(ready(Ok(U256::from(1000000000000000000u64)))));
1408 provider
1409 .expect_health_check()
1410 .returning(|| Box::pin(ready(Ok(true))));
1411
1412 tx_repo
1414 .expect_count_by_status()
1415 .withf(|relayer_id, statuses| {
1416 relayer_id == "test-relayer-id"
1417 && statuses
1418 == [
1419 TransactionStatus::Pending,
1420 TransactionStatus::Sent,
1421 TransactionStatus::Submitted,
1422 ]
1423 })
1424 .returning(|_, _| Ok(0u64))
1425 .once();
1426
1427 let relayer_id_clone = relayer_model.id.clone();
1429 tx_repo
1430 .expect_find_by_status_paginated()
1431 .withf(move |relayer_id, statuses, query, oldest_first| {
1432 *relayer_id == relayer_id_clone
1433 && statuses == [TransactionStatus::Confirmed]
1434 && query.page == 1
1435 && query.per_page == 1
1436 && !(*oldest_first)
1437 })
1438 .returning(|_, _, _, _| {
1439 Ok(crate::repositories::PaginatedResult {
1440 items: vec![],
1441 total: 0,
1442 page: 1,
1443 per_page: 1,
1444 })
1445 })
1446 .once();
1447
1448 let relayer = EvmRelayer::new(
1449 relayer_model.clone(),
1450 signer,
1451 provider,
1452 create_test_evm_network(),
1453 Arc::new(relayer_repo),
1454 Arc::new(network_repo),
1455 Arc::new(tx_repo),
1456 Arc::new(counter),
1457 Arc::new(job_producer),
1458 )
1459 .unwrap();
1460
1461 let status = relayer.get_status().await.unwrap();
1462 match status {
1463 RelayerStatus::Evm {
1464 balance,
1465 pending_transactions_count,
1466 last_confirmed_transaction_timestamp,
1467 system_disabled,
1468 paused,
1469 nonce,
1470 } => {
1471 assert_eq!(balance, "1000000000000000000");
1472 assert_eq!(pending_transactions_count, 0);
1473 assert_eq!(last_confirmed_transaction_timestamp, None);
1474 assert_eq!(system_disabled, relayer_model.system_disabled);
1475 assert_eq!(paused, relayer_model.paused);
1476 assert_eq!(nonce, "10");
1477 }
1478 _ => panic!("Expected EVM RelayerStatus"),
1479 }
1480 }
1481
1482 #[tokio::test]
1483 async fn test_cancel_transaction_via_job_success() {
1484 let (provider, relayer_repo, network_repo, tx_repo, mut job_producer, signer, counter) =
1485 setup_mocks();
1486 let relayer_model = create_test_relayer();
1487
1488 let test_transaction = TransactionRepoModel {
1489 id: "test-tx-id".to_string(),
1490 relayer_id: relayer_model.id.clone(),
1491 status: TransactionStatus::Pending,
1492 ..TransactionRepoModel::default()
1493 };
1494
1495 job_producer
1496 .expect_produce_submit_transaction_job()
1497 .withf(|job, delay| {
1498 matches!(job.command, crate::jobs::TransactionCommand::Cancel { ref reason }
1499 if job.transaction_id == "test-tx-id"
1500 && job.relayer_id == "test-relayer-id"
1501 && reason == "Cancelled via delete_pending_transactions")
1502 && delay.is_none()
1503 })
1504 .returning(|_, _| Box::pin(ready(Ok(()))))
1505 .once();
1506
1507 let relayer = EvmRelayer::new(
1508 relayer_model,
1509 signer,
1510 provider,
1511 create_test_evm_network(),
1512 Arc::new(relayer_repo),
1513 Arc::new(network_repo),
1514 Arc::new(tx_repo),
1515 Arc::new(counter),
1516 Arc::new(job_producer),
1517 )
1518 .unwrap();
1519
1520 let result = relayer.cancel_transaction_via_job(test_transaction).await;
1521 assert!(result.is_ok());
1522 }
1523
1524 #[tokio::test]
1525 async fn test_cancel_transaction_via_job_failure() {
1526 let (provider, relayer_repo, network_repo, tx_repo, mut job_producer, signer, counter) =
1527 setup_mocks();
1528 let relayer_model = create_test_relayer();
1529
1530 let test_transaction = TransactionRepoModel {
1531 id: "test-tx-id".to_string(),
1532 relayer_id: relayer_model.id.clone(),
1533 status: TransactionStatus::Pending,
1534 ..TransactionRepoModel::default()
1535 };
1536
1537 job_producer
1538 .expect_produce_submit_transaction_job()
1539 .returning(|_, _| {
1540 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1541 "Queue is full".to_string(),
1542 ))))
1543 })
1544 .once();
1545
1546 let relayer = EvmRelayer::new(
1547 relayer_model,
1548 signer,
1549 provider,
1550 create_test_evm_network(),
1551 Arc::new(relayer_repo),
1552 Arc::new(network_repo),
1553 Arc::new(tx_repo),
1554 Arc::new(counter),
1555 Arc::new(job_producer),
1556 )
1557 .unwrap();
1558
1559 let result = relayer.cancel_transaction_via_job(test_transaction).await;
1560 assert!(result.is_err());
1561 match result.err().unwrap() {
1562 RelayerError::QueueError(_) => (),
1563 _ => panic!("Expected QueueError"),
1564 }
1565 }
1566
1567 #[tokio::test]
1568 async fn test_delete_pending_transactions_no_pending() {
1569 let (provider, relayer_repo, network_repo, mut tx_repo, job_producer, signer, counter) =
1570 setup_mocks();
1571 let relayer_model = create_test_relayer();
1572
1573 tx_repo
1574 .expect_find_by_status()
1575 .withf(|relayer_id, statuses| {
1576 relayer_id == "test-relayer-id"
1577 && statuses
1578 == [
1579 TransactionStatus::Pending,
1580 TransactionStatus::Sent,
1581 TransactionStatus::Submitted,
1582 ]
1583 })
1584 .returning(|_, _| Ok(vec![]))
1585 .once();
1586
1587 let relayer = EvmRelayer::new(
1588 relayer_model,
1589 signer,
1590 provider,
1591 create_test_evm_network(),
1592 Arc::new(relayer_repo),
1593 Arc::new(network_repo),
1594 Arc::new(tx_repo),
1595 Arc::new(counter),
1596 Arc::new(job_producer),
1597 )
1598 .unwrap();
1599
1600 let result = relayer.delete_pending_transactions().await.unwrap();
1601 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 0);
1602 assert_eq!(result.failed_to_queue_transaction_ids.len(), 0);
1603 assert_eq!(result.total_processed, 0);
1604 }
1605
1606 #[tokio::test]
1607 async fn test_delete_pending_transactions_all_successful() {
1608 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1609 setup_mocks();
1610 let relayer_model = create_test_relayer();
1611
1612 let pending_transactions = vec![
1613 TransactionRepoModel {
1614 id: "tx1".to_string(),
1615 relayer_id: relayer_model.id.clone(),
1616 status: TransactionStatus::Pending,
1617 ..TransactionRepoModel::default()
1618 },
1619 TransactionRepoModel {
1620 id: "tx2".to_string(),
1621 relayer_id: relayer_model.id.clone(),
1622 status: TransactionStatus::Sent,
1623 ..TransactionRepoModel::default()
1624 },
1625 TransactionRepoModel {
1626 id: "tx3".to_string(),
1627 relayer_id: relayer_model.id.clone(),
1628 status: TransactionStatus::Submitted,
1629 ..TransactionRepoModel::default()
1630 },
1631 ];
1632
1633 tx_repo
1634 .expect_find_by_status()
1635 .withf(|relayer_id, statuses| {
1636 relayer_id == "test-relayer-id"
1637 && statuses
1638 == [
1639 TransactionStatus::Pending,
1640 TransactionStatus::Sent,
1641 TransactionStatus::Submitted,
1642 ]
1643 })
1644 .returning(move |_, _| Ok(pending_transactions.clone()))
1645 .once();
1646
1647 job_producer
1648 .expect_produce_submit_transaction_job()
1649 .returning(|_, _| Box::pin(ready(Ok(()))))
1650 .times(3);
1651
1652 let relayer = EvmRelayer::new(
1653 relayer_model,
1654 signer,
1655 provider,
1656 create_test_evm_network(),
1657 Arc::new(relayer_repo),
1658 Arc::new(network_repo),
1659 Arc::new(tx_repo),
1660 Arc::new(counter),
1661 Arc::new(job_producer),
1662 )
1663 .unwrap();
1664
1665 let result = relayer.delete_pending_transactions().await.unwrap();
1666 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 3);
1667 assert_eq!(result.failed_to_queue_transaction_ids.len(), 0);
1668 assert_eq!(result.total_processed, 3);
1669
1670 let expected_ids = vec!["tx1", "tx2", "tx3"];
1671 for id in expected_ids {
1672 assert!(result
1673 .queued_for_cancellation_transaction_ids
1674 .contains(&id.to_string()));
1675 }
1676 }
1677
1678 #[tokio::test]
1679 async fn test_delete_pending_transactions_partial_failures() {
1680 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1681 setup_mocks();
1682 let relayer_model = create_test_relayer();
1683
1684 let pending_transactions = vec![
1685 TransactionRepoModel {
1686 id: "tx1".to_string(),
1687 relayer_id: relayer_model.id.clone(),
1688 status: TransactionStatus::Pending,
1689 ..TransactionRepoModel::default()
1690 },
1691 TransactionRepoModel {
1692 id: "tx2".to_string(),
1693 relayer_id: relayer_model.id.clone(),
1694 status: TransactionStatus::Sent,
1695 ..TransactionRepoModel::default()
1696 },
1697 TransactionRepoModel {
1698 id: "tx3".to_string(),
1699 relayer_id: relayer_model.id.clone(),
1700 status: TransactionStatus::Submitted,
1701 ..TransactionRepoModel::default()
1702 },
1703 ];
1704
1705 tx_repo
1706 .expect_find_by_status()
1707 .withf(|relayer_id, statuses| {
1708 relayer_id == "test-relayer-id"
1709 && statuses
1710 == [
1711 TransactionStatus::Pending,
1712 TransactionStatus::Sent,
1713 TransactionStatus::Submitted,
1714 ]
1715 })
1716 .returning(move |_, _| Ok(pending_transactions.clone()))
1717 .once();
1718
1719 job_producer
1721 .expect_produce_submit_transaction_job()
1722 .returning(|_, _| Box::pin(ready(Ok(()))))
1723 .times(1);
1724 job_producer
1725 .expect_produce_submit_transaction_job()
1726 .returning(|_, _| {
1727 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1728 "Queue is full".to_string(),
1729 ))))
1730 })
1731 .times(1);
1732 job_producer
1733 .expect_produce_submit_transaction_job()
1734 .returning(|_, _| Box::pin(ready(Ok(()))))
1735 .times(1);
1736
1737 let relayer = EvmRelayer::new(
1738 relayer_model,
1739 signer,
1740 provider,
1741 create_test_evm_network(),
1742 Arc::new(relayer_repo),
1743 Arc::new(network_repo),
1744 Arc::new(tx_repo),
1745 Arc::new(counter),
1746 Arc::new(job_producer),
1747 )
1748 .unwrap();
1749
1750 let result = relayer.delete_pending_transactions().await.unwrap();
1751 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 2);
1752 assert_eq!(result.failed_to_queue_transaction_ids.len(), 1);
1753 assert_eq!(result.total_processed, 3);
1754 }
1755
1756 #[tokio::test]
1757 async fn test_delete_pending_transactions_repository_error() {
1758 let (provider, relayer_repo, network_repo, mut tx_repo, job_producer, signer, counter) =
1759 setup_mocks();
1760 let relayer_model = create_test_relayer();
1761
1762 tx_repo
1763 .expect_find_by_status()
1764 .withf(|relayer_id, statuses| {
1765 relayer_id == "test-relayer-id"
1766 && statuses
1767 == [
1768 TransactionStatus::Pending,
1769 TransactionStatus::Sent,
1770 TransactionStatus::Submitted,
1771 ]
1772 })
1773 .returning(|_, _| {
1774 Err(RepositoryError::Unknown(
1775 "Database connection failed".to_string(),
1776 ))
1777 })
1778 .once();
1779
1780 let relayer = EvmRelayer::new(
1781 relayer_model,
1782 signer,
1783 provider,
1784 create_test_evm_network(),
1785 Arc::new(relayer_repo),
1786 Arc::new(network_repo),
1787 Arc::new(tx_repo),
1788 Arc::new(counter),
1789 Arc::new(job_producer),
1790 )
1791 .unwrap();
1792
1793 let result = relayer.delete_pending_transactions().await;
1794 assert!(result.is_err());
1795 match result.err().unwrap() {
1796 RelayerError::NetworkConfiguration(msg) => {
1797 assert!(msg.contains("Database connection failed"))
1798 }
1799 _ => panic!("Expected NetworkConfiguration error for repository failure"),
1800 }
1801 }
1802
1803 #[tokio::test]
1804 async fn test_delete_pending_transactions_all_failures() {
1805 let (provider, relayer_repo, network_repo, mut tx_repo, mut job_producer, signer, counter) =
1806 setup_mocks();
1807 let relayer_model = create_test_relayer();
1808
1809 let pending_transactions = vec![
1810 TransactionRepoModel {
1811 id: "tx1".to_string(),
1812 relayer_id: relayer_model.id.clone(),
1813 status: TransactionStatus::Pending,
1814 ..TransactionRepoModel::default()
1815 },
1816 TransactionRepoModel {
1817 id: "tx2".to_string(),
1818 relayer_id: relayer_model.id.clone(),
1819 status: TransactionStatus::Sent,
1820 ..TransactionRepoModel::default()
1821 },
1822 ];
1823
1824 tx_repo
1825 .expect_find_by_status()
1826 .withf(|relayer_id, statuses| {
1827 relayer_id == "test-relayer-id"
1828 && statuses
1829 == [
1830 TransactionStatus::Pending,
1831 TransactionStatus::Sent,
1832 TransactionStatus::Submitted,
1833 ]
1834 })
1835 .returning(move |_, _| Ok(pending_transactions.clone()))
1836 .once();
1837
1838 job_producer
1839 .expect_produce_submit_transaction_job()
1840 .returning(|_, _| {
1841 Box::pin(ready(Err(crate::jobs::JobProducerError::QueueError(
1842 "Queue is full".to_string(),
1843 ))))
1844 })
1845 .times(2);
1846
1847 let relayer = EvmRelayer::new(
1848 relayer_model,
1849 signer,
1850 provider,
1851 create_test_evm_network(),
1852 Arc::new(relayer_repo),
1853 Arc::new(network_repo),
1854 Arc::new(tx_repo),
1855 Arc::new(counter),
1856 Arc::new(job_producer),
1857 )
1858 .unwrap();
1859
1860 let result = relayer.delete_pending_transactions().await.unwrap();
1861 assert_eq!(result.queued_for_cancellation_transaction_ids.len(), 0);
1862 assert_eq!(result.failed_to_queue_transaction_ids.len(), 2);
1863 assert_eq!(result.total_processed, 2);
1864
1865 let expected_failed_ids = vec!["tx1", "tx2"];
1866 for id in expected_failed_ids {
1867 assert!(result
1868 .failed_to_queue_transaction_ids
1869 .contains(&id.to_string()));
1870 }
1871 }
1872
1873 #[tokio::test]
1874 async fn test_rpc_eth_get_balance() {
1875 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1876 setup_mocks();
1877 let relayer_model = create_test_relayer();
1878
1879 provider
1880 .expect_raw_request_dyn()
1881 .withf(|method, params| {
1882 method == "eth_getBalance"
1883 && params.as_str()
1884 == Some(r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#)
1885 })
1886 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0xde0b6b3a7640000")) }));
1887
1888 let relayer = EvmRelayer::new(
1889 relayer_model,
1890 signer,
1891 provider,
1892 create_test_evm_network(),
1893 Arc::new(relayer_repo),
1894 Arc::new(network_repo),
1895 Arc::new(tx_repo),
1896 Arc::new(counter),
1897 Arc::new(job_producer),
1898 )
1899 .unwrap();
1900
1901 let request = JsonRpcRequest {
1902 jsonrpc: "2.0".to_string(),
1903 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
1904 method: "eth_getBalance".to_string(),
1905 params: serde_json::Value::String(
1906 r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#.to_string(),
1907 ),
1908 }),
1909 id: Some(JsonRpcId::Number(1)),
1910 };
1911
1912 let response = relayer.rpc(request).await.unwrap();
1913 assert!(response.error.is_none());
1914 assert!(response.result.is_some());
1915
1916 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
1917 assert_eq!(result, serde_json::json!("0xde0b6b3a7640000")); }
1919 }
1920
1921 #[tokio::test]
1922 async fn test_rpc_eth_block_number() {
1923 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1924 setup_mocks();
1925 let relayer_model = create_test_relayer();
1926
1927 provider
1928 .expect_raw_request_dyn()
1929 .withf(|method, params| method == "eth_blockNumber" && params.as_str() == Some("[]"))
1930 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0x3039")) }));
1931
1932 let relayer = EvmRelayer::new(
1933 relayer_model,
1934 signer,
1935 provider,
1936 create_test_evm_network(),
1937 Arc::new(relayer_repo),
1938 Arc::new(network_repo),
1939 Arc::new(tx_repo),
1940 Arc::new(counter),
1941 Arc::new(job_producer),
1942 )
1943 .unwrap();
1944
1945 let request = JsonRpcRequest {
1946 jsonrpc: "2.0".to_string(),
1947 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
1948 method: "eth_blockNumber".to_string(),
1949 params: serde_json::Value::String("[]".to_string()),
1950 }),
1951 id: Some(JsonRpcId::Number(1)),
1952 };
1953
1954 let response = relayer.rpc(request).await.unwrap();
1955 assert!(response.error.is_none());
1956 assert!(response.result.is_some());
1957
1958 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
1959 assert_eq!(result, serde_json::json!("0x3039")); }
1961 }
1962
1963 #[tokio::test]
1964 async fn test_rpc_unsupported_method() {
1965 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
1966 setup_mocks();
1967 let relayer_model = create_test_relayer();
1968
1969 provider
1970 .expect_raw_request_dyn()
1971 .withf(|method, _| method == "eth_unsupportedMethod")
1972 .returning(|_, _| {
1973 Box::pin(async {
1974 Err(ProviderError::Other(
1975 "Unsupported method: eth_unsupportedMethod".to_string(),
1976 ))
1977 })
1978 });
1979
1980 let relayer = EvmRelayer::new(
1981 relayer_model,
1982 signer,
1983 provider,
1984 create_test_evm_network(),
1985 Arc::new(relayer_repo),
1986 Arc::new(network_repo),
1987 Arc::new(tx_repo),
1988 Arc::new(counter),
1989 Arc::new(job_producer),
1990 )
1991 .unwrap();
1992
1993 let request = JsonRpcRequest {
1994 jsonrpc: "2.0".to_string(),
1995 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
1996 method: "eth_unsupportedMethod".to_string(),
1997 params: serde_json::Value::String("[]".to_string()),
1998 }),
1999 id: Some(JsonRpcId::Number(1)),
2000 };
2001
2002 let response = relayer.rpc(request).await.unwrap();
2003 assert!(response.result.is_none());
2004 assert!(response.error.is_some());
2005
2006 let error = response.error.unwrap();
2007 assert_eq!(error.code, -32603); }
2009
2010 #[tokio::test]
2011 async fn test_rpc_invalid_params() {
2012 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2013 setup_mocks();
2014 let relayer_model = create_test_relayer();
2015
2016 provider
2017 .expect_raw_request_dyn()
2018 .withf(|method, params| method == "eth_getBalance" && params.as_str() == Some("[]"))
2019 .returning(|_, _| {
2020 Box::pin(async {
2021 Err(ProviderError::Other(
2022 "Missing address parameter".to_string(),
2023 ))
2024 })
2025 });
2026
2027 let relayer = EvmRelayer::new(
2028 relayer_model,
2029 signer,
2030 provider,
2031 create_test_evm_network(),
2032 Arc::new(relayer_repo),
2033 Arc::new(network_repo),
2034 Arc::new(tx_repo),
2035 Arc::new(counter),
2036 Arc::new(job_producer),
2037 )
2038 .unwrap();
2039
2040 let request = JsonRpcRequest {
2041 jsonrpc: "2.0".to_string(),
2042 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2043 method: "eth_getBalance".to_string(),
2044 params: serde_json::Value::String("[]".to_string()), }),
2046 id: Some(JsonRpcId::Number(1)),
2047 };
2048
2049 let response = relayer.rpc(request).await.unwrap();
2050 assert!(response.result.is_none());
2051 assert!(response.error.is_some());
2052
2053 let error = response.error.unwrap();
2054 assert_eq!(error.code, -32603); }
2056
2057 #[tokio::test]
2058 async fn test_rpc_non_evm_request() {
2059 let (provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2060 setup_mocks();
2061 let relayer_model = create_test_relayer();
2062
2063 let relayer = EvmRelayer::new(
2064 relayer_model,
2065 signer,
2066 provider,
2067 create_test_evm_network(),
2068 Arc::new(relayer_repo),
2069 Arc::new(network_repo),
2070 Arc::new(tx_repo),
2071 Arc::new(counter),
2072 Arc::new(job_producer),
2073 )
2074 .unwrap();
2075
2076 let request = JsonRpcRequest {
2077 jsonrpc: "2.0".to_string(),
2078 params: NetworkRpcRequest::Solana(crate::models::SolanaRpcRequest::GetSupportedTokens(
2079 crate::models::SolanaGetSupportedTokensRequestParams {},
2080 )),
2081 id: Some(JsonRpcId::Number(1)),
2082 };
2083
2084 let response = relayer.rpc(request).await.unwrap();
2085 assert!(response.result.is_none());
2086 assert!(response.error.is_some());
2087
2088 let error = response.error.unwrap();
2089 assert_eq!(error.code, -32602); }
2091
2092 #[tokio::test]
2093 async fn test_rpc_raw_request_with_array_params() {
2094 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2095 setup_mocks();
2096 let relayer_model = create_test_relayer();
2097
2098 provider
2099 .expect_raw_request_dyn()
2100 .withf(|method, params| {
2101 method == "eth_getTransactionByHash"
2102 && params.as_array().is_some_and(|arr| {
2103 arr.len() == 1 && arr[0].as_str() == Some("0x1234567890abcdef")
2104 })
2105 })
2106 .returning(|_, _| {
2107 Box::pin(async {
2108 Ok(serde_json::json!({
2109 "hash": "0x1234567890abcdef",
2110 "blockNumber": "0x1",
2111 "gasUsed": "0x5208"
2112 }))
2113 })
2114 });
2115
2116 let relayer = EvmRelayer::new(
2117 relayer_model,
2118 signer,
2119 provider,
2120 create_test_evm_network(),
2121 Arc::new(relayer_repo),
2122 Arc::new(network_repo),
2123 Arc::new(tx_repo),
2124 Arc::new(counter),
2125 Arc::new(job_producer),
2126 )
2127 .unwrap();
2128
2129 let request = JsonRpcRequest {
2130 jsonrpc: "2.0".to_string(),
2131 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2132 method: "eth_getTransactionByHash".to_string(),
2133 params: serde_json::json!(["0x1234567890abcdef"]),
2134 }),
2135 id: Some(JsonRpcId::Number(42)),
2136 };
2137
2138 let response = relayer.rpc(request).await.unwrap();
2139 assert!(response.error.is_none());
2140 assert!(response.result.is_some());
2141 assert_eq!(response.id, Some(JsonRpcId::Number(42)));
2142
2143 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2144 assert!(result.get("hash").is_some());
2145 assert!(result.get("blockNumber").is_some());
2146 }
2147 }
2148
2149 #[tokio::test]
2150 async fn test_rpc_raw_request_with_object_params() {
2151 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2152 setup_mocks();
2153 let relayer_model = create_test_relayer();
2154
2155 provider
2156 .expect_raw_request_dyn()
2157 .withf(|method, params| {
2158 method == "eth_call"
2159 && params
2160 .as_object()
2161 .is_some_and(|obj| obj.contains_key("to") && obj.contains_key("data"))
2162 })
2163 .returning(|_, _| {
2164 Box::pin(async {
2165 Ok(serde_json::json!(
2166 "0x0000000000000000000000000000000000000000000000000000000000000001"
2167 ))
2168 })
2169 });
2170
2171 let relayer = EvmRelayer::new(
2172 relayer_model,
2173 signer,
2174 provider,
2175 create_test_evm_network(),
2176 Arc::new(relayer_repo),
2177 Arc::new(network_repo),
2178 Arc::new(tx_repo),
2179 Arc::new(counter),
2180 Arc::new(job_producer),
2181 )
2182 .unwrap();
2183
2184 let request = JsonRpcRequest {
2185 jsonrpc: "2.0".to_string(),
2186 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2187 method: "eth_call".to_string(),
2188 params: serde_json::json!({
2189 "to": "0x742d35Cc6634C0532925a3b844Bc454e4438f44e",
2190 "data": "0x70a08231000000000000000000000000742d35cc6634c0532925a3b844bc454e4438f44e"
2191 }),
2192 }),
2193 id: Some(JsonRpcId::Number(123)),
2194 };
2195
2196 let response = relayer.rpc(request).await.unwrap();
2197 assert!(response.error.is_none());
2198 assert!(response.result.is_some());
2199 assert_eq!(response.id, Some(JsonRpcId::Number(123)));
2200 }
2201
2202 #[tokio::test]
2203 async fn test_rpc_generic_request_with_empty_params() {
2204 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2205 setup_mocks();
2206 let relayer_model = create_test_relayer();
2207
2208 provider
2209 .expect_raw_request_dyn()
2210 .withf(|method, params| method == "net_version" && params.as_str() == Some("[]"))
2211 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("1")) }));
2212
2213 let relayer = EvmRelayer::new(
2214 relayer_model,
2215 signer,
2216 provider,
2217 create_test_evm_network(),
2218 Arc::new(relayer_repo),
2219 Arc::new(network_repo),
2220 Arc::new(tx_repo),
2221 Arc::new(counter),
2222 Arc::new(job_producer),
2223 )
2224 .unwrap();
2225
2226 let request = JsonRpcRequest {
2227 jsonrpc: "2.0".to_string(),
2228 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2229 method: "net_version".to_string(),
2230 params: serde_json::Value::String("[]".to_string()),
2231 }),
2232 id: Some(JsonRpcId::Number(999)),
2233 };
2234
2235 let response = relayer.rpc(request).await.unwrap();
2236 assert!(response.error.is_none());
2237 assert!(response.result.is_some());
2238 assert_eq!(response.id, Some(JsonRpcId::Number(999)));
2239 }
2240
2241 #[tokio::test]
2242 async fn test_rpc_provider_invalid_address_error() {
2243 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2244 setup_mocks();
2245 let relayer_model = create_test_relayer();
2246
2247 provider.expect_raw_request_dyn().returning(|_, _| {
2248 Box::pin(async {
2249 Err(ProviderError::InvalidAddress(
2250 "Invalid address format".to_string(),
2251 ))
2252 })
2253 });
2254
2255 let relayer = EvmRelayer::new(
2256 relayer_model,
2257 signer,
2258 provider,
2259 create_test_evm_network(),
2260 Arc::new(relayer_repo),
2261 Arc::new(network_repo),
2262 Arc::new(tx_repo),
2263 Arc::new(counter),
2264 Arc::new(job_producer),
2265 )
2266 .unwrap();
2267
2268 let request = JsonRpcRequest {
2269 jsonrpc: "2.0".to_string(),
2270 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2271 method: "eth_getBalance".to_string(),
2272 params: serde_json::Value::String(r#"["invalid_address", "latest"]"#.to_string()),
2273 }),
2274 id: Some(JsonRpcId::Number(1)),
2275 };
2276
2277 let response = relayer.rpc(request).await.unwrap();
2278 assert!(response.result.is_none());
2279 assert!(response.error.is_some());
2280
2281 let error = response.error.unwrap();
2282 assert_eq!(error.code, -32602); }
2284
2285 #[tokio::test]
2286 async fn test_rpc_provider_network_configuration_error() {
2287 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2288 setup_mocks();
2289 let relayer_model = create_test_relayer();
2290
2291 provider.expect_raw_request_dyn().returning(|_, _| {
2292 Box::pin(async {
2293 Err(ProviderError::NetworkConfiguration(
2294 "Network not reachable".to_string(),
2295 ))
2296 })
2297 });
2298
2299 let relayer = EvmRelayer::new(
2300 relayer_model,
2301 signer,
2302 provider,
2303 create_test_evm_network(),
2304 Arc::new(relayer_repo),
2305 Arc::new(network_repo),
2306 Arc::new(tx_repo),
2307 Arc::new(counter),
2308 Arc::new(job_producer),
2309 )
2310 .unwrap();
2311
2312 let request = JsonRpcRequest {
2313 jsonrpc: "2.0".to_string(),
2314 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2315 method: "eth_chainId".to_string(),
2316 params: serde_json::Value::String("[]".to_string()),
2317 }),
2318 id: Some(JsonRpcId::Number(2)),
2319 };
2320
2321 let response = relayer.rpc(request).await.unwrap();
2322 assert!(response.result.is_none());
2323 assert!(response.error.is_some());
2324
2325 let error = response.error.unwrap();
2326 assert_eq!(error.code, -33004); }
2328
2329 #[tokio::test]
2330 async fn test_rpc_provider_timeout_error() {
2331 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2332 setup_mocks();
2333 let relayer_model = create_test_relayer();
2334
2335 provider
2336 .expect_raw_request_dyn()
2337 .returning(|_, _| Box::pin(async { Err(ProviderError::Timeout) }));
2338
2339 let relayer = EvmRelayer::new(
2340 relayer_model,
2341 signer,
2342 provider,
2343 create_test_evm_network(),
2344 Arc::new(relayer_repo),
2345 Arc::new(network_repo),
2346 Arc::new(tx_repo),
2347 Arc::new(counter),
2348 Arc::new(job_producer),
2349 )
2350 .unwrap();
2351
2352 let request = JsonRpcRequest {
2353 jsonrpc: "2.0".to_string(),
2354 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2355 method: "eth_blockNumber".to_string(),
2356 params: serde_json::json!([]),
2357 }),
2358 id: Some(JsonRpcId::Number(3)),
2359 };
2360
2361 let response = relayer.rpc(request).await.unwrap();
2362 assert!(response.result.is_none());
2363 assert!(response.error.is_some());
2364
2365 let error = response.error.unwrap();
2366 assert_eq!(error.code, -33000); }
2368
2369 #[tokio::test]
2370 async fn test_rpc_provider_rate_limited_error() {
2371 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2372 setup_mocks();
2373 let relayer_model = create_test_relayer();
2374
2375 provider
2376 .expect_raw_request_dyn()
2377 .returning(|_, _| Box::pin(async { Err(ProviderError::RateLimited) }));
2378
2379 let relayer = EvmRelayer::new(
2380 relayer_model,
2381 signer,
2382 provider,
2383 create_test_evm_network(),
2384 Arc::new(relayer_repo),
2385 Arc::new(network_repo),
2386 Arc::new(tx_repo),
2387 Arc::new(counter),
2388 Arc::new(job_producer),
2389 )
2390 .unwrap();
2391
2392 let request = JsonRpcRequest {
2393 jsonrpc: "2.0".to_string(),
2394 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2395 method: "eth_getBalance".to_string(),
2396 params: serde_json::Value::String(
2397 r#"["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]"#.to_string(),
2398 ),
2399 }),
2400 id: Some(JsonRpcId::Number(4)),
2401 };
2402
2403 let response = relayer.rpc(request).await.unwrap();
2404 assert!(response.result.is_none());
2405 assert!(response.error.is_some());
2406
2407 let error = response.error.unwrap();
2408 assert_eq!(error.code, -33001); }
2410
2411 #[tokio::test]
2412 async fn test_rpc_provider_bad_gateway_error() {
2413 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2414 setup_mocks();
2415 let relayer_model = create_test_relayer();
2416
2417 provider
2418 .expect_raw_request_dyn()
2419 .returning(|_, _| Box::pin(async { Err(ProviderError::BadGateway) }));
2420
2421 let relayer = EvmRelayer::new(
2422 relayer_model,
2423 signer,
2424 provider,
2425 create_test_evm_network(),
2426 Arc::new(relayer_repo),
2427 Arc::new(network_repo),
2428 Arc::new(tx_repo),
2429 Arc::new(counter),
2430 Arc::new(job_producer),
2431 )
2432 .unwrap();
2433
2434 let request = JsonRpcRequest {
2435 jsonrpc: "2.0".to_string(),
2436 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2437 method: "eth_gasPrice".to_string(),
2438 params: serde_json::json!([]),
2439 }),
2440 id: Some(JsonRpcId::Number(5)),
2441 };
2442
2443 let response = relayer.rpc(request).await.unwrap();
2444 assert!(response.result.is_none());
2445 assert!(response.error.is_some());
2446
2447 let error = response.error.unwrap();
2448 assert_eq!(error.code, -33002); }
2450
2451 #[tokio::test]
2452 async fn test_rpc_provider_request_error() {
2453 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2454 setup_mocks();
2455 let relayer_model = create_test_relayer();
2456
2457 provider.expect_raw_request_dyn().returning(|_, _| {
2458 Box::pin(async {
2459 Err(ProviderError::RequestError {
2460 error: "Bad request".to_string(),
2461 status_code: 400,
2462 })
2463 })
2464 });
2465
2466 let relayer = EvmRelayer::new(
2467 relayer_model,
2468 signer,
2469 provider,
2470 create_test_evm_network(),
2471 Arc::new(relayer_repo),
2472 Arc::new(network_repo),
2473 Arc::new(tx_repo),
2474 Arc::new(counter),
2475 Arc::new(job_producer),
2476 )
2477 .unwrap();
2478
2479 let request = JsonRpcRequest {
2480 jsonrpc: "2.0".to_string(),
2481 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2482 method: "invalid_method".to_string(),
2483 params: serde_json::Value::String("{}".to_string()),
2484 }),
2485 id: Some(JsonRpcId::Number(6)),
2486 };
2487
2488 let response = relayer.rpc(request).await.unwrap();
2489 assert!(response.result.is_none());
2490 assert!(response.error.is_some());
2491
2492 let error = response.error.unwrap();
2493 assert_eq!(error.code, -33003); }
2495
2496 #[tokio::test]
2497 async fn test_rpc_provider_other_error() {
2498 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2499 setup_mocks();
2500 let relayer_model = create_test_relayer();
2501
2502 provider.expect_raw_request_dyn().returning(|_, _| {
2503 Box::pin(async {
2504 Err(ProviderError::Other(
2505 "Unexpected error occurred".to_string(),
2506 ))
2507 })
2508 });
2509
2510 let relayer = EvmRelayer::new(
2511 relayer_model,
2512 signer,
2513 provider,
2514 create_test_evm_network(),
2515 Arc::new(relayer_repo),
2516 Arc::new(network_repo),
2517 Arc::new(tx_repo),
2518 Arc::new(counter),
2519 Arc::new(job_producer),
2520 )
2521 .unwrap();
2522
2523 let request = JsonRpcRequest {
2524 jsonrpc: "2.0".to_string(),
2525 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2526 method: "eth_getBalance".to_string(),
2527 params: serde_json::json!(["0x742d35Cc6634C0532925a3b844Bc454e4438f44e", "latest"]),
2528 }),
2529 id: Some(JsonRpcId::Number(7)),
2530 };
2531
2532 let response = relayer.rpc(request).await.unwrap();
2533 assert!(response.result.is_none());
2534 assert!(response.error.is_some());
2535
2536 let error = response.error.unwrap();
2537 assert_eq!(error.code, -32603); }
2539
2540 #[tokio::test]
2541 async fn test_rpc_response_preserves_request_id() {
2542 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2543 setup_mocks();
2544 let relayer_model = create_test_relayer();
2545
2546 provider
2547 .expect_raw_request_dyn()
2548 .returning(|_, _| Box::pin(async { Ok(serde_json::json!("0x1")) }));
2549
2550 let relayer = EvmRelayer::new(
2551 relayer_model,
2552 signer,
2553 provider,
2554 create_test_evm_network(),
2555 Arc::new(relayer_repo),
2556 Arc::new(network_repo),
2557 Arc::new(tx_repo),
2558 Arc::new(counter),
2559 Arc::new(job_producer),
2560 )
2561 .unwrap();
2562
2563 let request_id = u64::MAX;
2564 let request = JsonRpcRequest {
2565 jsonrpc: "2.0".to_string(),
2566 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2567 method: "eth_chainId".to_string(),
2568 params: serde_json::Value::String("[]".to_string()),
2569 }),
2570 id: Some(JsonRpcId::Number(request_id as i64)),
2571 };
2572
2573 let response = relayer.rpc(request).await.unwrap();
2574 assert_eq!(response.id, Some(JsonRpcId::Number(request_id as i64)));
2575 assert_eq!(response.jsonrpc, "2.0");
2576 }
2577
2578 #[tokio::test]
2579 async fn test_rpc_handles_complex_json_response() {
2580 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, counter) =
2581 setup_mocks();
2582 let relayer_model = create_test_relayer();
2583
2584 let complex_response = serde_json::json!({
2585 "number": "0x1b4",
2586 "hash": "0xdc0818cf78f21a8e70579cb46a43643f78291264dda342ae31049421c82d21ae",
2587 "parentHash": "0xe99e022112df268ce40b8b654759b4f39c3cc1b8c86b2f4c7da48ba6d8a6ae8b",
2588 "transactions": [
2589 {
2590 "hash": "0x5c504ed432cb51138bcf09aa5e8a410dd4a1e204ef84bfed1be16dfba1b22060",
2591 "from": "0xa7d9ddbe1f17865597fbd27ec712455208b6b76d",
2592 "to": "0xf02c1c8e6114b1dbe8937a39260b5b0a374432bb",
2593 "value": "0xf3dbb76162000"
2594 }
2595 ],
2596 "gasUsed": "0x5208"
2597 });
2598
2599 provider.expect_raw_request_dyn().returning(move |_, _| {
2600 let response = complex_response.clone();
2601 Box::pin(async move { Ok(response) })
2602 });
2603
2604 let relayer = EvmRelayer::new(
2605 relayer_model,
2606 signer,
2607 provider,
2608 create_test_evm_network(),
2609 Arc::new(relayer_repo),
2610 Arc::new(network_repo),
2611 Arc::new(tx_repo),
2612 Arc::new(counter),
2613 Arc::new(job_producer),
2614 )
2615 .unwrap();
2616
2617 let request = JsonRpcRequest {
2618 jsonrpc: "2.0".to_string(),
2619 params: NetworkRpcRequest::Evm(EvmRpcRequest::RawRpcRequest {
2620 method: "eth_getBlockByNumber".to_string(),
2621 params: serde_json::json!(["0x1b4", true]),
2622 }),
2623 id: Some(JsonRpcId::Number(8)),
2624 };
2625
2626 let response = relayer.rpc(request).await.unwrap();
2627 assert!(response.error.is_none());
2628 assert!(response.result.is_some());
2629
2630 if let Some(NetworkRpcResult::Evm(EvmRpcResult::RawRpcResult(result))) = response.result {
2631 assert!(result.get("transactions").is_some());
2632 assert!(result.get("hash").is_some());
2633 assert!(result.get("gasUsed").is_some());
2634 }
2635 }
2636
2637 #[tokio::test]
2638 async fn test_initialize_relayer_disables_when_validation_fails() {
2639 let (
2640 mut provider,
2641 mut relayer_repo,
2642 network_repo,
2643 tx_repo,
2644 mut job_producer,
2645 signer,
2646 mut counter,
2647 ) = setup_mocks();
2648 let mut relayer_model = create_test_relayer();
2649 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
2651
2652 provider
2654 .expect_get_transaction_count()
2655 .returning(|_| Box::pin(ready(Err(ProviderError::Other("RPC error".to_string())))));
2656
2657 counter
2658 .expect_get()
2659 .returning(|| Box::pin(ready(Ok(Some(0u64)))));
2660
2661 provider
2663 .expect_get_balance()
2664 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64)))));
2665
2666 provider
2667 .expect_health_check()
2668 .returning(|| Box::pin(ready(Ok(true))));
2669
2670 let mut disabled_relayer = relayer_model.clone();
2672 disabled_relayer.system_disabled = true;
2673 relayer_repo
2674 .expect_disable_relayer()
2675 .with(eq("test-relayer-id".to_string()), always())
2676 .returning(move |_, _| Ok(disabled_relayer.clone()));
2677
2678 job_producer
2680 .expect_produce_send_notification_job()
2681 .returning(|_, _| Box::pin(ready(Ok(()))));
2682
2683 job_producer
2685 .expect_produce_relayer_health_check_job()
2686 .returning(|_, _| Box::pin(ready(Ok(()))));
2687
2688 let relayer = EvmRelayer::new(
2689 relayer_model,
2690 signer,
2691 provider,
2692 create_test_evm_network(),
2693 Arc::new(relayer_repo),
2694 Arc::new(network_repo),
2695 Arc::new(tx_repo),
2696 Arc::new(counter),
2697 Arc::new(job_producer),
2698 )
2699 .unwrap();
2700
2701 let result = relayer.initialize_relayer().await;
2702 assert!(result.is_ok());
2703 }
2704
2705 #[tokio::test]
2706 async fn test_initialize_relayer_enables_when_validation_passes_and_was_disabled() {
2707 let (
2708 mut provider,
2709 mut relayer_repo,
2710 network_repo,
2711 tx_repo,
2712 job_producer,
2713 signer,
2714 mut counter,
2715 ) = setup_mocks();
2716 let mut relayer_model = create_test_relayer();
2717 relayer_model.system_disabled = true; provider
2721 .expect_get_transaction_count()
2722 .returning(|_| Box::pin(ready(Ok(42u64))));
2723
2724 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2725
2726 counter
2727 .expect_get()
2728 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2729
2730 provider
2731 .expect_get_balance()
2732 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider
2735 .expect_health_check()
2736 .returning(|| Box::pin(ready(Ok(true))));
2737
2738 let mut enabled_relayer = relayer_model.clone();
2740 enabled_relayer.system_disabled = false;
2741 relayer_repo
2742 .expect_enable_relayer()
2743 .with(eq("test-relayer-id".to_string()))
2744 .returning(move |_| Ok(enabled_relayer.clone()));
2745
2746 let relayer = EvmRelayer::new(
2747 relayer_model,
2748 signer,
2749 provider,
2750 create_test_evm_network(),
2751 Arc::new(relayer_repo),
2752 Arc::new(network_repo),
2753 Arc::new(tx_repo),
2754 Arc::new(counter),
2755 Arc::new(job_producer),
2756 )
2757 .unwrap();
2758
2759 let result = relayer.initialize_relayer().await;
2760 assert!(result.is_ok());
2761 }
2762
2763 #[tokio::test]
2764 async fn test_initialize_relayer_no_action_when_enabled_and_validation_passes() {
2765 let (mut provider, relayer_repo, network_repo, tx_repo, job_producer, signer, mut counter) =
2766 setup_mocks();
2767 let mut relayer_model = create_test_relayer();
2768 relayer_model.system_disabled = false; provider
2772 .expect_get_transaction_count()
2773 .returning(|_| Box::pin(ready(Ok(42u64))));
2774
2775 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2776
2777 counter
2778 .expect_get()
2779 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2780
2781 provider
2782 .expect_get_balance()
2783 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider
2786 .expect_health_check()
2787 .returning(|| Box::pin(ready(Ok(true))));
2788
2789 let relayer = EvmRelayer::new(
2792 relayer_model,
2793 signer,
2794 provider,
2795 create_test_evm_network(),
2796 Arc::new(relayer_repo),
2797 Arc::new(network_repo),
2798 Arc::new(tx_repo),
2799 Arc::new(counter),
2800 Arc::new(job_producer),
2801 )
2802 .unwrap();
2803
2804 let result = relayer.initialize_relayer().await;
2805 assert!(result.is_ok());
2806 }
2807
2808 #[tokio::test]
2809 async fn test_initialize_relayer_sends_notification_when_disabled() {
2810 let (
2811 mut provider,
2812 mut relayer_repo,
2813 network_repo,
2814 tx_repo,
2815 mut job_producer,
2816 signer,
2817 mut counter,
2818 ) = setup_mocks();
2819 let mut relayer_model = create_test_relayer();
2820 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
2822
2823 provider
2825 .expect_get_transaction_count()
2826 .returning(|_| Box::pin(ready(Ok(42u64))));
2827
2828 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2829
2830 counter
2831 .expect_get()
2832 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2833
2834 provider
2835 .expect_get_balance()
2836 .returning(|_| Box::pin(ready(Ok(U256::from(200000000000000000u64))))); provider.expect_health_check().returning(|| {
2839 Box::pin(ready(Err(ProviderError::Other(
2840 "RPC validation failed".to_string(),
2841 ))))
2842 });
2843
2844 let mut disabled_relayer = relayer_model.clone();
2846 disabled_relayer.system_disabled = true;
2847 relayer_repo
2848 .expect_disable_relayer()
2849 .with(eq("test-relayer-id".to_string()), always())
2850 .returning(move |_, _| Ok(disabled_relayer.clone()));
2851
2852 job_producer
2854 .expect_produce_send_notification_job()
2855 .returning(|_, _| Box::pin(ready(Ok(()))));
2856
2857 job_producer
2859 .expect_produce_relayer_health_check_job()
2860 .returning(|_, _| Box::pin(ready(Ok(()))));
2861
2862 let relayer = EvmRelayer::new(
2863 relayer_model,
2864 signer,
2865 provider,
2866 create_test_evm_network(),
2867 Arc::new(relayer_repo),
2868 Arc::new(network_repo),
2869 Arc::new(tx_repo),
2870 Arc::new(counter),
2871 Arc::new(job_producer),
2872 )
2873 .unwrap();
2874
2875 let result = relayer.initialize_relayer().await;
2876 assert!(result.is_ok());
2877 }
2878
2879 #[tokio::test]
2880 async fn test_initialize_relayer_no_notification_when_no_notification_id() {
2881 let (
2882 mut provider,
2883 mut relayer_repo,
2884 network_repo,
2885 tx_repo,
2886 mut job_producer,
2887 signer,
2888 mut counter,
2889 ) = setup_mocks();
2890 let mut relayer_model = create_test_relayer();
2891 relayer_model.system_disabled = false; relayer_model.notification_id = None; provider
2896 .expect_get_transaction_count()
2897 .returning(|_| Box::pin(ready(Ok(42u64))));
2898
2899 counter.expect_set().returning(|_| Box::pin(ready(Ok(()))));
2900
2901 counter
2902 .expect_get()
2903 .returning(|| Box::pin(ready(Ok(Some(42u64)))));
2904
2905 provider
2906 .expect_get_balance()
2907 .returning(|_| Box::pin(ready(Ok(U256::from(50000000000000000u64))))); provider
2910 .expect_health_check()
2911 .returning(|| Box::pin(ready(Ok(true))));
2912
2913 let mut disabled_relayer = relayer_model.clone();
2915 disabled_relayer.system_disabled = true;
2916 relayer_repo
2917 .expect_disable_relayer()
2918 .with(eq("test-relayer-id".to_string()), always())
2919 .returning(move |_, _| Ok(disabled_relayer.clone()));
2920
2921 job_producer
2924 .expect_produce_relayer_health_check_job()
2925 .returning(|_, _| Box::pin(ready(Ok(()))));
2926
2927 let relayer = EvmRelayer::new(
2928 relayer_model,
2929 signer,
2930 provider,
2931 create_test_evm_network(),
2932 Arc::new(relayer_repo),
2933 Arc::new(network_repo),
2934 Arc::new(tx_repo),
2935 Arc::new(counter),
2936 Arc::new(job_producer),
2937 )
2938 .unwrap();
2939
2940 let result = relayer.initialize_relayer().await;
2941 assert!(result.is_ok());
2942 }
2943}