1use crate::constants::get_stellar_sponsored_transaction_validity_duration;
2use crate::domain::relayer::evm::create_error_response;
3use crate::services::stellar_dex::StellarDexService;
4use crate::utils::{map_provider_error, sanitize_error_description};
5use crate::{
28 constants::{
29 transactions::PENDING_TRANSACTION_STATUSES, STELLAR_SMALLEST_UNIT_NAME,
30 STELLAR_STATUS_CHECK_INITIAL_DELAY_SECONDS,
31 },
32 domain::{
33 create_success_response, transaction::stellar::fetch_next_sequence_from_chain,
34 BalanceResponse, SignDataRequest, SignDataResponse, SignTransactionExternalResponse,
35 SignTransactionExternalResponseStellar, SignTransactionRequest, SignTypedDataRequest,
36 },
37 jobs::{JobProducerTrait, RelayerHealthCheck, TransactionRequest, TransactionStatusCheck},
38 models::{
39 produce_relayer_disabled_payload, DeletePendingTransactionsResponse, DisabledReason,
40 HealthCheckFailure, JsonRpcRequest, JsonRpcResponse, NetworkRepoModel, NetworkRpcRequest,
41 NetworkRpcResult, NetworkTransactionRequest, NetworkType, PaginationQuery,
42 RelayerNetworkPolicy, RelayerRepoModel, RelayerStatus, RelayerStellarPolicy,
43 RepositoryError, RpcErrorCodes, StellarAllowedTokensPolicy, StellarFeePaymentStrategy,
44 StellarNetwork, StellarRpcRequest, TransactionRepoModel, TransactionStatus,
45 TransactionUpdateRequest,
46 },
47 repositories::{NetworkRepository, RelayerRepository, Repository, TransactionRepository},
48 services::{
49 provider::{StellarProvider, StellarProviderTrait},
50 signer::{StellarSignTrait, StellarSigner},
51 stellar_dex::StellarDexServiceTrait,
52 TransactionCounterService, TransactionCounterServiceTrait,
53 },
54 utils::calculate_scheduled_timestamp,
55};
56use async_trait::async_trait;
57use eyre::Result;
58use futures::future::try_join_all;
59use std::sync::Arc;
60use tracing::{debug, error, info, instrument, warn};
61
62use crate::domain::relayer::stellar::xdr_utils::parse_transaction_xdr;
63use crate::domain::relayer::{Relayer, RelayerError, StellarRelayerDexTrait};
64use crate::domain::transaction::stellar::token::get_token_metadata;
65use crate::domain::transaction::stellar::StellarTransactionValidator;
66
67pub struct StellarRelayerDependencies<RR, NR, TR, J, TCS>
69where
70 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
71 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
72 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
73 J: JobProducerTrait + Send + Sync + 'static,
74 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
75{
76 pub relayer_repository: Arc<RR>,
77 pub network_repository: Arc<NR>,
78 pub transaction_repository: Arc<TR>,
79 pub transaction_counter_service: Arc<TCS>,
80 pub job_producer: Arc<J>,
81}
82
83impl<RR, NR, TR, J, TCS> StellarRelayerDependencies<RR, NR, TR, J, TCS>
84where
85 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
86 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
87 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
88 J: JobProducerTrait + Send + Sync,
89 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
90{
91 pub fn new(
105 relayer_repository: Arc<RR>,
106 network_repository: Arc<NR>,
107 transaction_repository: Arc<TR>,
108 transaction_counter_service: Arc<TCS>,
109 job_producer: Arc<J>,
110 ) -> Self {
111 Self {
112 relayer_repository,
113 network_repository,
114 transaction_repository,
115 transaction_counter_service,
116 job_producer,
117 }
118 }
119}
120
121#[allow(dead_code)]
122pub struct StellarRelayer<P, RR, NR, TR, J, TCS, S, D>
123where
124 P: StellarProviderTrait + Send + Sync + 'static,
125 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
126 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
127 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
128 J: JobProducerTrait + Send + Sync + 'static,
129 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
130 S: StellarSignTrait + Send + Sync + 'static,
131 D: StellarDexServiceTrait + Send + Sync + 'static,
132{
133 pub(crate) relayer: RelayerRepoModel,
134 pub(crate) signer: Arc<S>,
135 pub(crate) network: StellarNetwork,
136 pub(crate) provider: P,
137 pub(crate) relayer_repository: Arc<RR>,
138 network_repository: Arc<NR>,
139 transaction_repository: Arc<TR>,
140 transaction_counter_service: Arc<TCS>,
141 pub(crate) job_producer: Arc<J>,
142 pub(crate) dex_service: Arc<D>,
143}
144
145pub type DefaultStellarRelayer<J, TR, NR, RR, TCR> = StellarRelayer<
146 StellarProvider,
147 RR,
148 NR,
149 TR,
150 J,
151 TransactionCounterService<TCR>,
152 StellarSigner,
153 StellarDexService<StellarProvider, StellarSigner>,
154>;
155
156impl<P, RR, NR, TR, J, TCS, S, D> StellarRelayer<P, RR, NR, TR, J, TCS, S, D>
157where
158 P: StellarProviderTrait + Send + Sync,
159 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
160 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
161 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
162 J: JobProducerTrait + Send + Sync + 'static,
163 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
164 S: StellarSignTrait + Send + Sync + 'static,
165 D: StellarDexServiceTrait + Send + Sync + 'static,
166{
167 #[allow(clippy::too_many_arguments)]
186 pub async fn new(
187 relayer: RelayerRepoModel,
188 signer: Arc<S>,
189 provider: P,
190 dependencies: StellarRelayerDependencies<RR, NR, TR, J, TCS>,
191 dex_service: Arc<D>,
192 ) -> Result<Self, RelayerError> {
193 let network_repo = dependencies
194 .network_repository
195 .get_by_name(NetworkType::Stellar, &relayer.network)
196 .await
197 .ok()
198 .flatten()
199 .ok_or_else(|| {
200 RelayerError::NetworkConfiguration(format!("Network {} not found", relayer.network))
201 })?;
202
203 let network = StellarNetwork::try_from(network_repo.clone())?;
204
205 Ok(Self {
206 relayer,
207 signer,
208 network,
209 provider,
210 relayer_repository: dependencies.relayer_repository,
211 network_repository: dependencies.network_repository,
212 transaction_repository: dependencies.transaction_repository,
213 transaction_counter_service: dependencies.transaction_counter_service,
214 job_producer: dependencies.job_producer,
215 dex_service,
216 })
217 }
218
219 #[instrument(
220 level = "debug",
221 skip(self),
222 fields(
223 request_id = ?crate::observability::request_id::get_request_id(),
224 relayer_id = %self.relayer.id,
225 )
226 )]
227 async fn sync_sequence(&self) -> Result<(), RelayerError> {
228 info!(
229 address = %self.relayer.address,
230 "syncing sequence from chain"
231 );
232
233 let next = fetch_next_sequence_from_chain(&self.provider, &self.relayer.address)
234 .await
235 .map_err(RelayerError::ProviderError)?;
236
237 info!(
238 next_sequence = %next,
239 "setting next sequence"
240 );
241 self.transaction_counter_service
242 .set(next)
243 .await
244 .map_err(RelayerError::from)?;
245 Ok(())
246 }
247
248 #[instrument(
258 level = "debug",
259 skip(self),
260 fields(
261 request_id = ?crate::observability::request_id::get_request_id(),
262 relayer_id = %self.relayer.id,
263 )
264 )]
265 async fn populate_allowed_tokens_metadata(&self) -> Result<RelayerStellarPolicy, RelayerError> {
266 let mut policy = self.relayer.policies.get_stellar_policy();
267 let allowed_tokens = match policy.allowed_tokens.as_ref() {
269 Some(tokens) if !tokens.is_empty() => tokens,
270 _ => {
271 info!("No allowed tokens specified; skipping token metadata population.");
272 return Ok(policy);
273 }
274 };
275
276 let token_metadata_futures = allowed_tokens.iter().map(|token| {
277 let asset_id = token.asset.clone();
278 let provider = &self.provider;
279 async move {
280 let metadata = get_token_metadata(provider, &asset_id)
281 .await
282 .map_err(RelayerError::from)?;
283
284 Ok::<StellarAllowedTokensPolicy, RelayerError>(StellarAllowedTokensPolicy {
285 asset: asset_id,
286 metadata: Some(metadata),
287 max_allowed_fee: token.max_allowed_fee,
288 swap_config: token.swap_config.clone(),
289 })
290 }
291 });
292
293 let updated_allowed_tokens = try_join_all(token_metadata_futures).await?;
294
295 policy.allowed_tokens = Some(updated_allowed_tokens.clone());
296
297 self.relayer_repository
298 .update_policy(
299 self.relayer.id.clone(),
300 RelayerNetworkPolicy::Stellar(policy.clone()),
301 )
302 .await?;
303
304 Ok(policy)
305 }
306
307 #[instrument(
316 level = "debug",
317 skip(self),
318 fields(
319 request_id = ?crate::observability::request_id::get_request_id(),
320 relayer_id = %self.relayer.id,
321 )
322 )]
323 async fn migrate_fee_payment_strategy_if_needed(&self) -> Result<(), RelayerError> {
324 if !self.relayer_repository.is_persistent_storage() {
327 debug!(
328 relayer_id = %self.relayer.id,
329 "Skipping migration: using in-memory storage"
330 );
331 return Ok(());
332 }
333
334 let policy = self.relayer.policies.get_stellar_policy();
335
336 if policy.fee_payment_strategy.is_some() {
338 return Ok(());
339 }
340
341 info!(
343 relayer_id = %self.relayer.id,
344 "Migrating Stellar relayer: setting fee_payment_strategy to 'Relayer' (old default behavior)"
345 );
346
347 let mut updated_policy = policy;
349 updated_policy.fee_payment_strategy = Some(StellarFeePaymentStrategy::Relayer);
350
351 self.relayer_repository
353 .update_policy(
354 self.relayer.id.clone(),
355 RelayerNetworkPolicy::Stellar(updated_policy),
356 )
357 .await
358 .map_err(|e| {
359 RelayerError::PolicyConfigurationError(format!(
360 "Failed to migrate fee_payment_strategy policy: {e}"
361 ))
362 })?;
363
364 debug!(
365 relayer_id = %self.relayer.id,
366 "Successfully migrated fee_payment_strategy policy"
367 );
368
369 Ok(())
370 }
371
372 #[instrument(
376 level = "debug",
377 skip(self),
378 fields(
379 request_id = ?crate::observability::request_id::get_request_id(),
380 relayer_id = %self.relayer.id,
381 )
382 )]
383 async fn check_balance_and_trigger_token_swap_if_needed(&self) -> Result<(), RelayerError> {
384 let policy = self.relayer.policies.get_stellar_policy();
385
386 let swap_config = match policy.get_swap_config() {
388 Some(config) => config,
389 None => {
390 debug!(
391 relayer_id = %self.relayer.id,
392 "No swap configuration specified; skipping balance check"
393 );
394 return Ok(());
395 }
396 };
397
398 let threshold = match swap_config.min_balance_threshold {
400 Some(threshold) => threshold,
401 None => {
402 debug!(
403 relayer_id = %self.relayer.id,
404 "No swap min balance threshold specified; skipping validation"
405 );
406 return Ok(());
407 }
408 };
409
410 let balance_response = self.get_balance().await?;
412 let current_balance = u64::try_from(balance_response.balance).map_err(|_| {
413 RelayerError::Internal("Account balance exceeds u64 maximum value".to_string())
414 })?;
415
416 if current_balance < threshold {
418 debug!(
419 relayer_id = %self.relayer.id,
420 balance = current_balance,
421 threshold = threshold,
422 "XLM balance is below threshold, triggering token swap"
423 );
424
425 let _swap_results = self
426 .handle_token_swap_request(self.relayer.id.clone())
427 .await?;
428 } else {
429 debug!(
430 relayer_id = %self.relayer.id,
431 balance = current_balance,
432 threshold = threshold,
433 "XLM balance is above threshold, no swap needed"
434 );
435 }
436
437 Ok(())
438 }
439}
440
441#[async_trait]
442impl<P, RR, NR, TR, J, TCS, S, D> Relayer for StellarRelayer<P, RR, NR, TR, J, TCS, S, D>
443where
444 P: StellarProviderTrait + Send + Sync + 'static,
445 D: StellarDexServiceTrait + Send + Sync + 'static,
446 RR: Repository<RelayerRepoModel, String> + RelayerRepository + Send + Sync + 'static,
447 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
448 TR: Repository<TransactionRepoModel, String> + TransactionRepository + Send + Sync + 'static,
449 J: JobProducerTrait + Send + Sync + 'static,
450 TCS: TransactionCounterServiceTrait + Send + Sync + 'static,
451 S: StellarSignTrait + Send + Sync + 'static,
452{
453 #[instrument(
454 level = "debug",
455 skip(self, network_transaction),
456 fields(
457 request_id = ?crate::observability::request_id::get_request_id(),
458 relayer_id = %self.relayer.id,
459 network_type = ?self.relayer.network_type,
460 )
461 )]
462 async fn process_transaction_request(
463 &self,
464 network_transaction: NetworkTransactionRequest,
465 ) -> Result<TransactionRepoModel, RelayerError> {
466 let network_model = self
467 .network_repository
468 .get_by_name(NetworkType::Stellar, &self.relayer.network)
469 .await?
470 .ok_or_else(|| {
471 RelayerError::NetworkConfiguration(format!(
472 "Network {} not found",
473 self.relayer.network
474 ))
475 })?;
476 let transaction =
477 TransactionRepoModel::try_from((&network_transaction, &self.relayer, &network_model))?;
478
479 self.transaction_repository
480 .create(transaction.clone())
481 .await
482 .map_err(|e| RepositoryError::TransactionFailure(e.to_string()))?;
483
484 if let Err(e) = self
488 .job_producer
489 .produce_check_transaction_status_job(
490 TransactionStatusCheck::new(
491 transaction.id.clone(),
492 transaction.relayer_id.clone(),
493 NetworkType::Stellar,
494 ),
495 Some(calculate_scheduled_timestamp(
496 STELLAR_STATUS_CHECK_INITIAL_DELAY_SECONDS,
497 )),
498 )
499 .await
500 {
501 error!(
503 relayer_id = %self.relayer.id,
504 transaction_id = %transaction.id,
505 error = %e,
506 "Status check queue push failed - marking transaction as failed"
507 );
508 if let Err(update_err) = self
509 .transaction_repository
510 .partial_update(
511 transaction.id.clone(),
512 TransactionUpdateRequest {
513 status: Some(TransactionStatus::Failed),
514 status_reason: Some("Queue unavailable".to_string()),
515 ..Default::default()
516 },
517 )
518 .await
519 {
520 warn!(
521 relayer_id = %self.relayer.id,
522 transaction_id = %transaction.id,
523 error = %update_err,
524 "Failed to mark transaction as failed after queue push failure"
525 );
526 }
527 return Err(e.into());
528 }
529
530 self.job_producer
533 .produce_transaction_request_job(
534 TransactionRequest::new(transaction.id.clone(), transaction.relayer_id.clone()),
535 None,
536 )
537 .await?;
538
539 Ok(transaction)
540 }
541
542 #[instrument(
543 level = "debug",
544 skip(self),
545 fields(
546 request_id = ?crate::observability::request_id::get_request_id(),
547 relayer_id = %self.relayer.id,
548 )
549 )]
550 async fn get_balance(&self) -> Result<BalanceResponse, RelayerError> {
551 let account_entry = self
552 .provider
553 .get_account(&self.relayer.address)
554 .await
555 .map_err(|e| {
556 warn!(
557 relayer_id = %self.relayer.id,
558 address = %self.relayer.address,
559 error = %e,
560 "get_account failed in get_balance (called before transaction creation)"
561 );
562 crate::metrics::API_RPC_FAILURES
564 .with_label_values(&[
565 self.relayer.id.as_str(),
566 "stellar",
567 "get_balance",
568 "get_account_failed",
569 ])
570 .inc();
571 RelayerError::ProviderError(format!("Failed to fetch account for balance: {e}"))
572 })?;
573
574 Ok(BalanceResponse {
575 balance: account_entry.balance as u128,
576 unit: STELLAR_SMALLEST_UNIT_NAME.to_string(),
577 })
578 }
579
580 #[instrument(
581 level = "debug",
582 skip(self),
583 fields(
584 request_id = ?crate::observability::request_id::get_request_id(),
585 relayer_id = %self.relayer.id,
586 )
587 )]
588 async fn get_status(&self) -> Result<RelayerStatus, RelayerError> {
589 let relayer_model = &self.relayer;
590
591 let account_entry = self
592 .provider
593 .get_account(&relayer_model.address)
594 .await
595 .map_err(|e| {
596 warn!(
597 relayer_id = %relayer_model.id,
598 address = %relayer_model.address,
599 error = %e,
600 "get_account failed in get_status (called before transaction creation)"
601 );
602 crate::metrics::API_RPC_FAILURES
604 .with_label_values(&[
605 relayer_model.id.as_str(),
606 "stellar",
607 "get_status",
608 "get_account_failed",
609 ])
610 .inc();
611 RelayerError::ProviderError(format!("Failed to get account details: {e}"))
612 })?;
613
614 let sequence_number_str = account_entry.seq_num.0.to_string();
615
616 let balance_response = self.get_balance().await?;
617
618 let pending_transactions_count = self
620 .transaction_repository
621 .count_by_status(&relayer_model.id, PENDING_TRANSACTION_STATUSES)
622 .await
623 .map_err(RelayerError::from)?;
624
625 let last_confirmed_transaction_timestamp = self
627 .transaction_repository
628 .find_by_status_paginated(
629 &relayer_model.id,
630 &[TransactionStatus::Confirmed],
631 PaginationQuery {
632 page: 1,
633 per_page: 1,
634 },
635 false, )
637 .await
638 .map_err(RelayerError::from)?
639 .items
640 .into_iter()
641 .next()
642 .and_then(|tx| tx.confirmed_at);
643
644 Ok(RelayerStatus::Stellar {
645 balance: balance_response.balance.to_string(),
646 pending_transactions_count,
647 last_confirmed_transaction_timestamp,
648 system_disabled: relayer_model.system_disabled,
649 paused: relayer_model.paused,
650 sequence_number: sequence_number_str,
651 })
652 }
653
654 #[instrument(
655 level = "debug",
656 skip(self),
657 fields(
658 request_id = ?crate::observability::request_id::get_request_id(),
659 relayer_id = %self.relayer.id,
660 )
661 )]
662 async fn delete_pending_transactions(
663 &self,
664 ) -> Result<DeletePendingTransactionsResponse, RelayerError> {
665 println!("Stellar delete_pending_transactions...");
666 Ok(DeletePendingTransactionsResponse {
667 queued_for_cancellation_transaction_ids: vec![],
668 failed_to_queue_transaction_ids: vec![],
669 total_processed: 0,
670 })
671 }
672
673 #[instrument(
674 level = "debug",
675 skip(self, _request),
676 fields(
677 request_id = ?crate::observability::request_id::get_request_id(),
678 relayer_id = %self.relayer.id,
679 )
680 )]
681 async fn sign_data(&self, _request: SignDataRequest) -> Result<SignDataResponse, RelayerError> {
682 Err(RelayerError::NotSupported(
683 "Signing data not supported for Stellar".to_string(),
684 ))
685 }
686
687 #[instrument(
688 level = "debug",
689 skip(self, _request),
690 fields(
691 request_id = ?crate::observability::request_id::get_request_id(),
692 relayer_id = %self.relayer.id,
693 )
694 )]
695 async fn sign_typed_data(
696 &self,
697 _request: SignTypedDataRequest,
698 ) -> Result<SignDataResponse, RelayerError> {
699 Err(RelayerError::NotSupported(
700 "Signing typed data not supported for Stellar".to_string(),
701 ))
702 }
703
704 #[instrument(
705 level = "debug",
706 skip(self, request),
707 fields(
708 request_id = ?crate::observability::request_id::get_request_id(),
709 relayer_id = %self.relayer.id,
710 )
711 )]
712 async fn rpc(
713 &self,
714 request: JsonRpcRequest<NetworkRpcRequest>,
715 ) -> Result<JsonRpcResponse<NetworkRpcResult>, RelayerError> {
716 let JsonRpcRequest { id, params, .. } = request;
717 let stellar_request = match params {
718 NetworkRpcRequest::Stellar(stellar_req) => stellar_req,
719 _ => {
720 return Ok(create_error_response(
721 id.clone(),
722 RpcErrorCodes::INVALID_PARAMS,
723 "Invalid params",
724 "Expected Stellar network request",
725 ))
726 }
727 };
728
729 let (method, params_json) = match stellar_request {
731 StellarRpcRequest::RawRpcRequest { method, params } => (method, params),
732 };
733
734 match self
735 .provider
736 .raw_request_dyn(&method, params_json, id.clone())
737 .await
738 {
739 Ok(result_value) => Ok(create_success_response(id.clone(), result_value)),
740 Err(provider_error) => {
741 tracing::error!(
743 error = %provider_error,
744 "RPC provider error occurred"
745 );
746 let (error_code, error_message) = map_provider_error(&provider_error);
747 let sanitized_description = sanitize_error_description(&provider_error);
748 Ok(create_error_response(
749 id.clone(),
750 error_code,
751 error_message,
752 &sanitized_description,
753 ))
754 }
755 }
756 }
757
758 #[instrument(
759 level = "debug",
760 skip(self),
761 fields(
762 request_id = ?crate::observability::request_id::get_request_id(),
763 relayer_id = %self.relayer.id,
764 )
765 )]
766 async fn validate_min_balance(&self) -> Result<(), RelayerError> {
767 Ok(())
768 }
769
770 #[instrument(
771 level = "debug",
772 skip(self),
773 fields(
774 request_id = ?crate::observability::request_id::get_request_id(),
775 relayer_id = %self.relayer.id,
776 )
777 )]
778 async fn initialize_relayer(&self) -> Result<(), RelayerError> {
779 debug!("initializing Stellar relayer");
780
781 self.migrate_fee_payment_strategy_if_needed().await?;
785
786 self.populate_allowed_tokens_metadata().await.map_err(|e| {
789 RelayerError::PolicyConfigurationError(format!(
790 "Error while processing allowed tokens policy: {e}"
791 ))
792 })?;
793
794 match self.check_health().await {
795 Ok(_) => {
796 if self.relayer.system_disabled {
798 self.relayer_repository
800 .enable_relayer(self.relayer.id.clone())
801 .await?;
802 }
803 }
804 Err(failures) => {
805 let reason = DisabledReason::from_health_failures(failures).unwrap_or_else(|| {
807 DisabledReason::SequenceSyncFailed("Unknown error".to_string())
808 });
809
810 warn!(reason = %reason, "disabling relayer");
811 let updated_relayer = self
812 .relayer_repository
813 .disable_relayer(self.relayer.id.clone(), reason.clone())
814 .await?;
815
816 if let Some(notification_id) = &self.relayer.notification_id {
818 self.job_producer
819 .produce_send_notification_job(
820 produce_relayer_disabled_payload(
821 notification_id,
822 &updated_relayer,
823 &reason.safe_description(),
824 ),
825 None,
826 )
827 .await?;
828 }
829
830 self.job_producer
832 .produce_relayer_health_check_job(
833 RelayerHealthCheck::new(self.relayer.id.clone()),
834 Some(calculate_scheduled_timestamp(10)),
835 )
836 .await?;
837 }
838 }
839 debug!(
840 "Stellar relayer initialized successfully: {}",
841 self.relayer.id
842 );
843 Ok(())
844 }
845
846 #[instrument(
847 level = "debug",
848 skip(self),
849 fields(
850 request_id = ?crate::observability::request_id::get_request_id(),
851 relayer_id = %self.relayer.id,
852 )
853 )]
854 async fn check_health(&self) -> Result<(), Vec<HealthCheckFailure>> {
855 debug!(
856 "running health checks for Stellar relayer {}",
857 self.relayer.id
858 );
859
860 let mut failures = Vec::new();
861
862 match self.sync_sequence().await {
864 Ok(_) => {
865 debug!(
866 "sequence sync passed for Stellar relayer {}",
867 self.relayer.id
868 );
869 }
870 Err(e) => {
871 let reason = HealthCheckFailure::SequenceSyncFailed(e.to_string());
872 warn!("sequence sync failed: {:?}", reason);
873 failures.push(reason);
874 }
875 }
876
877 let policy = self.relayer.policies.get_stellar_policy();
881 if matches!(
882 policy.fee_payment_strategy,
883 Some(StellarFeePaymentStrategy::User)
884 ) {
885 debug!(
886 "checking balance and attempting token swap for user fee payment strategy relayer {}",
887 self.relayer.id
888 );
889 if let Err(e) = self.check_balance_and_trigger_token_swap_if_needed().await {
890 warn!(
891 relayer_id = %self.relayer.id,
892 error = %e,
893 "Balance check or token swap failed, but not treating as health check failure"
894 );
895 } else {
896 debug!(
897 "balance check and token swap completed for Stellar relayer {}",
898 self.relayer.id
899 );
900 }
901 }
902
903 if failures.is_empty() {
904 debug!(
905 "all health checks passed for Stellar relayer {}",
906 self.relayer.id
907 );
908 Ok(())
909 } else {
910 warn!(
911 "health checks failed for Stellar relayer {}: {:?}",
912 self.relayer.id, failures
913 );
914 Err(failures)
915 }
916 }
917
918 #[instrument(
919 level = "debug",
920 skip(self, request),
921 fields(
922 request_id = ?crate::observability::request_id::get_request_id(),
923 relayer_id = %self.relayer.id,
924 )
925 )]
926 async fn sign_transaction(
927 &self,
928 request: &SignTransactionRequest,
929 ) -> Result<SignTransactionExternalResponse, RelayerError> {
930 let stellar_req = match request {
931 SignTransactionRequest::Stellar(req) => req,
932 _ => {
933 return Err(RelayerError::NotSupported(
934 "Invalid request type for Stellar relayer".to_string(),
935 ))
936 }
937 };
938
939 let policy = self.relayer.policies.get_stellar_policy();
940 let user_pays_fee = matches!(
941 policy.fee_payment_strategy,
942 Some(StellarFeePaymentStrategy::User)
943 );
944
945 if user_pays_fee {
947 let envelope = parse_transaction_xdr(&stellar_req.unsigned_xdr, false)
949 .map_err(|e| RelayerError::ValidationError(format!("Failed to parse XDR: {e}")))?;
950
951 StellarTransactionValidator::validate_user_fee_payment_transaction(
954 &envelope,
955 &self.relayer.address,
956 &policy,
957 &self.provider,
958 self.dex_service.as_ref(),
959 Some(get_stellar_sponsored_transaction_validity_duration()), )
961 .await
962 .map_err(|e| {
963 RelayerError::ValidationError(format!("Failed to validate transaction: {e}"))
964 })?;
965 }
966
967 let response = self
969 .signer
970 .sign_xdr_transaction(&stellar_req.unsigned_xdr, &self.network.passphrase)
971 .await
972 .map_err(RelayerError::SignerError)?;
973
974 let signature_bytes = &response.signature.signature.0;
976 let signature_string =
977 base64::Engine::encode(&base64::engine::general_purpose::STANDARD, signature_bytes);
978
979 Ok(SignTransactionExternalResponse::Stellar(
980 SignTransactionExternalResponseStellar {
981 signed_xdr: response.signed_xdr,
982 signature: signature_string,
983 },
984 ))
985 }
986}
987
988#[cfg(test)]
989mod tests {
990 use super::*;
991 use crate::{
992 config::{NetworkConfigCommon, StellarNetworkConfig},
993 constants::STELLAR_SMALLEST_UNIT_NAME,
994 domain::{SignTransactionRequestStellar, SignXdrTransactionResponseStellar},
995 jobs::MockJobProducerTrait,
996 models::{
997 NetworkConfigData, NetworkRepoModel, NetworkType, RelayerNetworkPolicy,
998 RelayerRepoModel, RelayerStellarPolicy, RpcConfig, SignerError,
999 },
1000 repositories::{
1001 InMemoryNetworkRepository, MockRelayerRepository, MockTransactionRepository,
1002 },
1003 services::{
1004 provider::{MockStellarProviderTrait, ProviderError},
1005 signer::MockStellarSignTrait,
1006 stellar_dex::MockStellarDexServiceTrait,
1007 MockTransactionCounterServiceTrait,
1008 },
1009 };
1010 use mockall::predicate::*;
1011 use soroban_rs::xdr::{
1012 AccountEntry, AccountEntryExt, AccountId, DecoratedSignature, PublicKey, SequenceNumber,
1013 Signature, SignatureHint, String32, Thresholds, Uint256, VecM,
1014 };
1015 use std::future::ready;
1016 use std::sync::Arc;
1017
1018 fn create_mock_dex_service() -> Arc<MockStellarDexServiceTrait> {
1020 let mut mock_dex = MockStellarDexServiceTrait::new();
1021 mock_dex.expect_supported_asset_types().returning(|| {
1022 use crate::services::stellar_dex::AssetType;
1023 std::collections::HashSet::from([AssetType::Native, AssetType::Classic])
1024 });
1025 Arc::new(mock_dex)
1026 }
1027
1028 struct TestCtx {
1030 relayer_model: RelayerRepoModel,
1031 network_repository: Arc<InMemoryNetworkRepository>,
1032 }
1033
1034 impl Default for TestCtx {
1035 fn default() -> Self {
1036 let network_repository = Arc::new(InMemoryNetworkRepository::new());
1037
1038 let relayer_model = RelayerRepoModel {
1039 id: "test-relayer-id".to_string(),
1040 name: "Test Relayer".to_string(),
1041 network: "testnet".to_string(),
1042 paused: false,
1043 network_type: NetworkType::Stellar,
1044 signer_id: "signer-id".to_string(),
1045 policies: RelayerNetworkPolicy::Stellar(RelayerStellarPolicy::default()),
1046 address: "GAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAWHF".to_string(),
1047 notification_id: Some("notification-id".to_string()),
1048 system_disabled: false,
1049 custom_rpc_urls: None,
1050 ..Default::default()
1051 };
1052
1053 TestCtx {
1054 relayer_model,
1055 network_repository,
1056 }
1057 }
1058 }
1059
1060 impl TestCtx {
1061 async fn setup_network(&self) {
1062 let test_network = NetworkRepoModel {
1063 id: "stellar:testnet".to_string(),
1064 name: "testnet".to_string(),
1065 network_type: NetworkType::Stellar,
1066 config: NetworkConfigData::Stellar(StellarNetworkConfig {
1067 common: NetworkConfigCommon {
1068 network: "testnet".to_string(),
1069 from: None,
1070 rpc_urls: Some(vec![RpcConfig::new(
1071 "https://horizon-testnet.stellar.org".to_string(),
1072 )]),
1073 explorer_urls: None,
1074 average_blocktime_ms: Some(5000),
1075 is_testnet: Some(true),
1076 tags: None,
1077 },
1078 passphrase: Some("Test SDF Network ; September 2015".to_string()),
1079 horizon_url: Some("https://horizon-testnet.stellar.org".to_string()),
1080 }),
1081 };
1082
1083 self.network_repository.create(test_network).await.unwrap();
1084 }
1085 }
1086
1087 #[tokio::test]
1088 async fn test_sync_sequence_success() {
1089 let ctx = TestCtx::default();
1090 ctx.setup_network().await;
1091 let relayer_model = ctx.relayer_model.clone();
1092 let mut provider = MockStellarProviderTrait::new();
1093 provider
1094 .expect_get_account()
1095 .with(eq(relayer_model.address.clone()))
1096 .returning(|_| {
1097 Box::pin(async {
1098 Ok(AccountEntry {
1099 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1100 balance: 0,
1101 ext: AccountEntryExt::V0,
1102 flags: 0,
1103 home_domain: String32::default(),
1104 inflation_dest: None,
1105 seq_num: SequenceNumber(5),
1106 num_sub_entries: 0,
1107 signers: VecM::default(),
1108 thresholds: Thresholds([0, 0, 0, 0]),
1109 })
1110 })
1111 });
1112 let mut counter = MockTransactionCounterServiceTrait::new();
1113 counter
1114 .expect_set()
1115 .with(eq(6u64))
1116 .returning(|_| Box::pin(async { Ok(()) }));
1117 let relayer_repo = MockRelayerRepository::new();
1118 let tx_repo = MockTransactionRepository::new();
1119 let job_producer = MockJobProducerTrait::new();
1120 let signer = Arc::new(MockStellarSignTrait::new());
1121 let dex_service = create_mock_dex_service();
1122
1123 let relayer = StellarRelayer::new(
1124 relayer_model.clone(),
1125 signer,
1126 provider,
1127 StellarRelayerDependencies::new(
1128 Arc::new(relayer_repo),
1129 ctx.network_repository.clone(),
1130 Arc::new(tx_repo),
1131 Arc::new(counter),
1132 Arc::new(job_producer),
1133 ),
1134 dex_service,
1135 )
1136 .await
1137 .unwrap();
1138
1139 let result = relayer.sync_sequence().await;
1140 assert!(result.is_ok());
1141 }
1142
1143 #[tokio::test]
1144 async fn test_sync_sequence_provider_error() {
1145 let ctx = TestCtx::default();
1146 ctx.setup_network().await;
1147 let relayer_model = ctx.relayer_model.clone();
1148 let mut provider = MockStellarProviderTrait::new();
1149 provider
1150 .expect_get_account()
1151 .with(eq(relayer_model.address.clone()))
1152 .returning(|_| Box::pin(async { Err(ProviderError::Other("fail".to_string())) }));
1153 let counter = MockTransactionCounterServiceTrait::new();
1154 let relayer_repo = MockRelayerRepository::new();
1155 let tx_repo = MockTransactionRepository::new();
1156 let job_producer = MockJobProducerTrait::new();
1157 let signer = Arc::new(MockStellarSignTrait::new());
1158 let dex_service = create_mock_dex_service();
1159
1160 let relayer = StellarRelayer::new(
1161 relayer_model.clone(),
1162 signer,
1163 provider,
1164 StellarRelayerDependencies::new(
1165 Arc::new(relayer_repo),
1166 ctx.network_repository.clone(),
1167 Arc::new(tx_repo),
1168 Arc::new(counter),
1169 Arc::new(job_producer),
1170 ),
1171 dex_service,
1172 )
1173 .await
1174 .unwrap();
1175
1176 let result = relayer.sync_sequence().await;
1177 assert!(matches!(result, Err(RelayerError::ProviderError(_))));
1178 }
1179
1180 #[tokio::test]
1181 async fn test_get_status_success_stellar() {
1182 let ctx = TestCtx::default();
1183 ctx.setup_network().await;
1184 let relayer_model = ctx.relayer_model.clone();
1185 let mut provider_mock = MockStellarProviderTrait::new();
1186 let mut tx_repo_mock = MockTransactionRepository::new();
1187 let relayer_repo_mock = MockRelayerRepository::new();
1188 let job_producer_mock = MockJobProducerTrait::new();
1189 let counter_mock = MockTransactionCounterServiceTrait::new();
1190
1191 provider_mock.expect_get_account().times(2).returning(|_| {
1192 Box::pin(ready(Ok(AccountEntry {
1193 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1194 balance: 10000000,
1195 seq_num: SequenceNumber(12345),
1196 ext: AccountEntryExt::V0,
1197 flags: 0,
1198 home_domain: String32::default(),
1199 inflation_dest: None,
1200 num_sub_entries: 0,
1201 signers: VecM::default(),
1202 thresholds: Thresholds([0, 0, 0, 0]),
1203 })))
1204 });
1205
1206 tx_repo_mock
1208 .expect_count_by_status()
1209 .withf(|relayer_id, statuses| {
1210 relayer_id == "test-relayer-id"
1211 && statuses
1212 == [
1213 TransactionStatus::Pending,
1214 TransactionStatus::Sent,
1215 TransactionStatus::Submitted,
1216 ]
1217 })
1218 .returning(|_, _| Ok(0u64))
1219 .once();
1220
1221 let confirmed_tx = TransactionRepoModel {
1223 id: "tx1_stellar".to_string(),
1224 relayer_id: relayer_model.id.clone(),
1225 status: TransactionStatus::Confirmed,
1226 confirmed_at: Some("2023-02-01T12:00:00Z".to_string()),
1227 ..TransactionRepoModel::default()
1228 };
1229 let relayer_id_clone = relayer_model.id.clone();
1230 tx_repo_mock
1231 .expect_find_by_status_paginated()
1232 .withf(move |relayer_id, statuses, query, oldest_first| {
1233 *relayer_id == relayer_id_clone
1234 && statuses == [TransactionStatus::Confirmed]
1235 && query.page == 1
1236 && query.per_page == 1
1237 && !(*oldest_first)
1238 })
1239 .returning(move |_, _, _, _| {
1240 Ok(crate::repositories::PaginatedResult {
1241 items: vec![confirmed_tx.clone()],
1242 total: 1,
1243 page: 1,
1244 per_page: 1,
1245 })
1246 })
1247 .once();
1248 let signer = Arc::new(MockStellarSignTrait::new());
1249 let dex_service = create_mock_dex_service();
1250
1251 let stellar_relayer = StellarRelayer::new(
1252 relayer_model.clone(),
1253 signer,
1254 provider_mock,
1255 StellarRelayerDependencies::new(
1256 Arc::new(relayer_repo_mock),
1257 ctx.network_repository.clone(),
1258 Arc::new(tx_repo_mock),
1259 Arc::new(counter_mock),
1260 Arc::new(job_producer_mock),
1261 ),
1262 dex_service,
1263 )
1264 .await
1265 .unwrap();
1266
1267 let status = stellar_relayer.get_status().await.unwrap();
1268
1269 match status {
1270 RelayerStatus::Stellar {
1271 balance,
1272 pending_transactions_count,
1273 last_confirmed_transaction_timestamp,
1274 system_disabled,
1275 paused,
1276 sequence_number,
1277 } => {
1278 assert_eq!(balance, "10000000");
1279 assert_eq!(pending_transactions_count, 0);
1280 assert_eq!(
1281 last_confirmed_transaction_timestamp,
1282 Some("2023-02-01T12:00:00Z".to_string())
1283 );
1284 assert_eq!(system_disabled, relayer_model.system_disabled);
1285 assert_eq!(paused, relayer_model.paused);
1286 assert_eq!(sequence_number, "12345");
1287 }
1288 _ => panic!("Expected Stellar RelayerStatus"),
1289 }
1290 }
1291
1292 #[tokio::test]
1293 async fn test_get_status_stellar_provider_error() {
1294 let ctx = TestCtx::default();
1295 ctx.setup_network().await;
1296 let relayer_model = ctx.relayer_model.clone();
1297 let mut provider_mock = MockStellarProviderTrait::new();
1298 let tx_repo_mock = MockTransactionRepository::new();
1299 let relayer_repo_mock = MockRelayerRepository::new();
1300 let job_producer_mock = MockJobProducerTrait::new();
1301 let counter_mock = MockTransactionCounterServiceTrait::new();
1302
1303 provider_mock
1304 .expect_get_account()
1305 .with(eq(relayer_model.address.clone()))
1306 .returning(|_| {
1307 Box::pin(async { Err(ProviderError::Other("Stellar provider down".to_string())) })
1308 });
1309 let signer = Arc::new(MockStellarSignTrait::new());
1310 let dex_service = create_mock_dex_service();
1311
1312 let stellar_relayer = StellarRelayer::new(
1313 relayer_model.clone(),
1314 signer,
1315 provider_mock,
1316 StellarRelayerDependencies::new(
1317 Arc::new(relayer_repo_mock),
1318 ctx.network_repository.clone(),
1319 Arc::new(tx_repo_mock),
1320 Arc::new(counter_mock),
1321 Arc::new(job_producer_mock),
1322 ),
1323 dex_service,
1324 )
1325 .await
1326 .unwrap();
1327
1328 let result = stellar_relayer.get_status().await;
1329 assert!(result.is_err());
1330 match result.err().unwrap() {
1331 RelayerError::ProviderError(msg) => {
1332 assert!(msg.contains("Failed to get account details"))
1333 }
1334 _ => panic!("Expected ProviderError for get_account failure"),
1335 }
1336 }
1337
1338 #[tokio::test]
1339 async fn test_get_balance_success() {
1340 let ctx = TestCtx::default();
1341 ctx.setup_network().await;
1342 let relayer_model = ctx.relayer_model.clone();
1343 let mut provider = MockStellarProviderTrait::new();
1344 let expected_balance = 100_000_000i64; provider
1347 .expect_get_account()
1348 .with(eq(relayer_model.address.clone()))
1349 .returning(move |_| {
1350 Box::pin(async move {
1351 Ok(AccountEntry {
1352 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1353 balance: expected_balance,
1354 ext: AccountEntryExt::V0,
1355 flags: 0,
1356 home_domain: String32::default(),
1357 inflation_dest: None,
1358 seq_num: SequenceNumber(5),
1359 num_sub_entries: 0,
1360 signers: VecM::default(),
1361 thresholds: Thresholds([0, 0, 0, 0]),
1362 })
1363 })
1364 });
1365
1366 let relayer_repo = Arc::new(MockRelayerRepository::new());
1367 let tx_repo = Arc::new(MockTransactionRepository::new());
1368 let job_producer = Arc::new(MockJobProducerTrait::new());
1369 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
1370 let signer = Arc::new(MockStellarSignTrait::new());
1371 let dex_service = create_mock_dex_service();
1372
1373 let relayer = StellarRelayer::new(
1374 relayer_model,
1375 signer,
1376 provider,
1377 StellarRelayerDependencies::new(
1378 relayer_repo,
1379 ctx.network_repository.clone(),
1380 tx_repo,
1381 counter,
1382 job_producer,
1383 ),
1384 dex_service,
1385 )
1386 .await
1387 .unwrap();
1388
1389 let result = relayer.get_balance().await;
1390 assert!(result.is_ok());
1391 let balance_response = result.unwrap();
1392 assert_eq!(balance_response.balance, expected_balance as u128);
1393 assert_eq!(balance_response.unit, STELLAR_SMALLEST_UNIT_NAME);
1394 }
1395
1396 #[tokio::test]
1397 async fn test_get_balance_provider_error() {
1398 let ctx = TestCtx::default();
1399 ctx.setup_network().await;
1400 let relayer_model = ctx.relayer_model.clone();
1401 let mut provider = MockStellarProviderTrait::new();
1402
1403 provider
1404 .expect_get_account()
1405 .with(eq(relayer_model.address.clone()))
1406 .returning(|_| {
1407 Box::pin(async { Err(ProviderError::Other("provider failed".to_string())) })
1408 });
1409
1410 let relayer_repo = Arc::new(MockRelayerRepository::new());
1411 let tx_repo = Arc::new(MockTransactionRepository::new());
1412 let job_producer = Arc::new(MockJobProducerTrait::new());
1413 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
1414 let signer = Arc::new(MockStellarSignTrait::new());
1415 let dex_service = create_mock_dex_service();
1416
1417 let relayer = StellarRelayer::new(
1418 relayer_model,
1419 signer,
1420 provider,
1421 StellarRelayerDependencies::new(
1422 relayer_repo,
1423 ctx.network_repository.clone(),
1424 tx_repo,
1425 counter,
1426 job_producer,
1427 ),
1428 dex_service,
1429 )
1430 .await
1431 .unwrap();
1432
1433 let result = relayer.get_balance().await;
1434 assert!(result.is_err());
1435 match result.err().unwrap() {
1436 RelayerError::ProviderError(msg) => {
1437 assert!(msg.contains("Failed to fetch account for balance"));
1438 }
1439 _ => panic!("Unexpected error type"),
1440 }
1441 }
1442
1443 #[tokio::test]
1444 async fn test_sign_transaction_success() {
1445 let ctx = TestCtx::default();
1446 ctx.setup_network().await;
1447 let relayer_model = ctx.relayer_model.clone();
1448 let provider = MockStellarProviderTrait::new();
1449 let mut signer = MockStellarSignTrait::new();
1450
1451 let unsigned_xdr = "AAAAAgAAAAD///8AAAAAAAAAAQAAAAAAAAACAAAAAQAAAAAAAAAB";
1452 let expected_signed_xdr =
1453 "AAAAAgAAAAD///8AAAAAAAABAAAAAAAAAAIAAAABAAAAAAAAAAEAAAABAAAAA...";
1454 let expected_signature = DecoratedSignature {
1455 hint: SignatureHint([1, 2, 3, 4]),
1456 signature: Signature([5u8; 64].try_into().unwrap()),
1457 };
1458 let expected_signature_for_closure = expected_signature.clone();
1459
1460 signer
1461 .expect_sign_xdr_transaction()
1462 .with(eq(unsigned_xdr), eq("Test SDF Network ; September 2015"))
1463 .returning(move |_, _| {
1464 Ok(SignXdrTransactionResponseStellar {
1465 signed_xdr: expected_signed_xdr.to_string(),
1466 signature: expected_signature_for_closure.clone(),
1467 })
1468 });
1469
1470 let relayer_repo = Arc::new(MockRelayerRepository::new());
1471 let tx_repo = Arc::new(MockTransactionRepository::new());
1472 let job_producer = Arc::new(MockJobProducerTrait::new());
1473 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
1474 let dex_service = create_mock_dex_service();
1475
1476 let relayer = StellarRelayer::new(
1477 relayer_model,
1478 Arc::new(signer),
1479 provider,
1480 StellarRelayerDependencies::new(
1481 relayer_repo,
1482 ctx.network_repository.clone(),
1483 tx_repo,
1484 counter,
1485 job_producer,
1486 ),
1487 dex_service,
1488 )
1489 .await
1490 .unwrap();
1491
1492 let request = SignTransactionRequest::Stellar(SignTransactionRequestStellar {
1493 unsigned_xdr: unsigned_xdr.to_string(),
1494 });
1495 let result = relayer.sign_transaction(&request).await;
1496 assert!(result.is_ok());
1497
1498 match result.unwrap() {
1499 SignTransactionExternalResponse::Stellar(response) => {
1500 assert_eq!(response.signed_xdr, expected_signed_xdr);
1501 let expected_signature_base64 = base64::Engine::encode(
1503 &base64::engine::general_purpose::STANDARD,
1504 &expected_signature.signature.0,
1505 );
1506 assert_eq!(response.signature, expected_signature_base64);
1507 }
1508 _ => panic!("Expected Stellar response"),
1509 }
1510 }
1511
1512 #[tokio::test]
1513 async fn test_sign_transaction_signer_error() {
1514 let ctx = TestCtx::default();
1515 ctx.setup_network().await;
1516 let relayer_model = ctx.relayer_model.clone();
1517 let provider = MockStellarProviderTrait::new();
1518 let mut signer = MockStellarSignTrait::new();
1519
1520 let unsigned_xdr = "INVALID_XDR";
1521
1522 signer
1523 .expect_sign_xdr_transaction()
1524 .with(eq(unsigned_xdr), eq("Test SDF Network ; September 2015"))
1525 .returning(|_, _| Err(SignerError::SigningError("Invalid XDR format".to_string())));
1526
1527 let relayer_repo = Arc::new(MockRelayerRepository::new());
1528 let tx_repo = Arc::new(MockTransactionRepository::new());
1529 let job_producer = Arc::new(MockJobProducerTrait::new());
1530 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
1531 let dex_service = create_mock_dex_service();
1532
1533 let relayer = StellarRelayer::new(
1534 relayer_model,
1535 Arc::new(signer),
1536 provider,
1537 StellarRelayerDependencies::new(
1538 relayer_repo,
1539 ctx.network_repository.clone(),
1540 tx_repo,
1541 counter,
1542 job_producer,
1543 ),
1544 dex_service,
1545 )
1546 .await
1547 .unwrap();
1548
1549 let request = SignTransactionRequest::Stellar(SignTransactionRequestStellar {
1550 unsigned_xdr: unsigned_xdr.to_string(),
1551 });
1552 let result = relayer.sign_transaction(&request).await;
1553 assert!(result.is_err());
1554
1555 match result.err().unwrap() {
1556 RelayerError::SignerError(err) => match err {
1557 SignerError::SigningError(msg) => {
1558 assert_eq!(msg, "Invalid XDR format");
1559 }
1560 _ => panic!("Expected SigningError"),
1561 },
1562 _ => panic!("Expected RelayerError::SignerError"),
1563 }
1564 }
1565
1566 #[tokio::test]
1567 async fn test_sign_transaction_with_different_network_passphrase() {
1568 let ctx = TestCtx::default();
1569 let custom_network = NetworkRepoModel {
1571 id: "stellar:mainnet".to_string(),
1572 name: "mainnet".to_string(),
1573 network_type: NetworkType::Stellar,
1574 config: NetworkConfigData::Stellar(StellarNetworkConfig {
1575 common: NetworkConfigCommon {
1576 network: "mainnet".to_string(),
1577 from: None,
1578 rpc_urls: Some(vec![RpcConfig::new(
1579 "https://horizon.stellar.org".to_string(),
1580 )]),
1581 explorer_urls: None,
1582 average_blocktime_ms: Some(5000),
1583 is_testnet: Some(false),
1584 tags: None,
1585 },
1586 passphrase: Some("Public Global Stellar Network ; September 2015".to_string()),
1587 horizon_url: Some("https://horizon.stellar.org".to_string()),
1588 }),
1589 };
1590 ctx.network_repository.create(custom_network).await.unwrap();
1591
1592 let mut relayer_model = ctx.relayer_model.clone();
1593 relayer_model.network = "mainnet".to_string();
1594
1595 let provider = MockStellarProviderTrait::new();
1596 let mut signer = MockStellarSignTrait::new();
1597
1598 let unsigned_xdr = "AAAAAgAAAAD///8AAAAAAAAAAQAAAAAAAAACAAAAAQAAAAAAAAAB";
1599 let expected_signature = DecoratedSignature {
1600 hint: SignatureHint([10, 20, 30, 40]),
1601 signature: Signature([15u8; 64].try_into().unwrap()),
1602 };
1603 let expected_signature_for_closure = expected_signature.clone();
1604
1605 signer
1606 .expect_sign_xdr_transaction()
1607 .with(
1608 eq(unsigned_xdr),
1609 eq("Public Global Stellar Network ; September 2015"),
1610 )
1611 .returning(move |_, _| {
1612 Ok(SignXdrTransactionResponseStellar {
1613 signed_xdr: "mainnet_signed_xdr".to_string(),
1614 signature: expected_signature_for_closure.clone(),
1615 })
1616 });
1617
1618 let relayer_repo = Arc::new(MockRelayerRepository::new());
1619 let tx_repo = Arc::new(MockTransactionRepository::new());
1620 let job_producer = Arc::new(MockJobProducerTrait::new());
1621 let counter = Arc::new(MockTransactionCounterServiceTrait::new());
1622 let dex_service = create_mock_dex_service();
1623
1624 let relayer = StellarRelayer::new(
1625 relayer_model,
1626 Arc::new(signer),
1627 provider,
1628 StellarRelayerDependencies::new(
1629 relayer_repo,
1630 ctx.network_repository.clone(),
1631 tx_repo,
1632 counter,
1633 job_producer,
1634 ),
1635 dex_service,
1636 )
1637 .await
1638 .unwrap();
1639
1640 let request = SignTransactionRequest::Stellar(SignTransactionRequestStellar {
1641 unsigned_xdr: unsigned_xdr.to_string(),
1642 });
1643 let result = relayer.sign_transaction(&request).await;
1644 assert!(result.is_ok());
1645
1646 match result.unwrap() {
1647 SignTransactionExternalResponse::Stellar(response) => {
1648 assert_eq!(response.signed_xdr, "mainnet_signed_xdr");
1649 let expected_signature_string = base64::Engine::encode(
1651 &base64::engine::general_purpose::STANDARD,
1652 &expected_signature.signature.0,
1653 );
1654 assert_eq!(response.signature, expected_signature_string);
1655 }
1656 _ => panic!("Expected Stellar response"),
1657 }
1658 }
1659
1660 #[tokio::test]
1661 async fn test_initialize_relayer_disables_when_validation_fails() {
1662 let ctx = TestCtx::default();
1663 ctx.setup_network().await;
1664 let mut relayer_model = ctx.relayer_model.clone();
1665 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
1667
1668 let mut provider = MockStellarProviderTrait::new();
1669 let mut relayer_repo = MockRelayerRepository::new();
1670 let mut job_producer = MockJobProducerTrait::new();
1671
1672 relayer_repo
1673 .expect_is_persistent_storage()
1674 .returning(|| false);
1675
1676 provider
1678 .expect_get_account()
1679 .returning(|_| Box::pin(ready(Err(ProviderError::Other("RPC error".to_string())))));
1680
1681 let mut disabled_relayer = relayer_model.clone();
1683 disabled_relayer.system_disabled = true;
1684 relayer_repo
1685 .expect_disable_relayer()
1686 .withf(|id, reason| {
1687 id == "test-relayer-id"
1688 && matches!(reason, crate::models::DisabledReason::SequenceSyncFailed(_))
1689 })
1690 .returning(move |_, _| Ok(disabled_relayer.clone()));
1691
1692 job_producer
1694 .expect_produce_send_notification_job()
1695 .returning(|_, _| Box::pin(async { Ok(()) }));
1696
1697 job_producer
1699 .expect_produce_relayer_health_check_job()
1700 .returning(|_, _| Box::pin(async { Ok(()) }));
1701
1702 let tx_repo = MockTransactionRepository::new();
1703 let counter = MockTransactionCounterServiceTrait::new();
1704 let signer = Arc::new(MockStellarSignTrait::new());
1705 let dex_service = create_mock_dex_service();
1706
1707 let relayer = StellarRelayer::new(
1708 relayer_model.clone(),
1709 signer,
1710 provider,
1711 StellarRelayerDependencies::new(
1712 Arc::new(relayer_repo),
1713 ctx.network_repository.clone(),
1714 Arc::new(tx_repo),
1715 Arc::new(counter),
1716 Arc::new(job_producer),
1717 ),
1718 dex_service,
1719 )
1720 .await
1721 .unwrap();
1722
1723 let result = relayer.initialize_relayer().await;
1724 assert!(result.is_ok());
1725 }
1726
1727 #[tokio::test]
1728 async fn test_initialize_relayer_enables_when_validation_passes_and_was_disabled() {
1729 let ctx = TestCtx::default();
1730 ctx.setup_network().await;
1731 let mut relayer_model = ctx.relayer_model.clone();
1732 relayer_model.system_disabled = true; let mut provider = MockStellarProviderTrait::new();
1735 let mut relayer_repo = MockRelayerRepository::new();
1736
1737 relayer_repo
1738 .expect_is_persistent_storage()
1739 .returning(|| false);
1740
1741 provider.expect_get_account().returning(|_| {
1743 Box::pin(ready(Ok(AccountEntry {
1744 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1745 balance: 1000000000, seq_num: SequenceNumber(1),
1747 num_sub_entries: 0,
1748 inflation_dest: None,
1749 flags: 0,
1750 home_domain: String32::default(),
1751 thresholds: Thresholds([0; 4]),
1752 signers: VecM::default(),
1753 ext: AccountEntryExt::V0,
1754 })))
1755 });
1756
1757 let mut enabled_relayer = relayer_model.clone();
1759 enabled_relayer.system_disabled = false;
1760 relayer_repo
1761 .expect_enable_relayer()
1762 .with(eq("test-relayer-id".to_string()))
1763 .returning(move |_| Ok(enabled_relayer.clone()));
1764
1765 let tx_repo = MockTransactionRepository::new();
1766 let mut counter = MockTransactionCounterServiceTrait::new();
1767 counter
1768 .expect_set()
1769 .returning(|_| Box::pin(async { Ok(()) }));
1770 let signer = Arc::new(MockStellarSignTrait::new());
1771 let dex_service = create_mock_dex_service();
1772 let job_producer = MockJobProducerTrait::new();
1773
1774 let relayer = StellarRelayer::new(
1775 relayer_model.clone(),
1776 signer,
1777 provider,
1778 StellarRelayerDependencies::new(
1779 Arc::new(relayer_repo),
1780 ctx.network_repository.clone(),
1781 Arc::new(tx_repo),
1782 Arc::new(counter),
1783 Arc::new(job_producer),
1784 ),
1785 dex_service,
1786 )
1787 .await
1788 .unwrap();
1789
1790 let result = relayer.initialize_relayer().await;
1791 assert!(result.is_ok());
1792 }
1793
1794 #[tokio::test]
1795 async fn test_initialize_relayer_no_action_when_enabled_and_validation_passes() {
1796 let ctx = TestCtx::default();
1797 ctx.setup_network().await;
1798 let mut relayer_model = ctx.relayer_model.clone();
1799 relayer_model.system_disabled = false; let mut provider = MockStellarProviderTrait::new();
1802
1803 provider.expect_get_account().returning(|_| {
1805 Box::pin(ready(Ok(AccountEntry {
1806 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
1807 balance: 1000000000, seq_num: SequenceNumber(1),
1809 num_sub_entries: 0,
1810 inflation_dest: None,
1811 flags: 0,
1812 home_domain: String32::default(),
1813 thresholds: Thresholds([0; 4]),
1814 signers: VecM::default(),
1815 ext: AccountEntryExt::V0,
1816 })))
1817 });
1818
1819 let tx_repo = MockTransactionRepository::new();
1822 let mut counter = MockTransactionCounterServiceTrait::new();
1823 counter
1824 .expect_set()
1825 .returning(|_| Box::pin(async { Ok(()) }));
1826 let signer = Arc::new(MockStellarSignTrait::new());
1827 let dex_service = create_mock_dex_service();
1828 let job_producer = MockJobProducerTrait::new();
1829 let mut relayer_repo = MockRelayerRepository::new();
1830
1831 relayer_repo
1832 .expect_is_persistent_storage()
1833 .returning(|| false);
1834
1835 let relayer = StellarRelayer::new(
1836 relayer_model.clone(),
1837 signer,
1838 provider,
1839 StellarRelayerDependencies::new(
1840 Arc::new(relayer_repo),
1841 ctx.network_repository.clone(),
1842 Arc::new(tx_repo),
1843 Arc::new(counter),
1844 Arc::new(job_producer),
1845 ),
1846 dex_service,
1847 )
1848 .await
1849 .unwrap();
1850
1851 let result = relayer.initialize_relayer().await;
1852 assert!(result.is_ok());
1853 }
1854
1855 #[tokio::test]
1856 async fn test_initialize_relayer_sends_notification_when_disabled() {
1857 let ctx = TestCtx::default();
1858 ctx.setup_network().await;
1859 let mut relayer_model = ctx.relayer_model.clone();
1860 relayer_model.system_disabled = false; relayer_model.notification_id = Some("test-notification-id".to_string());
1862
1863 let mut provider = MockStellarProviderTrait::new();
1864 let mut relayer_repo = MockRelayerRepository::new();
1865 let mut job_producer = MockJobProducerTrait::new();
1866
1867 relayer_repo
1868 .expect_is_persistent_storage()
1869 .returning(|| false);
1870
1871 provider.expect_get_account().returning(|_| {
1873 Box::pin(ready(Err(ProviderError::Other(
1874 "Sequence sync failed".to_string(),
1875 ))))
1876 });
1877
1878 let mut disabled_relayer = relayer_model.clone();
1880 disabled_relayer.system_disabled = true;
1881 relayer_repo
1882 .expect_disable_relayer()
1883 .withf(|id, reason| {
1884 id == "test-relayer-id"
1885 && matches!(reason, crate::models::DisabledReason::SequenceSyncFailed(_))
1886 })
1887 .returning(move |_, _| Ok(disabled_relayer.clone()));
1888
1889 job_producer
1891 .expect_produce_send_notification_job()
1892 .returning(|_, _| Box::pin(async { Ok(()) }));
1893
1894 job_producer
1896 .expect_produce_relayer_health_check_job()
1897 .returning(|_, _| Box::pin(async { Ok(()) }));
1898
1899 let tx_repo = MockTransactionRepository::new();
1900 let counter = MockTransactionCounterServiceTrait::new();
1901 let signer = Arc::new(MockStellarSignTrait::new());
1902 let dex_service = create_mock_dex_service();
1903
1904 let relayer = StellarRelayer::new(
1905 relayer_model.clone(),
1906 signer,
1907 provider,
1908 StellarRelayerDependencies::new(
1909 Arc::new(relayer_repo),
1910 ctx.network_repository.clone(),
1911 Arc::new(tx_repo),
1912 Arc::new(counter),
1913 Arc::new(job_producer),
1914 ),
1915 dex_service,
1916 )
1917 .await
1918 .unwrap();
1919
1920 let result = relayer.initialize_relayer().await;
1921 assert!(result.is_ok());
1922 }
1923
1924 #[tokio::test]
1925 async fn test_initialize_relayer_no_notification_when_no_notification_id() {
1926 let ctx = TestCtx::default();
1927 ctx.setup_network().await;
1928 let mut relayer_model = ctx.relayer_model.clone();
1929 relayer_model.system_disabled = false; relayer_model.notification_id = None; let mut provider = MockStellarProviderTrait::new();
1933 let mut relayer_repo = MockRelayerRepository::new();
1934 relayer_repo
1935 .expect_is_persistent_storage()
1936 .returning(|| false);
1937
1938 provider.expect_get_account().returning(|_| {
1940 Box::pin(ready(Err(ProviderError::Other(
1941 "Sequence sync failed".to_string(),
1942 ))))
1943 });
1944
1945 let mut disabled_relayer = relayer_model.clone();
1947 disabled_relayer.system_disabled = true;
1948 relayer_repo
1949 .expect_disable_relayer()
1950 .withf(|id, reason| {
1951 id == "test-relayer-id"
1952 && matches!(reason, crate::models::DisabledReason::SequenceSyncFailed(_))
1953 })
1954 .returning(move |_, _| Ok(disabled_relayer.clone()));
1955
1956 let mut job_producer = MockJobProducerTrait::new();
1959 job_producer
1960 .expect_produce_relayer_health_check_job()
1961 .returning(|_, _| Box::pin(async { Ok(()) }));
1962
1963 let tx_repo = MockTransactionRepository::new();
1964 let counter = MockTransactionCounterServiceTrait::new();
1965 let signer = Arc::new(MockStellarSignTrait::new());
1966 let dex_service = create_mock_dex_service();
1967
1968 let relayer = StellarRelayer::new(
1969 relayer_model.clone(),
1970 signer,
1971 provider,
1972 StellarRelayerDependencies::new(
1973 Arc::new(relayer_repo),
1974 ctx.network_repository.clone(),
1975 Arc::new(tx_repo),
1976 Arc::new(counter),
1977 Arc::new(job_producer),
1978 ),
1979 dex_service,
1980 )
1981 .await
1982 .unwrap();
1983
1984 let result = relayer.initialize_relayer().await;
1985 assert!(result.is_ok());
1986 }
1987
1988 mod process_transaction_request_tests {
1989 use super::*;
1990 use crate::constants::STELLAR_STATUS_CHECK_INITIAL_DELAY_SECONDS;
1991 use crate::models::{
1992 NetworkTransactionRequest, NetworkType, StellarTransactionRequest, TransactionStatus,
1993 };
1994 use chrono::Utc;
1995
1996 fn create_test_transaction_request() -> NetworkTransactionRequest {
1998 NetworkTransactionRequest::Stellar(StellarTransactionRequest {
1999 source_account: None,
2000 network: "testnet".to_string(),
2001 operations: None,
2002 memo: None,
2003 valid_until: None,
2004 transaction_xdr: Some("AAAAAgAAAACige4lTdwSB/sto4SniEdJ2kOa2X65s5bqkd40J4DjSwAAAAEAAHAkAAAADwAAAAAAAAAAAAAAAQAAAAAAAAABAAAAAKKB7iVN3BIH+y2jhKeIR0naQ5rZfrmzluqR3jQngONLAAAAAAAAAAAAD0JAAAAAAAAAAAA=".to_string()),
2005 fee_bump: None,
2006 max_fee: None,
2007 signed_auth_entry: None,
2008 })
2009 }
2010
2011 #[tokio::test]
2012 async fn test_process_transaction_request_calls_job_producer_methods() {
2013 let ctx = TestCtx::default();
2014 ctx.setup_network().await;
2015 let relayer_model = ctx.relayer_model.clone();
2016
2017 let provider = MockStellarProviderTrait::new();
2018 let signer = Arc::new(MockStellarSignTrait::new());
2019 let dex_service = create_mock_dex_service();
2020
2021 let tx_request = create_test_transaction_request();
2023
2024 let mut tx_repo = MockTransactionRepository::new();
2026 tx_repo.expect_create().returning(|t| Ok(t.clone()));
2027
2028 let mut job_producer = MockJobProducerTrait::new();
2030
2031 job_producer
2033 .expect_produce_transaction_request_job()
2034 .withf(|req, delay| {
2035 !req.transaction_id.is_empty() && !req.relayer_id.is_empty() && delay.is_none()
2036 })
2037 .times(1)
2038 .returning(|_, _| Box::pin(async { Ok(()) }));
2039
2040 job_producer
2042 .expect_produce_check_transaction_status_job()
2043 .withf(|check, delay| {
2044 !check.transaction_id.is_empty()
2045 && !check.relayer_id.is_empty()
2046 && check.network_type == Some(NetworkType::Stellar)
2047 && delay.is_some()
2048 })
2049 .times(1)
2050 .returning(|_, _| Box::pin(async { Ok(()) }));
2051
2052 let relayer_repo = Arc::new(MockRelayerRepository::new());
2053 let counter = MockTransactionCounterServiceTrait::new();
2054
2055 let relayer = StellarRelayer::new(
2056 relayer_model,
2057 signer,
2058 provider,
2059 StellarRelayerDependencies::new(
2060 relayer_repo,
2061 ctx.network_repository.clone(),
2062 Arc::new(tx_repo),
2063 Arc::new(counter),
2064 Arc::new(job_producer),
2065 ),
2066 dex_service,
2067 )
2068 .await
2069 .unwrap();
2070
2071 let result = relayer.process_transaction_request(tx_request).await;
2072 if let Err(e) = &result {
2073 panic!("process_transaction_request failed: {e}");
2074 }
2075 assert!(result.is_ok());
2076 }
2077
2078 #[tokio::test]
2079 async fn test_process_transaction_request_with_scheduled_delay() {
2080 let ctx = TestCtx::default();
2081 ctx.setup_network().await;
2082 let relayer_model = ctx.relayer_model.clone();
2083
2084 let provider = MockStellarProviderTrait::new();
2085 let signer = Arc::new(MockStellarSignTrait::new());
2086 let dex_service = create_mock_dex_service();
2087
2088 let tx_request = create_test_transaction_request();
2089
2090 let mut tx_repo = MockTransactionRepository::new();
2091 tx_repo.expect_create().returning(|t| Ok(t.clone()));
2092
2093 let mut job_producer = MockJobProducerTrait::new();
2094
2095 job_producer
2096 .expect_produce_transaction_request_job()
2097 .returning(|_, _| Box::pin(async { Ok(()) }));
2098
2099 job_producer
2101 .expect_produce_check_transaction_status_job()
2102 .withf(|_, delay| {
2103 if let Some(scheduled_at) = delay {
2105 let now = Utc::now().timestamp();
2107 let diff = scheduled_at - now;
2108 ((STELLAR_STATUS_CHECK_INITIAL_DELAY_SECONDS - 2)
2110 ..=(STELLAR_STATUS_CHECK_INITIAL_DELAY_SECONDS + 2))
2111 .contains(&diff)
2112 } else {
2113 false
2114 }
2115 })
2116 .times(1)
2117 .returning(|_, _| Box::pin(async { Ok(()) }));
2118
2119 let relayer_repo = Arc::new(MockRelayerRepository::new());
2120 let counter = MockTransactionCounterServiceTrait::new();
2121
2122 let relayer = StellarRelayer::new(
2123 relayer_model,
2124 signer,
2125 provider,
2126 StellarRelayerDependencies::new(
2127 relayer_repo,
2128 ctx.network_repository.clone(),
2129 Arc::new(tx_repo),
2130 Arc::new(counter),
2131 Arc::new(job_producer),
2132 ),
2133 dex_service,
2134 )
2135 .await
2136 .unwrap();
2137
2138 let result = relayer.process_transaction_request(tx_request).await;
2139 assert!(result.is_ok());
2140 }
2141
2142 #[tokio::test]
2143 async fn test_process_transaction_request_repository_failure() {
2144 let ctx = TestCtx::default();
2145 ctx.setup_network().await;
2146 let relayer_model = ctx.relayer_model.clone();
2147
2148 let provider = MockStellarProviderTrait::new();
2149 let signer = Arc::new(MockStellarSignTrait::new());
2150 let dex_service = create_mock_dex_service();
2151
2152 let tx_request = create_test_transaction_request();
2153
2154 let mut tx_repo = MockTransactionRepository::new();
2156 tx_repo.expect_create().returning(|_| {
2157 Err(RepositoryError::TransactionFailure(
2158 "Database connection failed".to_string(),
2159 ))
2160 });
2161
2162 let job_producer = MockJobProducerTrait::new();
2164
2165 let relayer_repo = Arc::new(MockRelayerRepository::new());
2166 let counter = MockTransactionCounterServiceTrait::new();
2167
2168 let relayer = StellarRelayer::new(
2169 relayer_model,
2170 signer,
2171 provider,
2172 StellarRelayerDependencies::new(
2173 relayer_repo,
2174 ctx.network_repository.clone(),
2175 Arc::new(tx_repo),
2176 Arc::new(counter),
2177 Arc::new(job_producer),
2178 ),
2179 dex_service,
2180 )
2181 .await
2182 .unwrap();
2183
2184 let result = relayer.process_transaction_request(tx_request).await;
2185 assert!(result.is_err());
2186 let err_msg = result.err().unwrap().to_string();
2188 assert!(
2189 err_msg.contains("Database connection failed"),
2190 "Error was: {err_msg}"
2191 );
2192 }
2193
2194 #[tokio::test]
2195 async fn test_process_transaction_request_job_producer_request_failure() {
2196 let ctx = TestCtx::default();
2197 ctx.setup_network().await;
2198 let relayer_model = ctx.relayer_model.clone();
2199
2200 let provider = MockStellarProviderTrait::new();
2201 let signer = Arc::new(MockStellarSignTrait::new());
2202 let dex_service = create_mock_dex_service();
2203
2204 let tx_request = create_test_transaction_request();
2205
2206 let mut tx_repo = MockTransactionRepository::new();
2207 tx_repo.expect_create().returning(|t| Ok(t.clone()));
2208
2209 let mut job_producer = MockJobProducerTrait::new();
2210
2211 job_producer
2213 .expect_produce_check_transaction_status_job()
2214 .returning(|_, _| Box::pin(async { Ok(()) }));
2215
2216 job_producer
2219 .expect_produce_transaction_request_job()
2220 .returning(|_, _| {
2221 Box::pin(async {
2222 Err(crate::jobs::JobProducerError::QueueError(
2223 "Queue is full".to_string(),
2224 ))
2225 })
2226 });
2227
2228 let relayer_repo = Arc::new(MockRelayerRepository::new());
2229 let counter = MockTransactionCounterServiceTrait::new();
2230
2231 let relayer = StellarRelayer::new(
2232 relayer_model,
2233 signer,
2234 provider,
2235 StellarRelayerDependencies::new(
2236 relayer_repo,
2237 ctx.network_repository.clone(),
2238 Arc::new(tx_repo),
2239 Arc::new(counter),
2240 Arc::new(job_producer),
2241 ),
2242 dex_service,
2243 )
2244 .await
2245 .unwrap();
2246
2247 let result = relayer.process_transaction_request(tx_request).await;
2248 assert!(result.is_err());
2249 }
2250
2251 #[tokio::test]
2252 async fn test_process_transaction_request_job_producer_status_check_failure() {
2253 let ctx = TestCtx::default();
2254 ctx.setup_network().await;
2255 let relayer_model = ctx.relayer_model.clone();
2256
2257 let provider = MockStellarProviderTrait::new();
2258 let signer = Arc::new(MockStellarSignTrait::new());
2259 let dex_service = create_mock_dex_service();
2260
2261 let tx_request = create_test_transaction_request();
2262
2263 let mut tx_repo = MockTransactionRepository::new();
2264 tx_repo.expect_create().returning(|t| Ok(t.clone()));
2265 tx_repo
2267 .expect_partial_update()
2268 .returning(|_, _| Ok(TransactionRepoModel::default()));
2269
2270 let mut job_producer = MockJobProducerTrait::new();
2271
2272 job_producer
2275 .expect_produce_check_transaction_status_job()
2276 .returning(|_, _| {
2277 Box::pin(async {
2278 Err(crate::jobs::JobProducerError::QueueError(
2279 "Failed to queue job".to_string(),
2280 ))
2281 })
2282 });
2283
2284 let relayer_repo = Arc::new(MockRelayerRepository::new());
2288 let counter = MockTransactionCounterServiceTrait::new();
2289
2290 let relayer = StellarRelayer::new(
2291 relayer_model,
2292 signer,
2293 provider,
2294 StellarRelayerDependencies::new(
2295 relayer_repo,
2296 ctx.network_repository.clone(),
2297 Arc::new(tx_repo),
2298 Arc::new(counter),
2299 Arc::new(job_producer),
2300 ),
2301 dex_service,
2302 )
2303 .await
2304 .unwrap();
2305
2306 let result = relayer.process_transaction_request(tx_request).await;
2307 assert!(result.is_err());
2308 }
2309
2310 #[tokio::test]
2311 async fn test_process_transaction_request_status_check_failure_marks_tx_failed() {
2312 let ctx = TestCtx::default();
2315 ctx.setup_network().await;
2316 let relayer_model = ctx.relayer_model.clone();
2317
2318 let provider = MockStellarProviderTrait::new();
2319 let signer = Arc::new(MockStellarSignTrait::new());
2320 let dex_service = create_mock_dex_service();
2321
2322 let tx_request = create_test_transaction_request();
2323
2324 let mut tx_repo = MockTransactionRepository::new();
2325 tx_repo.expect_create().returning(|t| Ok(t.clone()));
2326
2327 tx_repo
2329 .expect_partial_update()
2330 .withf(|_tx_id, update| {
2331 update.status == Some(TransactionStatus::Failed)
2332 && update.status_reason == Some("Queue unavailable".to_string())
2333 })
2334 .returning(|_, _| Ok(TransactionRepoModel::default()));
2335
2336 let mut job_producer = MockJobProducerTrait::new();
2337 job_producer
2338 .expect_produce_check_transaction_status_job()
2339 .returning(|_, _| {
2340 Box::pin(async {
2341 Err(crate::jobs::JobProducerError::QueueError(
2342 "Redis timeout".to_string(),
2343 ))
2344 })
2345 });
2346
2347 let relayer_repo = Arc::new(MockRelayerRepository::new());
2348 let counter = MockTransactionCounterServiceTrait::new();
2349
2350 let relayer = StellarRelayer::new(
2351 relayer_model,
2352 signer,
2353 provider,
2354 StellarRelayerDependencies::new(
2355 relayer_repo,
2356 ctx.network_repository.clone(),
2357 Arc::new(tx_repo),
2358 Arc::new(counter),
2359 Arc::new(job_producer),
2360 ),
2361 dex_service,
2362 )
2363 .await
2364 .unwrap();
2365
2366 let result = relayer.process_transaction_request(tx_request).await;
2367 assert!(result.is_err());
2368 }
2370
2371 #[tokio::test]
2372 async fn test_process_transaction_request_preserves_transaction_data() {
2373 let ctx = TestCtx::default();
2374 ctx.setup_network().await;
2375 let relayer_model = ctx.relayer_model.clone();
2376
2377 let provider = MockStellarProviderTrait::new();
2378 let signer = Arc::new(MockStellarSignTrait::new());
2379 let dex_service = create_mock_dex_service();
2380
2381 let tx_request = create_test_transaction_request();
2382
2383 let mut tx_repo = MockTransactionRepository::new();
2384 tx_repo.expect_create().returning(|t| Ok(t.clone()));
2385
2386 let mut job_producer = MockJobProducerTrait::new();
2387 job_producer
2388 .expect_produce_transaction_request_job()
2389 .returning(|_, _| Box::pin(async { Ok(()) }));
2390 job_producer
2391 .expect_produce_check_transaction_status_job()
2392 .returning(|_, _| Box::pin(async { Ok(()) }));
2393
2394 let relayer_repo = Arc::new(MockRelayerRepository::new());
2395 let counter = MockTransactionCounterServiceTrait::new();
2396
2397 let relayer = StellarRelayer::new(
2398 relayer_model.clone(),
2399 signer,
2400 provider,
2401 StellarRelayerDependencies::new(
2402 relayer_repo,
2403 ctx.network_repository.clone(),
2404 Arc::new(tx_repo),
2405 Arc::new(counter),
2406 Arc::new(job_producer),
2407 ),
2408 dex_service,
2409 )
2410 .await
2411 .unwrap();
2412
2413 let result = relayer.process_transaction_request(tx_request).await;
2414 assert!(result.is_ok());
2415
2416 let returned_tx = result.unwrap();
2417 assert_eq!(returned_tx.relayer_id, relayer_model.id);
2418 assert_eq!(returned_tx.network_type, NetworkType::Stellar);
2419 assert_eq!(returned_tx.status, TransactionStatus::Pending);
2420 }
2421 }
2422
2423 mod populate_allowed_tokens_metadata_tests {
2425 use super::*;
2426 use crate::models::StellarTokenKind;
2427
2428 #[tokio::test]
2429 async fn test_populate_allowed_tokens_metadata_no_tokens() {
2430 let ctx = TestCtx::default();
2431 ctx.setup_network().await;
2432 let relayer_model = ctx.relayer_model.clone();
2433
2434 let provider = MockStellarProviderTrait::new();
2435 let signer = Arc::new(MockStellarSignTrait::new());
2436 let dex_service = create_mock_dex_service();
2437
2438 let mut relayer_repo = MockRelayerRepository::new();
2439 relayer_repo.expect_update_policy().times(0);
2441
2442 let tx_repo = MockTransactionRepository::new();
2443 let job_producer = MockJobProducerTrait::new();
2444 let counter = MockTransactionCounterServiceTrait::new();
2445
2446 let relayer = StellarRelayer::new(
2447 relayer_model.clone(),
2448 signer,
2449 provider,
2450 StellarRelayerDependencies::new(
2451 Arc::new(relayer_repo),
2452 ctx.network_repository.clone(),
2453 Arc::new(tx_repo),
2454 Arc::new(counter),
2455 Arc::new(job_producer),
2456 ),
2457 dex_service,
2458 )
2459 .await
2460 .unwrap();
2461
2462 let result = relayer.populate_allowed_tokens_metadata().await;
2463 assert!(result.is_ok());
2464 }
2465
2466 #[tokio::test]
2467 async fn test_populate_allowed_tokens_metadata_empty_tokens() {
2468 let ctx = TestCtx::default();
2469 ctx.setup_network().await;
2470 let mut relayer_model = ctx.relayer_model.clone();
2471
2472 let mut policy = RelayerStellarPolicy::default();
2474 policy.allowed_tokens = Some(vec![]);
2475 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
2476
2477 let provider = MockStellarProviderTrait::new();
2478 let signer = Arc::new(MockStellarSignTrait::new());
2479 let dex_service = create_mock_dex_service();
2480
2481 let mut relayer_repo = MockRelayerRepository::new();
2482 relayer_repo.expect_update_policy().times(0);
2484
2485 let tx_repo = MockTransactionRepository::new();
2486 let job_producer = MockJobProducerTrait::new();
2487 let counter = MockTransactionCounterServiceTrait::new();
2488
2489 let relayer = StellarRelayer::new(
2490 relayer_model.clone(),
2491 signer,
2492 provider,
2493 StellarRelayerDependencies::new(
2494 Arc::new(relayer_repo),
2495 ctx.network_repository.clone(),
2496 Arc::new(tx_repo),
2497 Arc::new(counter),
2498 Arc::new(job_producer),
2499 ),
2500 dex_service,
2501 )
2502 .await
2503 .unwrap();
2504
2505 let result = relayer.populate_allowed_tokens_metadata().await;
2506 assert!(result.is_ok());
2507 }
2508
2509 #[tokio::test]
2510 async fn test_populate_allowed_tokens_metadata_classic_asset_success() {
2511 let ctx = TestCtx::default();
2512 ctx.setup_network().await;
2513 let mut relayer_model = ctx.relayer_model.clone();
2514
2515 let mut policy = RelayerStellarPolicy::default();
2517 policy.allowed_tokens = Some(vec![crate::models::StellarAllowedTokensPolicy {
2518 asset: "USDC:GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5".to_string(),
2519 metadata: None,
2520 max_allowed_fee: None,
2521 swap_config: None,
2522 }]);
2523 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
2524
2525 let provider = MockStellarProviderTrait::new();
2526 let signer = Arc::new(MockStellarSignTrait::new());
2527 let dex_service = create_mock_dex_service();
2528
2529 let mut relayer_repo = MockRelayerRepository::new();
2530 relayer_repo
2531 .expect_update_policy()
2532 .times(1)
2533 .returning(|_, _| Ok(RelayerRepoModel::default()));
2534
2535 let tx_repo = MockTransactionRepository::new();
2536 let job_producer = MockJobProducerTrait::new();
2537 let counter = MockTransactionCounterServiceTrait::new();
2538
2539 let relayer = StellarRelayer::new(
2540 relayer_model.clone(),
2541 signer,
2542 provider,
2543 StellarRelayerDependencies::new(
2544 Arc::new(relayer_repo),
2545 ctx.network_repository.clone(),
2546 Arc::new(tx_repo),
2547 Arc::new(counter),
2548 Arc::new(job_producer),
2549 ),
2550 dex_service,
2551 )
2552 .await
2553 .unwrap();
2554
2555 let result = relayer.populate_allowed_tokens_metadata().await;
2556 assert!(result.is_ok());
2557
2558 let updated_policy = result.unwrap();
2559 assert!(updated_policy.allowed_tokens.is_some());
2560
2561 let tokens = updated_policy.allowed_tokens.unwrap();
2562 assert_eq!(tokens.len(), 1);
2563
2564 let token = &tokens[0];
2566 assert!(token.metadata.is_some());
2567
2568 let metadata = token.metadata.as_ref().unwrap();
2569 assert_eq!(metadata.decimals, 7); assert_eq!(
2571 metadata.canonical_asset_id,
2572 "USDC:GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5"
2573 );
2574
2575 match &metadata.kind {
2577 StellarTokenKind::Classic { code, issuer } => {
2578 assert_eq!(code, "USDC");
2579 assert_eq!(
2580 issuer,
2581 "GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5"
2582 );
2583 }
2584 _ => panic!("Expected Classic token kind"),
2585 }
2586 }
2587
2588 #[tokio::test]
2589 async fn test_populate_allowed_tokens_metadata_multiple_tokens() {
2590 let ctx = TestCtx::default();
2591 ctx.setup_network().await;
2592 let mut relayer_model = ctx.relayer_model.clone();
2593
2594 let mut policy = RelayerStellarPolicy::default();
2596 policy.allowed_tokens = Some(vec![
2597 crate::models::StellarAllowedTokensPolicy {
2598 asset: "USDC:GBBD47IF6LWK7P7MDEVSCWR7DPUWV3NY3DTQEVFL4NAT4AQH3ZLLFLA5"
2599 .to_string(),
2600 metadata: None,
2601 max_allowed_fee: None,
2602 swap_config: None,
2603 },
2604 crate::models::StellarAllowedTokensPolicy {
2605 asset: "AQUA:GAHPYWLK6YRN7CVYZOO4H3VDRZ7PVF5UJGLZCSPAEIKJE2XSWF5LAGER"
2606 .to_string(),
2607 metadata: None,
2608 max_allowed_fee: Some(1000000),
2609 swap_config: None,
2610 },
2611 ]);
2612 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
2613
2614 let provider = MockStellarProviderTrait::new();
2615 let signer = Arc::new(MockStellarSignTrait::new());
2616 let dex_service = create_mock_dex_service();
2617
2618 let mut relayer_repo = MockRelayerRepository::new();
2619 relayer_repo
2620 .expect_update_policy()
2621 .times(1)
2622 .returning(|_, _| Ok(RelayerRepoModel::default()));
2623
2624 let tx_repo = MockTransactionRepository::new();
2625 let job_producer = MockJobProducerTrait::new();
2626 let counter = MockTransactionCounterServiceTrait::new();
2627
2628 let relayer = StellarRelayer::new(
2629 relayer_model.clone(),
2630 signer,
2631 provider,
2632 StellarRelayerDependencies::new(
2633 Arc::new(relayer_repo),
2634 ctx.network_repository.clone(),
2635 Arc::new(tx_repo),
2636 Arc::new(counter),
2637 Arc::new(job_producer),
2638 ),
2639 dex_service,
2640 )
2641 .await
2642 .unwrap();
2643
2644 let result = relayer.populate_allowed_tokens_metadata().await;
2645 assert!(result.is_ok());
2646
2647 let updated_policy = result.unwrap();
2648 let tokens = updated_policy.allowed_tokens.unwrap();
2649 assert_eq!(tokens.len(), 2);
2650
2651 assert!(tokens[0].metadata.is_some());
2653 assert!(tokens[1].metadata.is_some());
2654
2655 let usdc_metadata = tokens[0].metadata.as_ref().unwrap();
2657 match &usdc_metadata.kind {
2658 StellarTokenKind::Classic { code, .. } => {
2659 assert_eq!(code, "USDC");
2660 }
2661 _ => panic!("Expected Classic token kind for USDC"),
2662 }
2663
2664 let aqua_metadata = tokens[1].metadata.as_ref().unwrap();
2666 match &aqua_metadata.kind {
2667 StellarTokenKind::Classic { code, .. } => {
2668 assert_eq!(code, "AQUA");
2669 }
2670 _ => panic!("Expected Classic token kind for AQUA"),
2671 }
2672
2673 assert_eq!(tokens[1].max_allowed_fee, Some(1000000));
2675 }
2676
2677 #[tokio::test]
2678 async fn test_populate_allowed_tokens_metadata_invalid_asset() {
2679 let ctx = TestCtx::default();
2680 ctx.setup_network().await;
2681 let mut relayer_model = ctx.relayer_model.clone();
2682
2683 let mut policy = RelayerStellarPolicy::default();
2685 policy.allowed_tokens = Some(vec![crate::models::StellarAllowedTokensPolicy {
2686 asset: "INVALID_FORMAT".to_string(), metadata: None,
2688 max_allowed_fee: None,
2689 swap_config: None,
2690 }]);
2691 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
2692
2693 let provider = MockStellarProviderTrait::new();
2694 let signer = Arc::new(MockStellarSignTrait::new());
2695 let dex_service = create_mock_dex_service();
2696
2697 let relayer_repo = MockRelayerRepository::new();
2698 let tx_repo = MockTransactionRepository::new();
2699 let job_producer = MockJobProducerTrait::new();
2700 let counter = MockTransactionCounterServiceTrait::new();
2701
2702 let relayer = StellarRelayer::new(
2703 relayer_model.clone(),
2704 signer,
2705 provider,
2706 StellarRelayerDependencies::new(
2707 Arc::new(relayer_repo),
2708 ctx.network_repository.clone(),
2709 Arc::new(tx_repo),
2710 Arc::new(counter),
2711 Arc::new(job_producer),
2712 ),
2713 dex_service,
2714 )
2715 .await
2716 .unwrap();
2717
2718 let result = relayer.populate_allowed_tokens_metadata().await;
2719 assert!(result.is_err());
2720 }
2721 }
2722
2723 mod migrate_fee_payment_strategy_tests {
2725 use super::*;
2726
2727 #[tokio::test]
2728 async fn test_migrate_fee_payment_strategy_in_memory_storage() {
2729 let ctx = TestCtx::default();
2730 ctx.setup_network().await;
2731 let relayer_model = ctx.relayer_model.clone();
2732
2733 let provider = MockStellarProviderTrait::new();
2734 let signer = Arc::new(MockStellarSignTrait::new());
2735 let dex_service = create_mock_dex_service();
2736
2737 let mut relayer_repo = MockRelayerRepository::new();
2738 relayer_repo
2740 .expect_is_persistent_storage()
2741 .returning(|| false);
2742 relayer_repo.expect_update_policy().times(0);
2744
2745 let tx_repo = MockTransactionRepository::new();
2746 let job_producer = MockJobProducerTrait::new();
2747 let counter = MockTransactionCounterServiceTrait::new();
2748
2749 let relayer = StellarRelayer::new(
2750 relayer_model.clone(),
2751 signer,
2752 provider,
2753 StellarRelayerDependencies::new(
2754 Arc::new(relayer_repo),
2755 ctx.network_repository.clone(),
2756 Arc::new(tx_repo),
2757 Arc::new(counter),
2758 Arc::new(job_producer),
2759 ),
2760 dex_service,
2761 )
2762 .await
2763 .unwrap();
2764
2765 let result = relayer.migrate_fee_payment_strategy_if_needed().await;
2766 assert!(result.is_ok());
2767 }
2768
2769 #[tokio::test]
2770 async fn test_migrate_fee_payment_strategy_already_set() {
2771 let ctx = TestCtx::default();
2772 ctx.setup_network().await;
2773 let mut relayer_model = ctx.relayer_model.clone();
2774
2775 let mut policy = RelayerStellarPolicy::default();
2777 policy.fee_payment_strategy = Some(StellarFeePaymentStrategy::User);
2778 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
2779
2780 let provider = MockStellarProviderTrait::new();
2781 let signer = Arc::new(MockStellarSignTrait::new());
2782 let dex_service = create_mock_dex_service();
2783
2784 let mut relayer_repo = MockRelayerRepository::new();
2785 relayer_repo
2786 .expect_is_persistent_storage()
2787 .returning(|| true);
2788 relayer_repo.expect_update_policy().times(0);
2790
2791 let tx_repo = MockTransactionRepository::new();
2792 let job_producer = MockJobProducerTrait::new();
2793 let counter = MockTransactionCounterServiceTrait::new();
2794
2795 let relayer = StellarRelayer::new(
2796 relayer_model.clone(),
2797 signer,
2798 provider,
2799 StellarRelayerDependencies::new(
2800 Arc::new(relayer_repo),
2801 ctx.network_repository.clone(),
2802 Arc::new(tx_repo),
2803 Arc::new(counter),
2804 Arc::new(job_producer),
2805 ),
2806 dex_service,
2807 )
2808 .await
2809 .unwrap();
2810
2811 let result = relayer.migrate_fee_payment_strategy_if_needed().await;
2812 assert!(result.is_ok());
2813 }
2814
2815 #[tokio::test]
2816 async fn test_migrate_fee_payment_strategy_migration_needed() {
2817 let ctx = TestCtx::default();
2818 ctx.setup_network().await;
2819 let relayer_model = ctx.relayer_model.clone();
2820
2821 let provider = MockStellarProviderTrait::new();
2822 let signer = Arc::new(MockStellarSignTrait::new());
2823 let dex_service = create_mock_dex_service();
2824
2825 let mut relayer_repo = MockRelayerRepository::new();
2826 relayer_repo
2827 .expect_is_persistent_storage()
2828 .returning(|| true);
2829 relayer_repo
2830 .expect_update_policy()
2831 .times(1)
2832 .returning(|_, policy| {
2833 if let RelayerNetworkPolicy::Stellar(stellar_policy) = &policy {
2835 assert_eq!(
2836 stellar_policy.fee_payment_strategy,
2837 Some(StellarFeePaymentStrategy::Relayer)
2838 );
2839 }
2840 Ok(RelayerRepoModel::default())
2841 });
2842
2843 let tx_repo = MockTransactionRepository::new();
2844 let job_producer = MockJobProducerTrait::new();
2845 let counter = MockTransactionCounterServiceTrait::new();
2846
2847 let relayer = StellarRelayer::new(
2848 relayer_model.clone(),
2849 signer,
2850 provider,
2851 StellarRelayerDependencies::new(
2852 Arc::new(relayer_repo),
2853 ctx.network_repository.clone(),
2854 Arc::new(tx_repo),
2855 Arc::new(counter),
2856 Arc::new(job_producer),
2857 ),
2858 dex_service,
2859 )
2860 .await
2861 .unwrap();
2862
2863 let result = relayer.migrate_fee_payment_strategy_if_needed().await;
2864 assert!(result.is_ok());
2865 }
2866
2867 #[tokio::test]
2868 async fn test_migrate_fee_payment_strategy_update_fails() {
2869 let ctx = TestCtx::default();
2870 ctx.setup_network().await;
2871 let relayer_model = ctx.relayer_model.clone();
2872
2873 let provider = MockStellarProviderTrait::new();
2874 let signer = Arc::new(MockStellarSignTrait::new());
2875 let dex_service = create_mock_dex_service();
2876
2877 let mut relayer_repo = MockRelayerRepository::new();
2878 relayer_repo
2879 .expect_is_persistent_storage()
2880 .returning(|| true);
2881 relayer_repo
2882 .expect_update_policy()
2883 .times(1)
2884 .returning(|_, _| {
2885 Err(RepositoryError::TransactionFailure(
2886 "Database error".to_string(),
2887 ))
2888 });
2889
2890 let tx_repo = MockTransactionRepository::new();
2891 let job_producer = MockJobProducerTrait::new();
2892 let counter = MockTransactionCounterServiceTrait::new();
2893
2894 let relayer = StellarRelayer::new(
2895 relayer_model.clone(),
2896 signer,
2897 provider,
2898 StellarRelayerDependencies::new(
2899 Arc::new(relayer_repo),
2900 ctx.network_repository.clone(),
2901 Arc::new(tx_repo),
2902 Arc::new(counter),
2903 Arc::new(job_producer),
2904 ),
2905 dex_service,
2906 )
2907 .await
2908 .unwrap();
2909
2910 let result = relayer.migrate_fee_payment_strategy_if_needed().await;
2911 assert!(result.is_err());
2912 assert!(matches!(
2913 result.unwrap_err(),
2914 RelayerError::PolicyConfigurationError(_)
2915 ));
2916 }
2917 }
2918
2919 mod check_balance_and_trigger_token_swap_tests {
2921 use super::*;
2922 use crate::models::RelayerStellarSwapConfig;
2923
2924 #[tokio::test]
2925 async fn test_check_balance_no_swap_config() {
2926 let ctx = TestCtx::default();
2927 ctx.setup_network().await;
2928 let relayer_model = ctx.relayer_model.clone();
2929
2930 let provider = MockStellarProviderTrait::new();
2931 let signer = Arc::new(MockStellarSignTrait::new());
2932 let dex_service = create_mock_dex_service();
2933
2934 let relayer_repo = MockRelayerRepository::new();
2935 let tx_repo = MockTransactionRepository::new();
2936 let job_producer = MockJobProducerTrait::new();
2937 let counter = MockTransactionCounterServiceTrait::new();
2938
2939 let relayer = StellarRelayer::new(
2940 relayer_model.clone(),
2941 signer,
2942 provider,
2943 StellarRelayerDependencies::new(
2944 Arc::new(relayer_repo),
2945 ctx.network_repository.clone(),
2946 Arc::new(tx_repo),
2947 Arc::new(counter),
2948 Arc::new(job_producer),
2949 ),
2950 dex_service,
2951 )
2952 .await
2953 .unwrap();
2954
2955 let result = relayer
2956 .check_balance_and_trigger_token_swap_if_needed()
2957 .await;
2958 assert!(result.is_ok());
2959 }
2960
2961 #[tokio::test]
2962 async fn test_check_balance_no_threshold() {
2963 let ctx = TestCtx::default();
2964 ctx.setup_network().await;
2965 let mut relayer_model = ctx.relayer_model.clone();
2966
2967 let mut policy = RelayerStellarPolicy::default();
2969 policy.swap_config = Some(RelayerStellarSwapConfig {
2970 strategies: vec![],
2971 min_balance_threshold: None,
2972 cron_schedule: None,
2973 });
2974 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
2975
2976 let provider = MockStellarProviderTrait::new();
2977 let signer = Arc::new(MockStellarSignTrait::new());
2978 let dex_service = create_mock_dex_service();
2979
2980 let relayer_repo = MockRelayerRepository::new();
2981 let tx_repo = MockTransactionRepository::new();
2982 let job_producer = MockJobProducerTrait::new();
2983 let counter = MockTransactionCounterServiceTrait::new();
2984
2985 let relayer = StellarRelayer::new(
2986 relayer_model.clone(),
2987 signer,
2988 provider,
2989 StellarRelayerDependencies::new(
2990 Arc::new(relayer_repo),
2991 ctx.network_repository.clone(),
2992 Arc::new(tx_repo),
2993 Arc::new(counter),
2994 Arc::new(job_producer),
2995 ),
2996 dex_service,
2997 )
2998 .await
2999 .unwrap();
3000
3001 let result = relayer
3002 .check_balance_and_trigger_token_swap_if_needed()
3003 .await;
3004 assert!(result.is_ok());
3005 }
3006
3007 #[tokio::test]
3008 async fn test_check_balance_above_threshold() {
3009 let ctx = TestCtx::default();
3010 ctx.setup_network().await;
3011 let mut relayer_model = ctx.relayer_model.clone();
3012
3013 let mut policy = RelayerStellarPolicy::default();
3015 policy.swap_config = Some(RelayerStellarSwapConfig {
3016 strategies: vec![],
3017 min_balance_threshold: Some(1000000), cron_schedule: None,
3019 });
3020 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
3021
3022 let mut provider = MockStellarProviderTrait::new();
3023 provider.expect_get_account().returning(|_| {
3025 Box::pin(async {
3026 Ok(AccountEntry {
3027 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
3028 balance: 10000000, ext: AccountEntryExt::V0,
3030 flags: 0,
3031 home_domain: String32::default(),
3032 inflation_dest: None,
3033 seq_num: SequenceNumber(5),
3034 num_sub_entries: 0,
3035 signers: VecM::default(),
3036 thresholds: Thresholds([0, 0, 0, 0]),
3037 })
3038 })
3039 });
3040
3041 let signer = Arc::new(MockStellarSignTrait::new());
3042 let dex_service = create_mock_dex_service();
3043
3044 let relayer_repo = MockRelayerRepository::new();
3045 let tx_repo = MockTransactionRepository::new();
3046 let job_producer = MockJobProducerTrait::new();
3047 let counter = MockTransactionCounterServiceTrait::new();
3048
3049 let relayer = StellarRelayer::new(
3050 relayer_model.clone(),
3051 signer,
3052 provider,
3053 StellarRelayerDependencies::new(
3054 Arc::new(relayer_repo),
3055 ctx.network_repository.clone(),
3056 Arc::new(tx_repo),
3057 Arc::new(counter),
3058 Arc::new(job_producer),
3059 ),
3060 dex_service,
3061 )
3062 .await
3063 .unwrap();
3064
3065 let result = relayer
3066 .check_balance_and_trigger_token_swap_if_needed()
3067 .await;
3068 assert!(result.is_ok());
3069 }
3070
3071 #[tokio::test]
3072 async fn test_check_balance_provider_error() {
3073 let ctx = TestCtx::default();
3074 ctx.setup_network().await;
3075 let mut relayer_model = ctx.relayer_model.clone();
3076
3077 let mut policy = RelayerStellarPolicy::default();
3079 policy.swap_config = Some(RelayerStellarSwapConfig {
3080 strategies: vec![],
3081 min_balance_threshold: Some(1000000),
3082 cron_schedule: None,
3083 });
3084 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
3085
3086 let mut provider = MockStellarProviderTrait::new();
3087 provider.expect_get_account().returning(|_| {
3088 Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
3089 });
3090
3091 let signer = Arc::new(MockStellarSignTrait::new());
3092 let dex_service = create_mock_dex_service();
3093
3094 let relayer_repo = MockRelayerRepository::new();
3095 let tx_repo = MockTransactionRepository::new();
3096 let job_producer = MockJobProducerTrait::new();
3097 let counter = MockTransactionCounterServiceTrait::new();
3098
3099 let relayer = StellarRelayer::new(
3100 relayer_model.clone(),
3101 signer,
3102 provider,
3103 StellarRelayerDependencies::new(
3104 Arc::new(relayer_repo),
3105 ctx.network_repository.clone(),
3106 Arc::new(tx_repo),
3107 Arc::new(counter),
3108 Arc::new(job_producer),
3109 ),
3110 dex_service,
3111 )
3112 .await
3113 .unwrap();
3114
3115 let result = relayer
3116 .check_balance_and_trigger_token_swap_if_needed()
3117 .await;
3118 assert!(result.is_err());
3119 }
3120 }
3121
3122 mod check_health_tests {
3124 use super::*;
3125 use crate::models::RelayerStellarSwapConfig;
3126
3127 #[tokio::test]
3128 async fn test_check_health_success() {
3129 let ctx = TestCtx::default();
3130 ctx.setup_network().await;
3131 let relayer_model = ctx.relayer_model.clone();
3132
3133 let mut provider = MockStellarProviderTrait::new();
3134 provider.expect_get_account().returning(|_| {
3135 Box::pin(async {
3136 Ok(AccountEntry {
3137 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
3138 balance: 10000000,
3139 ext: AccountEntryExt::V0,
3140 flags: 0,
3141 home_domain: String32::default(),
3142 inflation_dest: None,
3143 seq_num: SequenceNumber(5),
3144 num_sub_entries: 0,
3145 signers: VecM::default(),
3146 thresholds: Thresholds([0, 0, 0, 0]),
3147 })
3148 })
3149 });
3150
3151 let signer = Arc::new(MockStellarSignTrait::new());
3152 let dex_service = create_mock_dex_service();
3153
3154 let relayer_repo = MockRelayerRepository::new();
3155 let tx_repo = MockTransactionRepository::new();
3156 let job_producer = MockJobProducerTrait::new();
3157
3158 let mut counter = MockTransactionCounterServiceTrait::new();
3159 counter
3160 .expect_set()
3161 .returning(|_| Box::pin(async { Ok(()) }));
3162
3163 let relayer = StellarRelayer::new(
3164 relayer_model.clone(),
3165 signer,
3166 provider,
3167 StellarRelayerDependencies::new(
3168 Arc::new(relayer_repo),
3169 ctx.network_repository.clone(),
3170 Arc::new(tx_repo),
3171 Arc::new(counter),
3172 Arc::new(job_producer),
3173 ),
3174 dex_service,
3175 )
3176 .await
3177 .unwrap();
3178
3179 let result = relayer.check_health().await;
3180 assert!(result.is_ok());
3181 }
3182
3183 #[tokio::test]
3184 async fn test_check_health_sequence_sync_fails() {
3185 let ctx = TestCtx::default();
3186 ctx.setup_network().await;
3187 let relayer_model = ctx.relayer_model.clone();
3188
3189 let mut provider = MockStellarProviderTrait::new();
3190 provider.expect_get_account().returning(|_| {
3191 Box::pin(async { Err(ProviderError::Other("Network error".to_string())) })
3192 });
3193
3194 let signer = Arc::new(MockStellarSignTrait::new());
3195 let dex_service = create_mock_dex_service();
3196
3197 let relayer_repo = MockRelayerRepository::new();
3198 let tx_repo = MockTransactionRepository::new();
3199 let job_producer = MockJobProducerTrait::new();
3200 let counter = MockTransactionCounterServiceTrait::new();
3201
3202 let relayer = StellarRelayer::new(
3203 relayer_model.clone(),
3204 signer,
3205 provider,
3206 StellarRelayerDependencies::new(
3207 Arc::new(relayer_repo),
3208 ctx.network_repository.clone(),
3209 Arc::new(tx_repo),
3210 Arc::new(counter),
3211 Arc::new(job_producer),
3212 ),
3213 dex_service,
3214 )
3215 .await
3216 .unwrap();
3217
3218 let result = relayer.check_health().await;
3219 assert!(result.is_err());
3220 let failures = result.unwrap_err();
3221 assert_eq!(failures.len(), 1);
3222 assert!(matches!(
3223 failures[0],
3224 HealthCheckFailure::SequenceSyncFailed(_)
3225 ));
3226 }
3227
3228 #[tokio::test]
3229 async fn test_check_health_with_user_fee_strategy() {
3230 let ctx = TestCtx::default();
3231 ctx.setup_network().await;
3232 let mut relayer_model = ctx.relayer_model.clone();
3233
3234 let mut policy = RelayerStellarPolicy::default();
3236 policy.fee_payment_strategy = Some(StellarFeePaymentStrategy::User);
3237 policy.swap_config = Some(RelayerStellarSwapConfig {
3238 strategies: vec![],
3239 min_balance_threshold: Some(1000000),
3240 cron_schedule: None,
3241 });
3242 relayer_model.policies = RelayerNetworkPolicy::Stellar(policy);
3243
3244 let mut provider = MockStellarProviderTrait::new();
3245 provider.expect_get_account().returning(|_| {
3246 Box::pin(async {
3247 Ok(AccountEntry {
3248 account_id: AccountId(PublicKey::PublicKeyTypeEd25519(Uint256([0; 32]))),
3249 balance: 10000000, ext: AccountEntryExt::V0,
3251 flags: 0,
3252 home_domain: String32::default(),
3253 inflation_dest: None,
3254 seq_num: SequenceNumber(5),
3255 num_sub_entries: 0,
3256 signers: VecM::default(),
3257 thresholds: Thresholds([0, 0, 0, 0]),
3258 })
3259 })
3260 });
3261
3262 let signer = Arc::new(MockStellarSignTrait::new());
3263 let dex_service = create_mock_dex_service();
3264
3265 let relayer_repo = MockRelayerRepository::new();
3266 let tx_repo = MockTransactionRepository::new();
3267 let job_producer = MockJobProducerTrait::new();
3268
3269 let mut counter = MockTransactionCounterServiceTrait::new();
3270 counter
3271 .expect_set()
3272 .returning(|_| Box::pin(async { Ok(()) }));
3273
3274 let relayer = StellarRelayer::new(
3275 relayer_model.clone(),
3276 signer,
3277 provider,
3278 StellarRelayerDependencies::new(
3279 Arc::new(relayer_repo),
3280 ctx.network_repository.clone(),
3281 Arc::new(tx_repo),
3282 Arc::new(counter),
3283 Arc::new(job_producer),
3284 ),
3285 dex_service,
3286 )
3287 .await
3288 .unwrap();
3289
3290 let result = relayer.check_health().await;
3291 assert!(result.is_ok());
3293 }
3294 }
3295
3296 mod rpc_tests {
3298 use super::*;
3299 use crate::models::{JsonRpcId, StellarRpcRequest};
3300
3301 #[tokio::test]
3302 async fn test_rpc_invalid_network_request() {
3303 let ctx = TestCtx::default();
3304 ctx.setup_network().await;
3305 let relayer_model = ctx.relayer_model.clone();
3306
3307 let provider = MockStellarProviderTrait::new();
3308 let signer = Arc::new(MockStellarSignTrait::new());
3309 let dex_service = create_mock_dex_service();
3310
3311 let relayer_repo = MockRelayerRepository::new();
3312 let tx_repo = MockTransactionRepository::new();
3313 let job_producer = MockJobProducerTrait::new();
3314 let counter = MockTransactionCounterServiceTrait::new();
3315
3316 let relayer = StellarRelayer::new(
3317 relayer_model.clone(),
3318 signer,
3319 provider,
3320 StellarRelayerDependencies::new(
3321 Arc::new(relayer_repo),
3322 ctx.network_repository.clone(),
3323 Arc::new(tx_repo),
3324 Arc::new(counter),
3325 Arc::new(job_producer),
3326 ),
3327 dex_service,
3328 )
3329 .await
3330 .unwrap();
3331
3332 let request = JsonRpcRequest {
3334 jsonrpc: "2.0".to_string(),
3335 id: Some(JsonRpcId::Number(1)),
3336 params: NetworkRpcRequest::Evm(crate::models::EvmRpcRequest::RawRpcRequest {
3337 method: "eth_blockNumber".to_string(),
3338 params: serde_json::Value::Null,
3339 }),
3340 };
3341
3342 let result = relayer.rpc(request).await;
3343 assert!(result.is_ok());
3344 let response = result.unwrap();
3345 assert!(response.error.is_some());
3347 }
3348
3349 #[tokio::test]
3350 async fn test_rpc_provider_error() {
3351 let ctx = TestCtx::default();
3352 ctx.setup_network().await;
3353 let relayer_model = ctx.relayer_model.clone();
3354
3355 let mut provider = MockStellarProviderTrait::new();
3356 provider.expect_raw_request_dyn().returning(|_, _, _| {
3357 Box::pin(async { Err(ProviderError::Other("RPC error".to_string())) })
3358 });
3359
3360 let signer = Arc::new(MockStellarSignTrait::new());
3361 let dex_service = create_mock_dex_service();
3362
3363 let relayer_repo = MockRelayerRepository::new();
3364 let tx_repo = MockTransactionRepository::new();
3365 let job_producer = MockJobProducerTrait::new();
3366 let counter = MockTransactionCounterServiceTrait::new();
3367
3368 let relayer = StellarRelayer::new(
3369 relayer_model.clone(),
3370 signer,
3371 provider,
3372 StellarRelayerDependencies::new(
3373 Arc::new(relayer_repo),
3374 ctx.network_repository.clone(),
3375 Arc::new(tx_repo),
3376 Arc::new(counter),
3377 Arc::new(job_producer),
3378 ),
3379 dex_service,
3380 )
3381 .await
3382 .unwrap();
3383
3384 let request = JsonRpcRequest {
3385 jsonrpc: "2.0".to_string(),
3386 id: Some(JsonRpcId::Number(1)),
3387 params: NetworkRpcRequest::Stellar(StellarRpcRequest::RawRpcRequest {
3388 method: "getHealth".to_string(),
3389 params: serde_json::Value::Null,
3390 }),
3391 };
3392
3393 let result = relayer.rpc(request).await;
3394 assert!(result.is_ok());
3395 let response = result.unwrap();
3396 assert!(response.error.is_some());
3398 }
3399
3400 #[tokio::test]
3401 async fn test_rpc_success() {
3402 let ctx = TestCtx::default();
3403 ctx.setup_network().await;
3404 let relayer_model = ctx.relayer_model.clone();
3405
3406 let mut provider = MockStellarProviderTrait::new();
3407 provider.expect_raw_request_dyn().returning(|_, _, _| {
3408 Box::pin(async { Ok(serde_json::json!({"status": "healthy"})) })
3409 });
3410
3411 let signer = Arc::new(MockStellarSignTrait::new());
3412 let dex_service = create_mock_dex_service();
3413
3414 let relayer_repo = MockRelayerRepository::new();
3415 let tx_repo = MockTransactionRepository::new();
3416 let job_producer = MockJobProducerTrait::new();
3417 let counter = MockTransactionCounterServiceTrait::new();
3418
3419 let relayer = StellarRelayer::new(
3420 relayer_model.clone(),
3421 signer,
3422 provider,
3423 StellarRelayerDependencies::new(
3424 Arc::new(relayer_repo),
3425 ctx.network_repository.clone(),
3426 Arc::new(tx_repo),
3427 Arc::new(counter),
3428 Arc::new(job_producer),
3429 ),
3430 dex_service,
3431 )
3432 .await
3433 .unwrap();
3434
3435 let request = JsonRpcRequest {
3436 jsonrpc: "2.0".to_string(),
3437 id: Some(JsonRpcId::Number(1)),
3438 params: NetworkRpcRequest::Stellar(StellarRpcRequest::RawRpcRequest {
3439 method: "getHealth".to_string(),
3440 params: serde_json::Value::Null,
3441 }),
3442 };
3443
3444 let result = relayer.rpc(request).await;
3445 assert!(result.is_ok());
3446 let response = result.unwrap();
3447 assert!(response.error.is_none());
3448 assert!(response.result.is_some());
3449 }
3450 }
3451}