1use chrono::{DateTime, Utc};
6use soroban_rs::xdr::{
7 Error, Hash, InnerTransactionResultResult, InvokeHostFunctionResult, Limits, OperationResult,
8 OperationResultTr, TransactionEnvelope, TransactionResultResult, WriteXdr,
9};
10use tracing::{debug, info, warn};
11
12use super::{is_final_state, StellarRelayerTransaction};
13use crate::constants::{
14 get_stellar_max_stuck_transaction_lifetime, get_stellar_resend_timeout,
15 STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS, STELLAR_RESUBMIT_MAX_INTERVAL_SECONDS,
16};
17use crate::domain::transaction::stellar::prepare::common::send_submit_transaction_job;
18use crate::domain::transaction::stellar::utils::{
19 compute_resubmit_backoff_interval, extract_return_value_from_meta, extract_time_bounds,
20};
21use crate::domain::transaction::util::{get_age_since_created, get_age_since_sent_or_created};
22use crate::domain::xdr_utils::parse_transaction_xdr;
23use crate::{
24 constants::STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS,
25 jobs::{JobProducerTrait, StatusCheckContext, TransactionRequest},
26 models::{
27 NetworkTransactionData, RelayerRepoModel, TransactionError, TransactionRepoModel,
28 TransactionStatus, TransactionUpdateRequest,
29 },
30 repositories::{Repository, TransactionCounterTrait, TransactionRepository},
31 services::{
32 provider::StellarProviderTrait,
33 signer::{Signer, StellarSignTrait},
34 },
35};
36
37impl<R, T, J, S, P, C, D> StellarRelayerTransaction<R, T, J, S, P, C, D>
38where
39 R: Repository<RelayerRepoModel, String> + Send + Sync,
40 T: TransactionRepository + Send + Sync,
41 J: JobProducerTrait + Send + Sync,
42 S: Signer + StellarSignTrait + Send + Sync,
43 P: StellarProviderTrait + Send + Sync,
44 C: TransactionCounterTrait + Send + Sync,
45 D: crate::services::stellar_dex::StellarDexServiceTrait + Send + Sync + 'static,
46{
47 pub async fn handle_transaction_status_impl(
55 &self,
56 tx: TransactionRepoModel,
57 context: Option<StatusCheckContext>,
58 ) -> Result<TransactionRepoModel, TransactionError> {
59 debug!(
60 tx_id = %tx.id,
61 relayer_id = %tx.relayer_id,
62 status = ?tx.status,
63 "handling transaction status"
64 );
65
66 if is_final_state(&tx.status) {
68 debug!(
69 tx_id = %tx.id,
70 relayer_id = %tx.relayer_id,
71 status = ?tx.status,
72 "transaction in final state, skipping status check"
73 );
74 return Ok(tx);
75 }
76
77 if let Some(ref ctx) = context {
79 if ctx.should_force_finalize() {
80 let reason = format!(
81 "Transaction status monitoring failed after {} consecutive errors (total: {}). \
82 Last status: {:?}. Unable to determine final on-chain state.",
83 ctx.consecutive_failures, ctx.total_failures, tx.status
84 );
85 warn!(
86 tx_id = %tx.id,
87 consecutive_failures = ctx.consecutive_failures,
88 total_failures = ctx.total_failures,
89 max_consecutive = ctx.max_consecutive_failures,
90 "circuit breaker triggered, forcing transaction to failed state"
91 );
92 return self.mark_as_failed(tx, reason).await;
96 }
97 }
98
99 match self.status_core(tx.clone()).await {
100 Ok(updated_tx) => {
101 debug!(
102 tx_id = %updated_tx.id,
103 status = ?updated_tx.status,
104 "status check completed successfully"
105 );
106 Ok(updated_tx)
107 }
108 Err(error) => {
109 debug!(
110 tx_id = %tx.id,
111 error = ?error,
112 "status check encountered error"
113 );
114
115 match error {
117 TransactionError::ValidationError(ref msg) => {
118 warn!(
121 tx_id = %tx.id,
122 error = %msg,
123 "validation error detected - marking transaction as failed"
124 );
125
126 self.mark_as_failed(tx, format!("Validation error: {msg}"))
127 .await
128 }
129 _ => {
130 warn!(
133 tx_id = %tx.id,
134 error = ?error,
135 "status check failed with retriable error, will retry"
136 );
137 Err(error)
138 }
139 }
140 }
141 }
142 }
143
144 async fn status_core(
147 &self,
148 tx: TransactionRepoModel,
149 ) -> Result<TransactionRepoModel, TransactionError> {
150 match tx.status {
151 TransactionStatus::Pending => self.handle_pending_state(tx).await,
152 TransactionStatus::Sent => self.handle_sent_state(tx).await,
153 _ => self.handle_submitted_state(tx).await,
154 }
155 }
156
157 pub fn parse_and_validate_hash(
160 &self,
161 tx: &TransactionRepoModel,
162 ) -> Result<Hash, TransactionError> {
163 let stellar_network_data = tx.network_data.get_stellar_transaction_data()?;
164
165 let tx_hash_str = stellar_network_data.hash.as_deref().filter(|s| !s.is_empty()).ok_or_else(|| {
166 TransactionError::ValidationError(format!(
167 "Stellar transaction {} is missing or has an empty on-chain hash in network_data. Cannot check status.",
168 tx.id
169 ))
170 })?;
171
172 let stellar_hash: Hash = tx_hash_str.parse().map_err(|e: Error| {
173 TransactionError::UnexpectedError(format!(
174 "Failed to parse transaction hash '{}' for tx {}: {:?}. This hash may be corrupted or not a valid Stellar hash.",
175 tx_hash_str, tx.id, e
176 ))
177 })?;
178
179 Ok(stellar_hash)
180 }
181
182 pub(super) async fn mark_as_failed(
184 &self,
185 tx: TransactionRepoModel,
186 reason: String,
187 ) -> Result<TransactionRepoModel, TransactionError> {
188 warn!(tx_id = %tx.id, reason = %reason, "marking transaction as failed");
189
190 let update_request = TransactionUpdateRequest {
191 status: Some(TransactionStatus::Failed),
192 status_reason: Some(reason),
193 ..Default::default()
194 };
195
196 let failed_tx = self
197 .finalize_transaction_state(tx.id.clone(), update_request)
198 .await?;
199
200 if let Err(e) = self.enqueue_next_pending_transaction(&tx.id).await {
202 warn!(error = %e, "failed to enqueue next pending transaction after failure");
203 }
204
205 Ok(failed_tx)
206 }
207
208 pub(super) async fn mark_as_expired(
210 &self,
211 tx: TransactionRepoModel,
212 reason: String,
213 ) -> Result<TransactionRepoModel, TransactionError> {
214 info!(tx_id = %tx.id, reason = %reason, "marking transaction as expired");
215
216 let update_request = TransactionUpdateRequest {
217 status: Some(TransactionStatus::Expired),
218 status_reason: Some(reason),
219 ..Default::default()
220 };
221
222 let expired_tx = self
223 .finalize_transaction_state(tx.id.clone(), update_request)
224 .await?;
225
226 if let Err(e) = self.enqueue_next_pending_transaction(&tx.id).await {
228 warn!(tx_id = %tx.id, relayer_id = %tx.relayer_id, error = %e, "failed to enqueue next pending transaction after expiration");
229 }
230
231 Ok(expired_tx)
232 }
233
234 pub(super) fn is_transaction_expired(
236 &self,
237 tx: &TransactionRepoModel,
238 ) -> Result<bool, TransactionError> {
239 if let Some(valid_until_str) = &tx.valid_until {
240 return Ok(Self::is_valid_until_string_expired(valid_until_str));
241 }
242
243 let stellar_data = tx.network_data.get_stellar_transaction_data()?;
245 if let Some(signed_xdr) = &stellar_data.signed_envelope_xdr {
246 if let Ok(envelope) = parse_transaction_xdr(signed_xdr, true) {
247 if let Some(tb) = extract_time_bounds(&envelope) {
248 if tb.max_time.0 == 0 {
249 return Ok(false); }
251 return Ok(Utc::now().timestamp() as u64 > tb.max_time.0);
252 }
253 }
254 }
255
256 Ok(false)
257 }
258
259 fn is_valid_until_string_expired(valid_until: &str) -> bool {
261 if let Ok(dt) = chrono::DateTime::parse_from_rfc3339(valid_until) {
262 return Utc::now() > dt.with_timezone(&Utc);
263 }
264 match valid_until.parse::<i64>() {
265 Ok(0) => false,
266 Ok(ts) => Utc::now().timestamp() > ts,
267 Err(_) => false,
268 }
269 }
270
271 pub async fn handle_stellar_success(
273 &self,
274 tx: TransactionRepoModel,
275 provider_response: soroban_rs::stellar_rpc_client::GetTransactionResponse,
276 ) -> Result<TransactionRepoModel, TransactionError> {
277 let updated_network_data =
279 tx.network_data
280 .get_stellar_transaction_data()
281 .ok()
282 .map(|mut stellar_data| {
283 if let Some(tx_result) = provider_response.result.as_ref() {
285 stellar_data = stellar_data.with_fee(tx_result.fee_charged as u32);
286 }
287
288 if let Some(result_meta) = provider_response.result_meta.as_ref() {
290 if let Some(return_value) = extract_return_value_from_meta(result_meta) {
291 let xdr_base64 = return_value.to_xdr_base64(Limits::none());
292 if let Ok(xdr_base64) = xdr_base64 {
293 stellar_data = stellar_data.with_transaction_result_xdr(xdr_base64);
294 } else {
295 warn!("Failed to serialize return value to XDR base64");
296 }
297 }
298 }
299
300 NetworkTransactionData::Stellar(stellar_data)
301 });
302
303 let update_request = TransactionUpdateRequest {
304 status: Some(TransactionStatus::Confirmed),
305 confirmed_at: Some(Utc::now().to_rfc3339()),
306 network_data: updated_network_data,
307 ..Default::default()
308 };
309
310 let confirmed_tx = self
311 .finalize_transaction_state(tx.id.clone(), update_request)
312 .await?;
313
314 self.enqueue_next_pending_transaction(&tx.id).await?;
315
316 Ok(confirmed_tx)
317 }
318
319 pub async fn handle_stellar_failed(
321 &self,
322 tx: TransactionRepoModel,
323 provider_response: soroban_rs::stellar_rpc_client::GetTransactionResponse,
324 ) -> Result<TransactionRepoModel, TransactionError> {
325 let result_code = provider_response
326 .result
327 .as_ref()
328 .map(|r| r.result.name())
329 .unwrap_or("unknown");
330
331 let (inner_result_code, op_result_code, inner_tx_hash, inner_fee_charged) =
333 match provider_response.result.as_ref().map(|r| &r.result) {
334 Some(TransactionResultResult::TxFeeBumpInnerFailed(pair)) => {
335 let inner = &pair.result.result;
336 let op = match inner {
337 InnerTransactionResultResult::TxFailed(ops) => {
338 first_failing_op(ops.as_slice())
339 }
340 _ => None,
341 };
342 (
343 Some(inner.name()),
344 op,
345 Some(hex::encode(pair.transaction_hash.0)),
346 pair.result.fee_charged,
347 )
348 }
349 Some(TransactionResultResult::TxFailed(ops)) => {
350 (None, first_failing_op(ops.as_slice()), None, 0)
351 }
352 _ => (None, None, None, 0),
353 };
354
355 let fee_charged = provider_response.result.as_ref().map(|r| r.fee_charged);
356 let fee_bid = provider_response.envelope.as_ref().map(extract_fee_bid);
357
358 warn!(
359 tx_id = %tx.id,
360 result_code,
361 inner_result_code = inner_result_code.unwrap_or("n/a"),
362 op_result_code = op_result_code.unwrap_or("n/a"),
363 inner_tx_hash = inner_tx_hash.as_deref().unwrap_or("n/a"),
364 inner_fee_charged,
365 fee_charged = ?fee_charged,
366 fee_bid = ?fee_bid,
367 "stellar transaction failed"
368 );
369
370 let status_reason = format!(
371 "Transaction failed on-chain. Provider status: FAILED. Specific XDR reason: {result_code}."
372 );
373
374 let update_request = TransactionUpdateRequest {
375 status: Some(TransactionStatus::Failed),
376 status_reason: Some(status_reason),
377 ..Default::default()
378 };
379
380 let updated_tx = self
381 .finalize_transaction_state(tx.id.clone(), update_request)
382 .await?;
383
384 self.enqueue_next_pending_transaction(&tx.id).await?;
385
386 Ok(updated_tx)
387 }
388
389 async fn check_expiration_and_max_lifetime(
392 &self,
393 tx: TransactionRepoModel,
394 failed_reason: String,
395 ) -> Option<Result<TransactionRepoModel, TransactionError>> {
396 let age = match get_age_since_created(&tx) {
397 Ok(age) => age,
398 Err(e) => return Some(Err(e)),
399 };
400
401 if let Ok(true) = self.is_transaction_expired(&tx) {
403 info!(tx_id = %tx.id, valid_until = ?tx.valid_until, "Transaction has expired");
404 return Some(
405 self.mark_as_expired(tx, "Transaction time_bounds expired".to_string())
406 .await,
407 );
408 }
409
410 if age > get_stellar_max_stuck_transaction_lifetime() {
412 warn!(tx_id = %tx.id, age_minutes = age.num_minutes(),
413 "Transaction exceeded max lifetime, marking as Failed");
414 return Some(self.mark_as_failed(tx, failed_reason).await);
415 }
416
417 None
418 }
419
420 async fn handle_sent_state(
423 &self,
424 tx: TransactionRepoModel,
425 ) -> Result<TransactionRepoModel, TransactionError> {
426 if let Some(result) = self
428 .check_expiration_and_max_lifetime(
429 tx.clone(),
430 "Transaction stuck in Sent status for too long".to_string(),
431 )
432 .await
433 {
434 return result;
435 }
436
437 let age = get_age_since_sent_or_created(&tx)?;
439 if age > get_stellar_resend_timeout() {
440 info!(tx_id = %tx.id, age_seconds = age.num_seconds(),
441 "re-enqueueing submit job for stuck Sent transaction");
442 send_submit_transaction_job(self.job_producer(), &tx, None).await?;
443 }
444
445 Ok(tx)
446 }
447
448 async fn handle_pending_state(
451 &self,
452 tx: TransactionRepoModel,
453 ) -> Result<TransactionRepoModel, TransactionError> {
454 if let Some(result) = self
456 .check_expiration_and_max_lifetime(
457 tx.clone(),
458 "Transaction stuck in Pending status for too long".to_string(),
459 )
460 .await
461 {
462 return result;
463 }
464
465 let age = self.get_time_since_created_at(&tx)?;
467
468 if age.num_seconds() >= STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS {
471 info!(
472 tx_id = %tx.id,
473 age_seconds = age.num_seconds(),
474 "pending transaction without hash may be stuck, scheduling recovery job"
475 );
476
477 let transaction_request = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
478 if let Err(e) = self
479 .job_producer()
480 .produce_transaction_request_job(transaction_request, None)
481 .await
482 {
483 warn!(
484 tx_id = %tx.id,
485 error = %e,
486 "failed to schedule recovery job for pending transaction"
487 );
488 }
489 } else {
490 debug!(
491 tx_id = %tx.id,
492 age_seconds = age.num_seconds(),
493 "pending transaction without hash too young for recovery check"
494 );
495 }
496
497 Ok(tx)
498 }
499
500 fn get_time_since_created_at(
503 &self,
504 tx: &TransactionRepoModel,
505 ) -> Result<chrono::Duration, TransactionError> {
506 match DateTime::parse_from_rfc3339(&tx.created_at) {
507 Ok(dt) => Ok(Utc::now().signed_duration_since(dt.with_timezone(&Utc))),
508 Err(e) => {
509 warn!(tx_id = %tx.id, ts = %tx.created_at, error = %e, "failed to parse created_at timestamp");
510 Err(TransactionError::UnexpectedError(format!(
511 "Invalid created_at timestamp for transaction {}: {}",
512 tx.id, e
513 )))
514 }
515 }
516 }
517
518 async fn handle_submitted_state(
522 &self,
523 tx: TransactionRepoModel,
524 ) -> Result<TransactionRepoModel, TransactionError> {
525 let stellar_hash = match self.parse_and_validate_hash(&tx) {
526 Ok(hash) => hash,
527 Err(e) => {
528 warn!(
530 tx_id = %tx.id,
531 status = ?tx.status,
532 error = ?e,
533 "failed to parse and validate hash for submitted transaction"
534 );
535 return self
536 .mark_as_failed(tx, format!("Failed to parse and validate hash: {e}"))
537 .await;
538 }
539 };
540
541 let provider_response = match self.provider().get_transaction(&stellar_hash).await {
542 Ok(response) => response,
543 Err(e) => {
544 warn!(error = ?e, "provider get_transaction failed");
545 return Err(TransactionError::from(e));
546 }
547 };
548
549 match provider_response.status.as_str().to_uppercase().as_str() {
550 "SUCCESS" => self.handle_stellar_success(tx, provider_response).await,
551 "FAILED" => self.handle_stellar_failed(tx, provider_response).await,
552 _ => {
553 debug!(
554 tx_id = %tx.id,
555 relayer_id = %tx.relayer_id,
556 status = %provider_response.status,
557 "submitted transaction not yet final on-chain, will retry check later"
558 );
559
560 if let Some(result) = self
562 .check_expiration_and_max_lifetime(
563 tx.clone(),
564 "Transaction stuck in Submitted status for too long".to_string(),
565 )
566 .await
567 {
568 return result;
569 }
570
571 let total_age = get_age_since_created(&tx)?;
574 if let Some(backoff_interval) = compute_resubmit_backoff_interval(
575 total_age,
576 STELLAR_RESUBMIT_BASE_INTERVAL_SECONDS,
577 STELLAR_RESUBMIT_MAX_INTERVAL_SECONDS,
578 ) {
579 let age_since_last_submit = get_age_since_sent_or_created(&tx)?;
580 if age_since_last_submit > backoff_interval {
581 info!(
582 tx_id = %tx.id,
583 relayer_id = %tx.relayer_id,
584 total_age_seconds = total_age.num_seconds(),
585 since_last_submit_seconds = age_since_last_submit.num_seconds(),
586 backoff_interval_seconds = backoff_interval.num_seconds(),
587 "resubmitting Submitted transaction to ensure mempool inclusion"
588 );
589 send_submit_transaction_job(self.job_producer(), &tx, None).await?;
590 }
591 }
592
593 Ok(tx)
594 }
595 }
596 }
597}
598
599fn extract_fee_bid(envelope: &TransactionEnvelope) -> i64 {
604 match envelope {
605 TransactionEnvelope::TxFeeBump(fb) => fb.tx.fee,
606 TransactionEnvelope::Tx(v1) => v1.tx.fee as i64,
607 TransactionEnvelope::TxV0(v0) => v0.tx.fee as i64,
608 }
609}
610
611fn first_failing_op(ops: &[OperationResult]) -> Option<&'static str> {
616 let op = ops.iter().find(|op| match op {
617 OperationResult::OpInner(tr) => match tr {
618 OperationResultTr::InvokeHostFunction(r) => {
619 !matches!(r, InvokeHostFunctionResult::Success(_))
620 }
621 OperationResultTr::ExtendFootprintTtl(r) => r.name() != "Success",
622 OperationResultTr::RestoreFootprint(r) => r.name() != "Success",
623 _ => false,
624 },
625 _ => true,
626 })?;
627 match op {
628 OperationResult::OpInner(tr) => match tr {
629 OperationResultTr::InvokeHostFunction(r) => Some(r.name()),
630 OperationResultTr::ExtendFootprintTtl(r) => Some(r.name()),
631 OperationResultTr::RestoreFootprint(r) => Some(r.name()),
632 _ => Some(tr.name()),
633 },
634 _ => Some(op.name()),
635 }
636}
637
638#[cfg(test)]
639mod tests {
640 use super::*;
641 use crate::models::{NetworkTransactionData, RepositoryError};
642 use crate::repositories::PaginatedResult;
643 use chrono::Duration;
644 use mockall::predicate::eq;
645 use soroban_rs::stellar_rpc_client::GetTransactionResponse;
646
647 use crate::domain::transaction::stellar::test_helpers::*;
648
649 fn dummy_get_transaction_response(status: &str) -> GetTransactionResponse {
650 GetTransactionResponse {
651 status: status.to_string(),
652 ledger: None,
653 envelope: None,
654 result: None,
655 result_meta: None,
656 events: soroban_rs::stellar_rpc_client::GetTransactionEvents {
657 contract_events: vec![],
658 diagnostic_events: vec![],
659 transaction_events: vec![],
660 },
661 }
662 }
663
664 fn dummy_get_transaction_response_with_result_meta(
665 status: &str,
666 has_return_value: bool,
667 ) -> GetTransactionResponse {
668 use soroban_rs::xdr::{ScVal, SorobanTransactionMeta, TransactionMeta, TransactionMetaV3};
669
670 let result_meta = if has_return_value {
671 let return_value = ScVal::I32(42);
673 Some(TransactionMeta::V3(TransactionMetaV3 {
674 ext: soroban_rs::xdr::ExtensionPoint::V0,
675 tx_changes_before: soroban_rs::xdr::LedgerEntryChanges::default(),
676 operations: soroban_rs::xdr::VecM::default(),
677 tx_changes_after: soroban_rs::xdr::LedgerEntryChanges::default(),
678 soroban_meta: Some(SorobanTransactionMeta {
679 ext: soroban_rs::xdr::SorobanTransactionMetaExt::V0,
680 return_value,
681 events: soroban_rs::xdr::VecM::default(),
682 diagnostic_events: soroban_rs::xdr::VecM::default(),
683 }),
684 }))
685 } else {
686 None
687 };
688
689 GetTransactionResponse {
690 status: status.to_string(),
691 ledger: None,
692 envelope: None,
693 result: None,
694 result_meta,
695 events: soroban_rs::stellar_rpc_client::GetTransactionEvents {
696 contract_events: vec![],
697 diagnostic_events: vec![],
698 transaction_events: vec![],
699 },
700 }
701 }
702
703 mod handle_transaction_status_tests {
704 use crate::services::provider::ProviderError;
705
706 use super::*;
707
708 #[tokio::test]
709 async fn handle_transaction_status_confirmed_triggers_next() {
710 let relayer = create_test_relayer();
711 let mut mocks = default_test_mocks();
712
713 let mut tx_to_handle = create_test_transaction(&relayer.id);
714 tx_to_handle.id = "tx-confirm-this".to_string();
715 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
716 let tx_hash_bytes = [1u8; 32];
717 let tx_hash_hex = hex::encode(tx_hash_bytes);
718 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
719 {
720 stellar_data.hash = Some(tx_hash_hex.clone());
721 } else {
722 panic!("Expected Stellar network data for tx_to_handle");
723 }
724 tx_to_handle.status = TransactionStatus::Submitted;
725
726 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
727
728 mocks
730 .provider
731 .expect_get_transaction()
732 .with(eq(expected_stellar_hash.clone()))
733 .times(1)
734 .returning(move |_| {
735 Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) })
736 });
737
738 mocks
740 .tx_repo
741 .expect_partial_update()
742 .withf(move |id, update| {
743 id == "tx-confirm-this"
744 && update.status == Some(TransactionStatus::Confirmed)
745 && update.confirmed_at.is_some()
746 })
747 .times(1)
748 .returning(move |id, update| {
749 let mut updated_tx = tx_to_handle.clone(); updated_tx.id = id;
751 updated_tx.status = update.status.unwrap();
752 updated_tx.confirmed_at = update.confirmed_at;
753 Ok(updated_tx)
754 });
755
756 mocks
758 .job_producer
759 .expect_produce_send_notification_job()
760 .times(1)
761 .returning(|_, _| Box::pin(async { Ok(()) }));
762
763 let mut oldest_pending_tx = create_test_transaction(&relayer.id);
765 oldest_pending_tx.id = "tx-oldest-pending".to_string();
766 oldest_pending_tx.status = TransactionStatus::Pending;
767 let captured_oldest_pending_tx = oldest_pending_tx.clone();
768 let relayer_id_clone = relayer.id.clone();
769 mocks
770 .tx_repo
771 .expect_find_by_status_paginated()
772 .withf(move |relayer_id, statuses, query, oldest_first| {
773 *relayer_id == relayer_id_clone
774 && statuses == [TransactionStatus::Pending]
775 && query.page == 1
776 && query.per_page == 1
777 && *oldest_first
778 })
779 .times(1)
780 .returning(move |_, _, _, _| {
781 Ok(PaginatedResult {
782 items: vec![captured_oldest_pending_tx.clone()],
783 total: 1,
784 page: 1,
785 per_page: 1,
786 })
787 });
788
789 mocks
791 .job_producer
792 .expect_produce_transaction_request_job()
793 .withf(move |job, _delay| job.transaction_id == "tx-oldest-pending")
794 .times(1)
795 .returning(|_, _| Box::pin(async { Ok(()) }));
796
797 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
798 let mut initial_tx_for_handling = create_test_transaction(&relayer.id);
799 initial_tx_for_handling.id = "tx-confirm-this".to_string();
800 initial_tx_for_handling.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
801 if let NetworkTransactionData::Stellar(ref mut stellar_data) =
802 initial_tx_for_handling.network_data
803 {
804 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
805 } else {
806 panic!("Expected Stellar network data for initial_tx_for_handling");
807 }
808 initial_tx_for_handling.status = TransactionStatus::Submitted;
809
810 let result = handler
811 .handle_transaction_status_impl(initial_tx_for_handling, None)
812 .await;
813
814 assert!(result.is_ok());
815 let handled_tx = result.unwrap();
816 assert_eq!(handled_tx.id, "tx-confirm-this");
817 assert_eq!(handled_tx.status, TransactionStatus::Confirmed);
818 assert!(handled_tx.confirmed_at.is_some());
819 }
820
821 #[tokio::test]
822 async fn handle_transaction_status_still_pending() {
823 let relayer = create_test_relayer();
824 let mut mocks = default_test_mocks();
825
826 let mut tx_to_handle = create_test_transaction(&relayer.id);
827 tx_to_handle.id = "tx-pending-check".to_string();
828 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
829 let tx_hash_bytes = [2u8; 32];
830 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
831 {
832 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
833 } else {
834 panic!("Expected Stellar network data");
835 }
836 tx_to_handle.status = TransactionStatus::Submitted; let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
839
840 mocks
842 .provider
843 .expect_get_transaction()
844 .with(eq(expected_stellar_hash.clone()))
845 .times(1)
846 .returning(move |_| {
847 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
848 });
849
850 mocks.tx_repo.expect_partial_update().never();
852
853 mocks
855 .job_producer
856 .expect_produce_send_notification_job()
857 .never();
858
859 mocks
861 .job_producer
862 .expect_produce_submit_transaction_job()
863 .times(1)
864 .returning(|_, _| Box::pin(async { Ok(()) }));
865
866 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
867 let original_tx_clone = tx_to_handle.clone();
868
869 let result = handler
870 .handle_transaction_status_impl(tx_to_handle, None)
871 .await;
872
873 assert!(result.is_ok());
874 let returned_tx = result.unwrap();
875 assert_eq!(returned_tx.id, original_tx_clone.id);
877 assert_eq!(returned_tx.status, original_tx_clone.status);
878 assert!(returned_tx.confirmed_at.is_none()); }
880
881 #[tokio::test]
882 async fn handle_transaction_status_failed() {
883 let relayer = create_test_relayer();
884 let mut mocks = default_test_mocks();
885
886 let mut tx_to_handle = create_test_transaction(&relayer.id);
887 tx_to_handle.id = "tx-fail-this".to_string();
888 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
889 let tx_hash_bytes = [3u8; 32];
890 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
891 {
892 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
893 } else {
894 panic!("Expected Stellar network data");
895 }
896 tx_to_handle.status = TransactionStatus::Submitted;
897
898 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
899
900 mocks
902 .provider
903 .expect_get_transaction()
904 .with(eq(expected_stellar_hash.clone()))
905 .times(1)
906 .returning(move |_| {
907 Box::pin(async { Ok(dummy_get_transaction_response("FAILED")) })
908 });
909
910 let relayer_id_for_mock = relayer.id.clone();
912 mocks
913 .tx_repo
914 .expect_partial_update()
915 .times(1)
916 .returning(move |id, update| {
917 let mut updated_tx = create_test_transaction(&relayer_id_for_mock);
919 updated_tx.id = id;
920 updated_tx.status = update.status.unwrap();
921 updated_tx.status_reason = update.status_reason.clone();
922 Ok::<_, RepositoryError>(updated_tx)
923 });
924
925 mocks
927 .job_producer
928 .expect_produce_send_notification_job()
929 .times(1)
930 .returning(|_, _| Box::pin(async { Ok(()) }));
931
932 let relayer_id_clone = relayer.id.clone();
934 mocks
935 .tx_repo
936 .expect_find_by_status_paginated()
937 .withf(move |relayer_id, statuses, query, oldest_first| {
938 *relayer_id == relayer_id_clone
939 && statuses == [TransactionStatus::Pending]
940 && query.page == 1
941 && query.per_page == 1
942 && *oldest_first
943 })
944 .times(1)
945 .returning(move |_, _, _, _| {
946 Ok(PaginatedResult {
947 items: vec![],
948 total: 0,
949 page: 1,
950 per_page: 1,
951 })
952 }); mocks
956 .job_producer
957 .expect_produce_transaction_request_job()
958 .never();
959 mocks
961 .job_producer
962 .expect_produce_check_transaction_status_job()
963 .never();
964
965 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
966 let mut initial_tx_for_handling = create_test_transaction(&relayer.id);
967 initial_tx_for_handling.id = "tx-fail-this".to_string();
968 initial_tx_for_handling.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
969 if let NetworkTransactionData::Stellar(ref mut stellar_data) =
970 initial_tx_for_handling.network_data
971 {
972 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
973 } else {
974 panic!("Expected Stellar network data");
975 }
976 initial_tx_for_handling.status = TransactionStatus::Submitted;
977
978 let result = handler
979 .handle_transaction_status_impl(initial_tx_for_handling, None)
980 .await;
981
982 assert!(result.is_ok());
983 let handled_tx = result.unwrap();
984 assert_eq!(handled_tx.id, "tx-fail-this");
985 assert_eq!(handled_tx.status, TransactionStatus::Failed);
986 assert!(handled_tx.status_reason.is_some());
987 assert_eq!(
988 handled_tx.status_reason.unwrap(),
989 "Transaction failed on-chain. Provider status: FAILED. Specific XDR reason: unknown."
990 );
991 }
992
993 #[tokio::test]
994 async fn handle_transaction_status_provider_error() {
995 let relayer = create_test_relayer();
996 let mut mocks = default_test_mocks();
997
998 let mut tx_to_handle = create_test_transaction(&relayer.id);
999 tx_to_handle.id = "tx-provider-error".to_string();
1000 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1001 let tx_hash_bytes = [4u8; 32];
1002 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1003 {
1004 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1005 } else {
1006 panic!("Expected Stellar network data");
1007 }
1008 tx_to_handle.status = TransactionStatus::Submitted;
1009
1010 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1011
1012 mocks
1014 .provider
1015 .expect_get_transaction()
1016 .with(eq(expected_stellar_hash.clone()))
1017 .times(1)
1018 .returning(move |_| {
1019 Box::pin(async { Err(ProviderError::Other("RPC boom".to_string())) })
1020 });
1021
1022 mocks.tx_repo.expect_partial_update().never();
1024
1025 mocks
1027 .job_producer
1028 .expect_produce_send_notification_job()
1029 .never();
1030 mocks
1032 .job_producer
1033 .expect_produce_transaction_request_job()
1034 .never();
1035
1036 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1037
1038 let result = handler
1039 .handle_transaction_status_impl(tx_to_handle, None)
1040 .await;
1041
1042 assert!(result.is_err());
1044 matches!(result.unwrap_err(), TransactionError::UnderlyingProvider(_));
1045 }
1046
1047 #[tokio::test]
1048 async fn handle_transaction_status_no_hashes() {
1049 let relayer = create_test_relayer();
1050 let mut mocks = default_test_mocks();
1051
1052 let mut tx_to_handle = create_test_transaction(&relayer.id);
1053 tx_to_handle.id = "tx-no-hashes".to_string();
1054 tx_to_handle.status = TransactionStatus::Submitted;
1055 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1056
1057 mocks.provider.expect_get_transaction().never();
1059
1060 mocks
1062 .tx_repo
1063 .expect_partial_update()
1064 .times(1)
1065 .returning(|_, update| {
1066 let mut updated_tx = create_test_transaction("test-relayer");
1067 updated_tx.status = update.status.unwrap_or(updated_tx.status);
1068 updated_tx.status_reason = update.status_reason.clone();
1069 Ok(updated_tx)
1070 });
1071
1072 mocks
1074 .job_producer
1075 .expect_produce_send_notification_job()
1076 .times(1)
1077 .returning(|_, _| Box::pin(async { Ok(()) }));
1078
1079 let relayer_id_clone = relayer.id.clone();
1081 mocks
1082 .tx_repo
1083 .expect_find_by_status_paginated()
1084 .withf(move |relayer_id, statuses, query, oldest_first| {
1085 *relayer_id == relayer_id_clone
1086 && statuses == [TransactionStatus::Pending]
1087 && query.page == 1
1088 && query.per_page == 1
1089 && *oldest_first
1090 })
1091 .times(1)
1092 .returning(move |_, _, _, _| {
1093 Ok(PaginatedResult {
1094 items: vec![],
1095 total: 0,
1096 page: 1,
1097 per_page: 1,
1098 })
1099 }); let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1102 let result = handler
1103 .handle_transaction_status_impl(tx_to_handle, None)
1104 .await;
1105
1106 assert!(result.is_ok(), "Expected Ok result");
1108 let updated_tx = result.unwrap();
1109 assert_eq!(updated_tx.status, TransactionStatus::Failed);
1110 assert!(
1111 updated_tx
1112 .status_reason
1113 .as_ref()
1114 .unwrap()
1115 .contains("Failed to parse and validate hash"),
1116 "Expected hash validation error in status_reason, got: {:?}",
1117 updated_tx.status_reason
1118 );
1119 }
1120
1121 #[tokio::test]
1122 async fn test_on_chain_failure_does_not_decrement_sequence() {
1123 let relayer = create_test_relayer();
1124 let mut mocks = default_test_mocks();
1125
1126 let mut tx_to_handle = create_test_transaction(&relayer.id);
1127 tx_to_handle.id = "tx-on-chain-fail".to_string();
1128 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1129 let tx_hash_bytes = [4u8; 32];
1130 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1131 {
1132 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1133 stellar_data.sequence_number = Some(100); }
1135 tx_to_handle.status = TransactionStatus::Submitted;
1136
1137 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1138
1139 mocks
1141 .provider
1142 .expect_get_transaction()
1143 .with(eq(expected_stellar_hash.clone()))
1144 .times(1)
1145 .returning(move |_| {
1146 Box::pin(async { Ok(dummy_get_transaction_response("FAILED")) })
1147 });
1148
1149 mocks.counter.expect_decrement().never();
1151
1152 mocks
1154 .tx_repo
1155 .expect_partial_update()
1156 .times(1)
1157 .returning(move |id, update| {
1158 let mut updated_tx = create_test_transaction("test");
1159 updated_tx.id = id;
1160 updated_tx.status = update.status.unwrap();
1161 updated_tx.status_reason = update.status_reason.clone();
1162 Ok::<_, RepositoryError>(updated_tx)
1163 });
1164
1165 mocks
1167 .job_producer
1168 .expect_produce_send_notification_job()
1169 .times(1)
1170 .returning(|_, _| Box::pin(async { Ok(()) }));
1171
1172 mocks
1174 .tx_repo
1175 .expect_find_by_status_paginated()
1176 .returning(move |_, _, _, _| {
1177 Ok(PaginatedResult {
1178 items: vec![],
1179 total: 0,
1180 page: 1,
1181 per_page: 1,
1182 })
1183 });
1184
1185 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1186 let initial_tx = tx_to_handle.clone();
1187
1188 let result = handler
1189 .handle_transaction_status_impl(initial_tx, None)
1190 .await;
1191
1192 assert!(result.is_ok());
1193 let handled_tx = result.unwrap();
1194 assert_eq!(handled_tx.id, "tx-on-chain-fail");
1195 assert_eq!(handled_tx.status, TransactionStatus::Failed);
1196 }
1197
1198 #[tokio::test]
1199 async fn test_on_chain_success_does_not_decrement_sequence() {
1200 let relayer = create_test_relayer();
1201 let mut mocks = default_test_mocks();
1202
1203 let mut tx_to_handle = create_test_transaction(&relayer.id);
1204 tx_to_handle.id = "tx-on-chain-success".to_string();
1205 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1206 let tx_hash_bytes = [5u8; 32];
1207 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1208 {
1209 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1210 stellar_data.sequence_number = Some(101); }
1212 tx_to_handle.status = TransactionStatus::Submitted;
1213
1214 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1215
1216 mocks
1218 .provider
1219 .expect_get_transaction()
1220 .with(eq(expected_stellar_hash.clone()))
1221 .times(1)
1222 .returning(move |_| {
1223 Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) })
1224 });
1225
1226 mocks.counter.expect_decrement().never();
1228
1229 mocks
1231 .tx_repo
1232 .expect_partial_update()
1233 .withf(move |id, update| {
1234 id == "tx-on-chain-success"
1235 && update.status == Some(TransactionStatus::Confirmed)
1236 && update.confirmed_at.is_some()
1237 })
1238 .times(1)
1239 .returning(move |id, update| {
1240 let mut updated_tx = create_test_transaction("test");
1241 updated_tx.id = id;
1242 updated_tx.status = update.status.unwrap();
1243 updated_tx.confirmed_at = update.confirmed_at;
1244 Ok(updated_tx)
1245 });
1246
1247 mocks
1249 .job_producer
1250 .expect_produce_send_notification_job()
1251 .times(1)
1252 .returning(|_, _| Box::pin(async { Ok(()) }));
1253
1254 mocks
1256 .tx_repo
1257 .expect_find_by_status_paginated()
1258 .returning(move |_, _, _, _| {
1259 Ok(PaginatedResult {
1260 items: vec![],
1261 total: 0,
1262 page: 1,
1263 per_page: 1,
1264 })
1265 });
1266
1267 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1268 let initial_tx = tx_to_handle.clone();
1269
1270 let result = handler
1271 .handle_transaction_status_impl(initial_tx, None)
1272 .await;
1273
1274 assert!(result.is_ok());
1275 let handled_tx = result.unwrap();
1276 assert_eq!(handled_tx.id, "tx-on-chain-success");
1277 assert_eq!(handled_tx.status, TransactionStatus::Confirmed);
1278 }
1279
1280 #[tokio::test]
1281 async fn test_handle_transaction_status_with_xdr_error_requeues() {
1282 let relayer = create_test_relayer();
1284 let mut mocks = default_test_mocks();
1285
1286 let mut tx_to_handle = create_test_transaction(&relayer.id);
1287 tx_to_handle.id = "tx-xdr-error-requeue".to_string();
1288 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1289 let tx_hash_bytes = [8u8; 32];
1290 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1291 {
1292 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1293 }
1294 tx_to_handle.status = TransactionStatus::Submitted;
1295
1296 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1297
1298 mocks
1300 .provider
1301 .expect_get_transaction()
1302 .with(eq(expected_stellar_hash.clone()))
1303 .times(1)
1304 .returning(move |_| {
1305 Box::pin(async { Err(ProviderError::Other("Network timeout".to_string())) })
1306 });
1307
1308 mocks.tx_repo.expect_partial_update().never();
1310 mocks
1311 .job_producer
1312 .expect_produce_send_notification_job()
1313 .never();
1314
1315 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1316
1317 let result = handler
1318 .handle_transaction_status_impl(tx_to_handle, None)
1319 .await;
1320
1321 assert!(result.is_err());
1323 matches!(result.unwrap_err(), TransactionError::UnderlyingProvider(_));
1324 }
1325
1326 #[tokio::test]
1327 async fn handle_transaction_status_extracts_transaction_result_xdr() {
1328 let relayer = create_test_relayer();
1329 let mut mocks = default_test_mocks();
1330
1331 let mut tx_to_handle = create_test_transaction(&relayer.id);
1332 tx_to_handle.id = "tx-with-result".to_string();
1333 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1334 let tx_hash_bytes = [9u8; 32];
1335 let tx_hash_hex = hex::encode(tx_hash_bytes);
1336 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1337 {
1338 stellar_data.hash = Some(tx_hash_hex.clone());
1339 } else {
1340 panic!("Expected Stellar network data");
1341 }
1342 tx_to_handle.status = TransactionStatus::Submitted;
1343
1344 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1345
1346 mocks
1348 .provider
1349 .expect_get_transaction()
1350 .with(eq(expected_stellar_hash.clone()))
1351 .times(1)
1352 .returning(move |_| {
1353 Box::pin(async {
1354 Ok(dummy_get_transaction_response_with_result_meta(
1355 "SUCCESS", true,
1356 ))
1357 })
1358 });
1359
1360 let tx_to_handle_clone = tx_to_handle.clone();
1362 mocks
1363 .tx_repo
1364 .expect_partial_update()
1365 .withf(move |id, update| {
1366 id == "tx-with-result"
1367 && update.status == Some(TransactionStatus::Confirmed)
1368 && update.confirmed_at.is_some()
1369 && update.network_data.as_ref().is_some_and(|and| {
1370 if let NetworkTransactionData::Stellar(stellar_data) = and {
1371 stellar_data.transaction_result_xdr.is_some()
1373 } else {
1374 false
1375 }
1376 })
1377 })
1378 .times(1)
1379 .returning(move |id, update| {
1380 let mut updated_tx = tx_to_handle_clone.clone();
1381 updated_tx.id = id;
1382 updated_tx.status = update.status.unwrap();
1383 updated_tx.confirmed_at = update.confirmed_at;
1384 if let Some(network_data) = update.network_data {
1385 updated_tx.network_data = network_data;
1386 }
1387 Ok(updated_tx)
1388 });
1389
1390 mocks
1392 .job_producer
1393 .expect_produce_send_notification_job()
1394 .times(1)
1395 .returning(|_, _| Box::pin(async { Ok(()) }));
1396
1397 mocks
1399 .tx_repo
1400 .expect_find_by_status_paginated()
1401 .returning(move |_, _, _, _| {
1402 Ok(PaginatedResult {
1403 items: vec![],
1404 total: 0,
1405 page: 1,
1406 per_page: 1,
1407 })
1408 });
1409
1410 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1411 let result = handler
1412 .handle_transaction_status_impl(tx_to_handle, None)
1413 .await;
1414
1415 assert!(result.is_ok());
1416 let handled_tx = result.unwrap();
1417 assert_eq!(handled_tx.id, "tx-with-result");
1418 assert_eq!(handled_tx.status, TransactionStatus::Confirmed);
1419
1420 if let NetworkTransactionData::Stellar(stellar_data) = handled_tx.network_data {
1422 assert!(
1423 stellar_data.transaction_result_xdr.is_some(),
1424 "transaction_result_xdr should be stored when result_meta contains return_value"
1425 );
1426 } else {
1427 panic!("Expected Stellar network data");
1428 }
1429 }
1430
1431 #[tokio::test]
1432 async fn handle_transaction_status_no_result_meta_does_not_store_xdr() {
1433 let relayer = create_test_relayer();
1434 let mut mocks = default_test_mocks();
1435
1436 let mut tx_to_handle = create_test_transaction(&relayer.id);
1437 tx_to_handle.id = "tx-no-result-meta".to_string();
1438 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1439 let tx_hash_bytes = [10u8; 32];
1440 let tx_hash_hex = hex::encode(tx_hash_bytes);
1441 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
1442 {
1443 stellar_data.hash = Some(tx_hash_hex.clone());
1444 } else {
1445 panic!("Expected Stellar network data");
1446 }
1447 tx_to_handle.status = TransactionStatus::Submitted;
1448
1449 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1450
1451 mocks
1453 .provider
1454 .expect_get_transaction()
1455 .with(eq(expected_stellar_hash.clone()))
1456 .times(1)
1457 .returning(move |_| {
1458 Box::pin(async {
1459 Ok(dummy_get_transaction_response_with_result_meta(
1460 "SUCCESS", false,
1461 ))
1462 })
1463 });
1464
1465 let tx_to_handle_clone = tx_to_handle.clone();
1467 mocks
1468 .tx_repo
1469 .expect_partial_update()
1470 .times(1)
1471 .returning(move |id, update| {
1472 let mut updated_tx = tx_to_handle_clone.clone();
1473 updated_tx.id = id;
1474 updated_tx.status = update.status.unwrap();
1475 updated_tx.confirmed_at = update.confirmed_at;
1476 if let Some(network_data) = update.network_data {
1477 updated_tx.network_data = network_data;
1478 }
1479 Ok(updated_tx)
1480 });
1481
1482 mocks
1484 .job_producer
1485 .expect_produce_send_notification_job()
1486 .times(1)
1487 .returning(|_, _| Box::pin(async { Ok(()) }));
1488
1489 mocks
1491 .tx_repo
1492 .expect_find_by_status_paginated()
1493 .returning(move |_, _, _, _| {
1494 Ok(PaginatedResult {
1495 items: vec![],
1496 total: 0,
1497 page: 1,
1498 per_page: 1,
1499 })
1500 });
1501
1502 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1503 let result = handler
1504 .handle_transaction_status_impl(tx_to_handle, None)
1505 .await;
1506
1507 assert!(result.is_ok());
1508 let handled_tx = result.unwrap();
1509
1510 if let NetworkTransactionData::Stellar(stellar_data) = handled_tx.network_data {
1512 assert!(
1513 stellar_data.transaction_result_xdr.is_none(),
1514 "transaction_result_xdr should be None when result_meta is missing"
1515 );
1516 } else {
1517 panic!("Expected Stellar network data");
1518 }
1519 }
1520
1521 #[tokio::test]
1522 async fn test_sent_transaction_not_stuck_yet_returns_ok() {
1523 let relayer = create_test_relayer();
1525 let mut mocks = default_test_mocks();
1526
1527 let mut tx = create_test_transaction(&relayer.id);
1528 tx.id = "tx-sent-not-stuck".to_string();
1529 tx.status = TransactionStatus::Sent;
1530 tx.created_at = Utc::now().to_rfc3339();
1532 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1534 stellar_data.hash = None;
1535 }
1536
1537 mocks.provider.expect_get_transaction().never();
1539 mocks.tx_repo.expect_partial_update().never();
1540 mocks
1541 .job_producer
1542 .expect_produce_submit_transaction_job()
1543 .never();
1544
1545 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1546 let result = handler
1547 .handle_transaction_status_impl(tx.clone(), None)
1548 .await;
1549
1550 assert!(result.is_ok());
1551 let returned_tx = result.unwrap();
1552 assert_eq!(returned_tx.id, tx.id);
1554 assert_eq!(returned_tx.status, TransactionStatus::Sent);
1555 }
1556
1557 #[tokio::test]
1558 async fn test_stuck_sent_transaction_reenqueues_submit_job() {
1559 let relayer = create_test_relayer();
1562 let mut mocks = default_test_mocks();
1563
1564 let mut tx = create_test_transaction(&relayer.id);
1565 tx.id = "tx-stuck-with-xdr".to_string();
1566 tx.status = TransactionStatus::Sent;
1567 tx.created_at = (Utc::now() - Duration::minutes(10)).to_rfc3339();
1569 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1571 stellar_data.hash = None;
1572 stellar_data.signed_envelope_xdr = Some("AAAA...signed...".to_string());
1573 }
1574
1575 mocks
1577 .job_producer
1578 .expect_produce_submit_transaction_job()
1579 .times(1)
1580 .returning(|_, _| Box::pin(async { Ok(()) }));
1581
1582 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1583 let result = handler
1584 .handle_transaction_status_impl(tx.clone(), None)
1585 .await;
1586
1587 assert!(result.is_ok());
1588 let returned_tx = result.unwrap();
1589 assert_eq!(returned_tx.status, TransactionStatus::Sent);
1591 }
1592
1593 #[tokio::test]
1594 async fn test_stuck_sent_transaction_expired_marks_expired() {
1595 let relayer = create_test_relayer();
1597 let mut mocks = default_test_mocks();
1598
1599 let mut tx = create_test_transaction(&relayer.id);
1600 tx.id = "tx-expired".to_string();
1601 tx.status = TransactionStatus::Sent;
1602 tx.created_at = (Utc::now() - Duration::minutes(10)).to_rfc3339();
1604 tx.valid_until = Some((Utc::now() - Duration::minutes(5)).to_rfc3339());
1606 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1607 stellar_data.hash = None;
1608 stellar_data.signed_envelope_xdr = Some("AAAA...signed...".to_string());
1609 }
1610
1611 mocks
1613 .tx_repo
1614 .expect_partial_update()
1615 .withf(|_id, update| update.status == Some(TransactionStatus::Expired))
1616 .times(1)
1617 .returning(|id, update| {
1618 let mut updated = create_test_transaction("test");
1619 updated.id = id;
1620 updated.status = update.status.unwrap();
1621 updated.status_reason = update.status_reason.clone();
1622 Ok(updated)
1623 });
1624
1625 mocks
1627 .job_producer
1628 .expect_produce_submit_transaction_job()
1629 .never();
1630
1631 mocks
1633 .job_producer
1634 .expect_produce_send_notification_job()
1635 .times(1)
1636 .returning(|_, _| Box::pin(async { Ok(()) }));
1637
1638 mocks
1640 .tx_repo
1641 .expect_find_by_status_paginated()
1642 .returning(move |_, _, _, _| {
1643 Ok(PaginatedResult {
1644 items: vec![],
1645 total: 0,
1646 page: 1,
1647 per_page: 1,
1648 })
1649 });
1650
1651 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1652 let result = handler.handle_transaction_status_impl(tx, None).await;
1653
1654 assert!(result.is_ok());
1655 let expired_tx = result.unwrap();
1656 assert_eq!(expired_tx.status, TransactionStatus::Expired);
1657 assert!(expired_tx
1658 .status_reason
1659 .as_ref()
1660 .unwrap()
1661 .contains("expired"));
1662 }
1663
1664 #[tokio::test]
1665 async fn test_stuck_sent_transaction_max_lifetime_marks_failed() {
1666 let relayer = create_test_relayer();
1668 let mut mocks = default_test_mocks();
1669
1670 let mut tx = create_test_transaction(&relayer.id);
1671 tx.id = "tx-max-lifetime".to_string();
1672 tx.status = TransactionStatus::Sent;
1673 tx.created_at = (Utc::now() - Duration::minutes(35)).to_rfc3339();
1675 tx.valid_until = None;
1677 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1678 stellar_data.hash = None;
1679 stellar_data.signed_envelope_xdr = Some("AAAA...signed...".to_string());
1680 }
1681
1682 mocks
1684 .tx_repo
1685 .expect_partial_update()
1686 .withf(|_id, update| update.status == Some(TransactionStatus::Failed))
1687 .times(1)
1688 .returning(|id, update| {
1689 let mut updated = create_test_transaction("test");
1690 updated.id = id;
1691 updated.status = update.status.unwrap();
1692 updated.status_reason = update.status_reason.clone();
1693 Ok(updated)
1694 });
1695
1696 mocks
1698 .job_producer
1699 .expect_produce_submit_transaction_job()
1700 .never();
1701
1702 mocks
1704 .job_producer
1705 .expect_produce_send_notification_job()
1706 .times(1)
1707 .returning(|_, _| Box::pin(async { Ok(()) }));
1708
1709 mocks
1711 .tx_repo
1712 .expect_find_by_status_paginated()
1713 .returning(|_, _, _, _| {
1714 Ok(PaginatedResult {
1715 items: vec![],
1716 total: 0,
1717 page: 1,
1718 per_page: 1,
1719 })
1720 });
1721
1722 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1723 let result = handler.handle_transaction_status_impl(tx, None).await;
1724
1725 assert!(result.is_ok());
1726 let failed_tx = result.unwrap();
1727 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1728 assert!(failed_tx
1730 .status_reason
1731 .as_ref()
1732 .unwrap()
1733 .contains("stuck in Sent status for too long"));
1734 }
1735 }
1736
1737 mod handle_pending_state_tests {
1738 use super::*;
1739 use crate::constants::get_stellar_max_stuck_transaction_lifetime;
1740 use crate::constants::STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS;
1741
1742 #[tokio::test]
1743 async fn test_pending_exceeds_max_lifetime_marks_failed() {
1744 let relayer = create_test_relayer();
1745 let mut mocks = default_test_mocks();
1746
1747 let mut tx = create_test_transaction(&relayer.id);
1748 tx.id = "tx-pending-old".to_string();
1749 tx.status = TransactionStatus::Pending;
1750 tx.created_at =
1752 (Utc::now() - get_stellar_max_stuck_transaction_lifetime() - Duration::minutes(1))
1753 .to_rfc3339();
1754
1755 mocks
1757 .tx_repo
1758 .expect_partial_update()
1759 .withf(|_id, update| update.status == Some(TransactionStatus::Failed))
1760 .times(1)
1761 .returning(|id, update| {
1762 let mut updated = create_test_transaction("test");
1763 updated.id = id;
1764 updated.status = update.status.unwrap();
1765 updated.status_reason = update.status_reason.clone();
1766 Ok(updated)
1767 });
1768
1769 mocks
1771 .job_producer
1772 .expect_produce_send_notification_job()
1773 .times(1)
1774 .returning(|_, _| Box::pin(async { Ok(()) }));
1775
1776 mocks
1778 .tx_repo
1779 .expect_find_by_status_paginated()
1780 .returning(move |_, _, _, _| {
1781 Ok(PaginatedResult {
1782 items: vec![],
1783 total: 0,
1784 page: 1,
1785 per_page: 1,
1786 })
1787 });
1788
1789 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1790 let result = handler.handle_transaction_status_impl(tx, None).await;
1791
1792 assert!(result.is_ok());
1793 let failed_tx = result.unwrap();
1794 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1795 assert!(failed_tx
1796 .status_reason
1797 .as_ref()
1798 .unwrap()
1799 .contains("stuck in Pending status for too long"));
1800 }
1801
1802 #[tokio::test]
1803 async fn test_pending_triggers_recovery_job_when_old_enough() {
1804 let relayer = create_test_relayer();
1805 let mut mocks = default_test_mocks();
1806
1807 let mut tx = create_test_transaction(&relayer.id);
1808 tx.id = "tx-pending-recovery".to_string();
1809 tx.status = TransactionStatus::Pending;
1810 tx.created_at = (Utc::now()
1812 - Duration::seconds(STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS + 5))
1813 .to_rfc3339();
1814
1815 mocks
1817 .job_producer
1818 .expect_produce_transaction_request_job()
1819 .times(1)
1820 .returning(|_, _| Box::pin(async { Ok(()) }));
1821
1822 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1823 let result = handler.handle_transaction_status_impl(tx, None).await;
1824
1825 assert!(result.is_ok());
1826 let tx_result = result.unwrap();
1827 assert_eq!(tx_result.status, TransactionStatus::Pending);
1828 }
1829
1830 #[tokio::test]
1831 async fn test_pending_too_young_does_not_schedule_recovery() {
1832 let relayer = create_test_relayer();
1833 let mut mocks = default_test_mocks();
1834
1835 let mut tx = create_test_transaction(&relayer.id);
1836 tx.id = "tx-pending-young".to_string();
1837 tx.status = TransactionStatus::Pending;
1838 tx.created_at = (Utc::now()
1840 - Duration::seconds(STELLAR_PENDING_RECOVERY_TRIGGER_SECONDS - 5))
1841 .to_rfc3339();
1842
1843 mocks
1845 .job_producer
1846 .expect_produce_transaction_request_job()
1847 .never();
1848
1849 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1850 let result = handler.handle_transaction_status_impl(tx, None).await;
1851
1852 assert!(result.is_ok());
1853 let tx_result = result.unwrap();
1854 assert_eq!(tx_result.status, TransactionStatus::Pending);
1855 }
1856
1857 #[tokio::test]
1858 async fn test_sent_without_hash_handles_stuck_recovery() {
1859 use crate::constants::get_stellar_resend_timeout;
1860
1861 let relayer = create_test_relayer();
1862 let mut mocks = default_test_mocks();
1863
1864 let mut tx = create_test_transaction(&relayer.id);
1865 tx.id = "tx-sent-no-hash".to_string();
1866 tx.status = TransactionStatus::Sent;
1867 tx.created_at =
1869 (Utc::now() - get_stellar_resend_timeout() - Duration::seconds(1)).to_rfc3339();
1870 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1871 stellar_data.hash = None; }
1873
1874 mocks
1876 .job_producer
1877 .expect_produce_submit_transaction_job()
1878 .times(1)
1879 .returning(|_, _| Box::pin(async { Ok(()) }));
1880
1881 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1882 let result = handler.handle_transaction_status_impl(tx, None).await;
1883
1884 assert!(result.is_ok());
1885 let tx_result = result.unwrap();
1886 assert_eq!(tx_result.status, TransactionStatus::Sent);
1887 }
1888
1889 #[tokio::test]
1890 async fn test_submitted_without_hash_marks_failed() {
1891 let relayer = create_test_relayer();
1892 let mut mocks = default_test_mocks();
1893
1894 let mut tx = create_test_transaction(&relayer.id);
1895 tx.id = "tx-submitted-no-hash".to_string();
1896 tx.status = TransactionStatus::Submitted;
1897 tx.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
1898 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1899 stellar_data.hash = None; }
1901
1902 mocks
1904 .tx_repo
1905 .expect_partial_update()
1906 .withf(|_id, update| update.status == Some(TransactionStatus::Failed))
1907 .times(1)
1908 .returning(|id, update| {
1909 let mut updated = create_test_transaction("test");
1910 updated.id = id;
1911 updated.status = update.status.unwrap();
1912 updated.status_reason = update.status_reason.clone();
1913 Ok(updated)
1914 });
1915
1916 mocks
1918 .job_producer
1919 .expect_produce_send_notification_job()
1920 .times(1)
1921 .returning(|_, _| Box::pin(async { Ok(()) }));
1922
1923 mocks
1925 .tx_repo
1926 .expect_find_by_status_paginated()
1927 .returning(move |_, _, _, _| {
1928 Ok(PaginatedResult {
1929 items: vec![],
1930 total: 0,
1931 page: 1,
1932 per_page: 1,
1933 })
1934 });
1935
1936 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
1937 let result = handler.handle_transaction_status_impl(tx, None).await;
1938
1939 assert!(result.is_ok());
1940 let failed_tx = result.unwrap();
1941 assert_eq!(failed_tx.status, TransactionStatus::Failed);
1942 assert!(failed_tx
1943 .status_reason
1944 .as_ref()
1945 .unwrap()
1946 .contains("Failed to parse and validate hash"));
1947 }
1948
1949 #[tokio::test]
1950 async fn test_submitted_exceeds_max_lifetime_marks_failed() {
1951 let relayer = create_test_relayer();
1952 let mut mocks = default_test_mocks();
1953
1954 let mut tx = create_test_transaction(&relayer.id);
1955 tx.id = "tx-submitted-old".to_string();
1956 tx.status = TransactionStatus::Submitted;
1957 tx.created_at =
1959 (Utc::now() - get_stellar_max_stuck_transaction_lifetime() - Duration::minutes(1))
1960 .to_rfc3339();
1961 let tx_hash_bytes = [6u8; 32];
1963 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
1964 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
1965 }
1966
1967 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
1968
1969 mocks
1971 .provider
1972 .expect_get_transaction()
1973 .with(eq(expected_stellar_hash.clone()))
1974 .times(1)
1975 .returning(move |_| {
1976 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
1977 });
1978
1979 mocks
1981 .tx_repo
1982 .expect_partial_update()
1983 .withf(|_id, update| update.status == Some(TransactionStatus::Failed))
1984 .times(1)
1985 .returning(|id, update| {
1986 let mut updated = create_test_transaction("test");
1987 updated.id = id;
1988 updated.status = update.status.unwrap();
1989 updated.status_reason = update.status_reason.clone();
1990 Ok(updated)
1991 });
1992
1993 mocks
1995 .job_producer
1996 .expect_produce_send_notification_job()
1997 .times(1)
1998 .returning(|_, _| Box::pin(async { Ok(()) }));
1999
2000 mocks
2002 .tx_repo
2003 .expect_find_by_status_paginated()
2004 .returning(move |_, _, _, _| {
2005 Ok(PaginatedResult {
2006 items: vec![],
2007 total: 0,
2008 page: 1,
2009 per_page: 1,
2010 })
2011 });
2012
2013 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2014 let result = handler.handle_transaction_status_impl(tx, None).await;
2015
2016 assert!(result.is_ok());
2017 let failed_tx = result.unwrap();
2018 assert_eq!(failed_tx.status, TransactionStatus::Failed);
2019 assert!(failed_tx
2020 .status_reason
2021 .as_ref()
2022 .unwrap()
2023 .contains("stuck in Submitted status for too long"));
2024 }
2025
2026 #[tokio::test]
2027 async fn test_submitted_expired_marks_expired() {
2028 let relayer = create_test_relayer();
2029 let mut mocks = default_test_mocks();
2030
2031 let mut tx = create_test_transaction(&relayer.id);
2032 tx.id = "tx-submitted-expired".to_string();
2033 tx.status = TransactionStatus::Submitted;
2034 tx.created_at = (Utc::now() - Duration::minutes(10)).to_rfc3339();
2035 tx.valid_until = Some((Utc::now() - Duration::minutes(5)).to_rfc3339());
2037 let tx_hash_bytes = [7u8; 32];
2039 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2040 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2041 }
2042
2043 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2044
2045 mocks
2047 .provider
2048 .expect_get_transaction()
2049 .with(eq(expected_stellar_hash.clone()))
2050 .times(1)
2051 .returning(move |_| {
2052 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2053 });
2054
2055 mocks
2057 .tx_repo
2058 .expect_partial_update()
2059 .withf(|_id, update| update.status == Some(TransactionStatus::Expired))
2060 .times(1)
2061 .returning(|id, update| {
2062 let mut updated = create_test_transaction("test");
2063 updated.id = id;
2064 updated.status = update.status.unwrap();
2065 updated.status_reason = update.status_reason.clone();
2066 Ok(updated)
2067 });
2068
2069 mocks
2071 .job_producer
2072 .expect_produce_send_notification_job()
2073 .times(1)
2074 .returning(|_, _| Box::pin(async { Ok(()) }));
2075
2076 mocks
2078 .tx_repo
2079 .expect_find_by_status_paginated()
2080 .returning(move |_, _, _, _| {
2081 Ok(PaginatedResult {
2082 items: vec![],
2083 total: 0,
2084 page: 1,
2085 per_page: 1,
2086 })
2087 });
2088
2089 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2090 let result = handler.handle_transaction_status_impl(tx, None).await;
2091
2092 assert!(result.is_ok());
2093 let expired_tx = result.unwrap();
2094 assert_eq!(expired_tx.status, TransactionStatus::Expired);
2095 assert!(expired_tx
2096 .status_reason
2097 .as_ref()
2098 .unwrap()
2099 .contains("expired"));
2100 }
2101
2102 #[tokio::test]
2103 async fn test_handle_submitted_state_resubmits_after_timeout() {
2104 let relayer = create_test_relayer();
2106 let mut mocks = default_test_mocks();
2107
2108 let mut tx = create_test_transaction(&relayer.id);
2109 tx.id = "tx-submitted-resubmit".to_string();
2110 tx.status = TransactionStatus::Submitted;
2111 let eleven_seconds_ago = (Utc::now() - Duration::seconds(11)).to_rfc3339();
2112 tx.created_at = eleven_seconds_ago.clone();
2113 tx.sent_at = Some(eleven_seconds_ago);
2114 let tx_hash_bytes = [8u8; 32];
2116 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2117 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2118 }
2119
2120 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2121
2122 mocks
2124 .provider
2125 .expect_get_transaction()
2126 .with(eq(expected_stellar_hash.clone()))
2127 .times(1)
2128 .returning(move |_| {
2129 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2130 });
2131
2132 mocks
2134 .job_producer
2135 .expect_produce_submit_transaction_job()
2136 .times(1)
2137 .returning(|_, _| Box::pin(async { Ok(()) }));
2138
2139 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2140 let result = handler.handle_transaction_status_impl(tx, None).await;
2141
2142 assert!(result.is_ok());
2143 let tx_result = result.unwrap();
2144 assert_eq!(tx_result.status, TransactionStatus::Submitted);
2145 }
2146
2147 #[tokio::test]
2148 async fn test_handle_submitted_state_backoff_increases_interval() {
2149 let relayer = create_test_relayer();
2153 let mut mocks = default_test_mocks();
2154
2155 let mut tx = create_test_transaction(&relayer.id);
2156 tx.id = "tx-submitted-backoff".to_string();
2157 tx.status = TransactionStatus::Submitted;
2158 tx.created_at = (Utc::now() - Duration::seconds(25)).to_rfc3339();
2159 tx.sent_at = Some((Utc::now() - Duration::seconds(15)).to_rfc3339());
2160 let tx_hash_bytes = [11u8; 32];
2161 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2162 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2163 }
2164
2165 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2166
2167 mocks
2168 .provider
2169 .expect_get_transaction()
2170 .with(eq(expected_stellar_hash.clone()))
2171 .times(1)
2172 .returning(move |_| {
2173 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2174 });
2175
2176 mocks
2178 .job_producer
2179 .expect_produce_submit_transaction_job()
2180 .never();
2181
2182 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2183 let result = handler.handle_transaction_status_impl(tx, None).await;
2184
2185 assert!(result.is_ok());
2186 let tx_result = result.unwrap();
2187 assert_eq!(tx_result.status, TransactionStatus::Submitted);
2188 }
2189
2190 #[tokio::test]
2191 async fn test_handle_submitted_state_backoff_resubmits_when_interval_exceeded() {
2192 let relayer = create_test_relayer();
2196 let mut mocks = default_test_mocks();
2197
2198 let mut tx = create_test_transaction(&relayer.id);
2199 tx.id = "tx-submitted-backoff-resubmit".to_string();
2200 tx.status = TransactionStatus::Submitted;
2201 tx.created_at = (Utc::now() - Duration::seconds(25)).to_rfc3339();
2202 tx.sent_at = Some((Utc::now() - Duration::seconds(21)).to_rfc3339());
2203 let tx_hash_bytes = [12u8; 32];
2204 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2205 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2206 }
2207
2208 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2209
2210 mocks
2211 .provider
2212 .expect_get_transaction()
2213 .with(eq(expected_stellar_hash.clone()))
2214 .times(1)
2215 .returning(move |_| {
2216 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2217 });
2218
2219 mocks
2221 .job_producer
2222 .expect_produce_submit_transaction_job()
2223 .times(1)
2224 .returning(|_, _| Box::pin(async { Ok(()) }));
2225
2226 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2227 let result = handler.handle_transaction_status_impl(tx, None).await;
2228
2229 assert!(result.is_ok());
2230 let tx_result = result.unwrap();
2231 assert_eq!(tx_result.status, TransactionStatus::Submitted);
2232 }
2233
2234 #[tokio::test]
2235 async fn test_handle_submitted_state_recent_sent_at_prevents_resubmit() {
2236 let relayer = create_test_relayer();
2241 let mut mocks = default_test_mocks();
2242
2243 let mut tx = create_test_transaction(&relayer.id);
2244 tx.id = "tx-submitted-recent-sent".to_string();
2245 tx.status = TransactionStatus::Submitted;
2246 tx.created_at = (Utc::now() - Duration::seconds(60)).to_rfc3339();
2247 tx.sent_at = Some((Utc::now() - Duration::seconds(5)).to_rfc3339());
2248 let tx_hash_bytes = [13u8; 32];
2249 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2250 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2251 }
2252
2253 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2254
2255 mocks
2256 .provider
2257 .expect_get_transaction()
2258 .with(eq(expected_stellar_hash.clone()))
2259 .times(1)
2260 .returning(move |_| {
2261 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2262 });
2263
2264 mocks
2266 .job_producer
2267 .expect_produce_submit_transaction_job()
2268 .never();
2269
2270 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2271 let result = handler.handle_transaction_status_impl(tx, None).await;
2272
2273 assert!(result.is_ok());
2274 let tx_result = result.unwrap();
2275 assert_eq!(tx_result.status, TransactionStatus::Submitted);
2276 }
2277
2278 #[tokio::test]
2279 async fn test_handle_submitted_state_no_resubmit_before_timeout() {
2280 let relayer = create_test_relayer();
2281 let mut mocks = default_test_mocks();
2282
2283 let mut tx = create_test_transaction(&relayer.id);
2284 tx.id = "tx-submitted-young".to_string();
2285 tx.status = TransactionStatus::Submitted;
2286 tx.created_at = Utc::now().to_rfc3339();
2288 let tx_hash_bytes = [9u8; 32];
2290 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2291 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2292 }
2293
2294 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2295
2296 mocks
2298 .provider
2299 .expect_get_transaction()
2300 .with(eq(expected_stellar_hash.clone()))
2301 .times(1)
2302 .returning(move |_| {
2303 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2304 });
2305
2306 mocks
2308 .job_producer
2309 .expect_produce_submit_transaction_job()
2310 .never();
2311
2312 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2313 let result = handler.handle_transaction_status_impl(tx, None).await;
2314
2315 assert!(result.is_ok());
2316 let tx_result = result.unwrap();
2317 assert_eq!(tx_result.status, TransactionStatus::Submitted);
2318 }
2319
2320 #[tokio::test]
2321 async fn test_handle_submitted_state_expired_before_resubmit() {
2322 let relayer = create_test_relayer();
2323 let mut mocks = default_test_mocks();
2324
2325 let mut tx = create_test_transaction(&relayer.id);
2326 tx.id = "tx-submitted-expired-no-resubmit".to_string();
2327 tx.status = TransactionStatus::Submitted;
2328 tx.created_at = (Utc::now() - Duration::minutes(10)).to_rfc3339();
2329 tx.valid_until = Some((Utc::now() - Duration::minutes(5)).to_rfc3339());
2331 let tx_hash_bytes = [10u8; 32];
2333 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx.network_data {
2334 stellar_data.hash = Some(hex::encode(tx_hash_bytes));
2335 }
2336
2337 let expected_stellar_hash = soroban_rs::xdr::Hash(tx_hash_bytes);
2338
2339 mocks
2341 .provider
2342 .expect_get_transaction()
2343 .with(eq(expected_stellar_hash.clone()))
2344 .times(1)
2345 .returning(move |_| {
2346 Box::pin(async { Ok(dummy_get_transaction_response("PENDING")) })
2347 });
2348
2349 mocks
2351 .tx_repo
2352 .expect_partial_update()
2353 .withf(|_id, update| update.status == Some(TransactionStatus::Expired))
2354 .times(1)
2355 .returning(|id, update| {
2356 let mut updated = create_test_transaction("test");
2357 updated.id = id;
2358 updated.status = update.status.unwrap();
2359 updated.status_reason = update.status_reason.clone();
2360 Ok(updated)
2361 });
2362
2363 mocks
2365 .job_producer
2366 .expect_produce_submit_transaction_job()
2367 .never();
2368
2369 mocks
2371 .job_producer
2372 .expect_produce_send_notification_job()
2373 .times(1)
2374 .returning(|_, _| Box::pin(async { Ok(()) }));
2375
2376 mocks
2378 .tx_repo
2379 .expect_find_by_status_paginated()
2380 .returning(move |_, _, _, _| {
2381 Ok(PaginatedResult {
2382 items: vec![],
2383 total: 0,
2384 page: 1,
2385 per_page: 1,
2386 })
2387 });
2388
2389 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2390 let result = handler.handle_transaction_status_impl(tx, None).await;
2391
2392 assert!(result.is_ok());
2393 let expired_tx = result.unwrap();
2394 assert_eq!(expired_tx.status, TransactionStatus::Expired);
2395 assert!(expired_tx
2396 .status_reason
2397 .as_ref()
2398 .unwrap()
2399 .contains("expired"));
2400 }
2401 }
2402
2403 mod is_valid_until_expired_tests {
2404 use super::*;
2405 use crate::{
2406 jobs::MockJobProducerTrait,
2407 repositories::{
2408 MockRelayerRepository, MockTransactionCounterTrait, MockTransactionRepository,
2409 },
2410 services::{
2411 provider::MockStellarProviderTrait, stellar_dex::MockStellarDexServiceTrait,
2412 },
2413 };
2414 use chrono::{Duration, Utc};
2415
2416 type TestHandler = StellarRelayerTransaction<
2418 MockRelayerRepository,
2419 MockTransactionRepository,
2420 MockJobProducerTrait,
2421 MockStellarCombinedSigner,
2422 MockStellarProviderTrait,
2423 MockTransactionCounterTrait,
2424 MockStellarDexServiceTrait,
2425 >;
2426
2427 #[test]
2428 fn test_rfc3339_expired() {
2429 let past = (Utc::now() - Duration::hours(1)).to_rfc3339();
2430 assert!(TestHandler::is_valid_until_string_expired(&past));
2431 }
2432
2433 #[test]
2434 fn test_rfc3339_not_expired() {
2435 let future = (Utc::now() + Duration::hours(1)).to_rfc3339();
2436 assert!(!TestHandler::is_valid_until_string_expired(&future));
2437 }
2438
2439 #[test]
2440 fn test_numeric_timestamp_expired() {
2441 let past_timestamp = (Utc::now() - Duration::hours(1)).timestamp().to_string();
2442 assert!(TestHandler::is_valid_until_string_expired(&past_timestamp));
2443 }
2444
2445 #[test]
2446 fn test_numeric_timestamp_not_expired() {
2447 let future_timestamp = (Utc::now() + Duration::hours(1)).timestamp().to_string();
2448 assert!(!TestHandler::is_valid_until_string_expired(
2449 &future_timestamp
2450 ));
2451 }
2452
2453 #[test]
2454 fn test_zero_timestamp_unbounded() {
2455 assert!(!TestHandler::is_valid_until_string_expired("0"));
2457 }
2458
2459 #[test]
2460 fn test_invalid_format_not_expired() {
2461 assert!(!TestHandler::is_valid_until_string_expired("not-a-date"));
2463 }
2464 }
2465
2466 mod circuit_breaker_tests {
2468 use super::*;
2469 use crate::jobs::StatusCheckContext;
2470 use crate::models::NetworkType;
2471
2472 fn create_triggered_context() -> StatusCheckContext {
2474 StatusCheckContext::new(
2475 110, 150, 160, 100, 300, NetworkType::Stellar,
2481 )
2482 }
2483
2484 fn create_safe_context() -> StatusCheckContext {
2486 StatusCheckContext::new(
2487 10, 20, 25, 100, 300, NetworkType::Stellar,
2493 )
2494 }
2495
2496 fn create_total_triggered_context() -> StatusCheckContext {
2498 StatusCheckContext::new(
2499 20, 310, 350, 100, 300, NetworkType::Stellar,
2505 )
2506 }
2507
2508 #[tokio::test]
2509 async fn test_circuit_breaker_submitted_marks_as_failed() {
2510 let relayer = create_test_relayer();
2511 let mut mocks = default_test_mocks();
2512
2513 let mut tx_to_handle = create_test_transaction(&relayer.id);
2514 tx_to_handle.status = TransactionStatus::Submitted;
2515 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2516
2517 mocks
2519 .tx_repo
2520 .expect_partial_update()
2521 .withf(|_, update| update.status == Some(TransactionStatus::Failed))
2522 .times(1)
2523 .returning(|_, update| {
2524 let mut updated_tx = create_test_transaction("test-relayer");
2525 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2526 updated_tx.status_reason = update.status_reason.clone();
2527 Ok(updated_tx)
2528 });
2529
2530 mocks
2532 .job_producer
2533 .expect_produce_send_notification_job()
2534 .returning(|_, _| Box::pin(async { Ok(()) }));
2535
2536 mocks
2538 .tx_repo
2539 .expect_find_by_status_paginated()
2540 .returning(|_, _, _, _| {
2541 Ok(PaginatedResult {
2542 items: vec![],
2543 total: 0,
2544 page: 1,
2545 per_page: 1,
2546 })
2547 });
2548
2549 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2550 let ctx = create_triggered_context();
2551
2552 let result = handler
2553 .handle_transaction_status_impl(tx_to_handle, Some(ctx))
2554 .await;
2555
2556 assert!(result.is_ok());
2557 let tx = result.unwrap();
2558 assert_eq!(tx.status, TransactionStatus::Failed);
2559 assert!(tx.status_reason.is_some());
2560 assert!(tx.status_reason.unwrap().contains("consecutive errors"));
2561 }
2562
2563 #[tokio::test]
2564 async fn test_circuit_breaker_pending_marks_as_failed() {
2565 let relayer = create_test_relayer();
2566 let mut mocks = default_test_mocks();
2567
2568 let mut tx_to_handle = create_test_transaction(&relayer.id);
2569 tx_to_handle.status = TransactionStatus::Pending;
2570 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2571
2572 mocks
2574 .tx_repo
2575 .expect_partial_update()
2576 .withf(|_, update| update.status == Some(TransactionStatus::Failed))
2577 .times(1)
2578 .returning(|_, update| {
2579 let mut updated_tx = create_test_transaction("test-relayer");
2580 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2581 updated_tx.status_reason = update.status_reason.clone();
2582 Ok(updated_tx)
2583 });
2584
2585 mocks
2586 .job_producer
2587 .expect_produce_send_notification_job()
2588 .returning(|_, _| Box::pin(async { Ok(()) }));
2589
2590 mocks
2591 .tx_repo
2592 .expect_find_by_status_paginated()
2593 .returning(|_, _, _, _| {
2594 Ok(PaginatedResult {
2595 items: vec![],
2596 total: 0,
2597 page: 1,
2598 per_page: 1,
2599 })
2600 });
2601
2602 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2603 let ctx = create_triggered_context();
2604
2605 let result = handler
2606 .handle_transaction_status_impl(tx_to_handle, Some(ctx))
2607 .await;
2608
2609 assert!(result.is_ok());
2610 let tx = result.unwrap();
2611 assert_eq!(tx.status, TransactionStatus::Failed);
2612 }
2613
2614 #[tokio::test]
2615 async fn test_circuit_breaker_total_failures_triggers() {
2616 let relayer = create_test_relayer();
2617 let mut mocks = default_test_mocks();
2618
2619 let mut tx_to_handle = create_test_transaction(&relayer.id);
2620 tx_to_handle.status = TransactionStatus::Submitted;
2621 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2622
2623 mocks
2624 .tx_repo
2625 .expect_partial_update()
2626 .withf(|_, update| update.status == Some(TransactionStatus::Failed))
2627 .times(1)
2628 .returning(|_, update| {
2629 let mut updated_tx = create_test_transaction("test-relayer");
2630 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2631 updated_tx.status_reason = update.status_reason.clone();
2632 Ok(updated_tx)
2633 });
2634
2635 mocks
2636 .job_producer
2637 .expect_produce_send_notification_job()
2638 .returning(|_, _| Box::pin(async { Ok(()) }));
2639
2640 mocks
2641 .tx_repo
2642 .expect_find_by_status_paginated()
2643 .returning(|_, _, _, _| {
2644 Ok(PaginatedResult {
2645 items: vec![],
2646 total: 0,
2647 page: 1,
2648 per_page: 1,
2649 })
2650 });
2651
2652 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2653 let ctx = create_total_triggered_context();
2655
2656 let result = handler
2657 .handle_transaction_status_impl(tx_to_handle, Some(ctx))
2658 .await;
2659
2660 assert!(result.is_ok());
2661 let tx = result.unwrap();
2662 assert_eq!(tx.status, TransactionStatus::Failed);
2663 }
2664
2665 #[tokio::test]
2666 async fn test_circuit_breaker_below_threshold_continues() {
2667 let relayer = create_test_relayer();
2668 let mut mocks = default_test_mocks();
2669
2670 let mut tx_to_handle = create_test_transaction(&relayer.id);
2671 tx_to_handle.status = TransactionStatus::Submitted;
2672 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2673 let tx_hash_bytes = [1u8; 32];
2674 let tx_hash_hex = hex::encode(tx_hash_bytes);
2675 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
2676 {
2677 stellar_data.hash = Some(tx_hash_hex.clone());
2678 }
2679
2680 mocks
2682 .provider
2683 .expect_get_transaction()
2684 .returning(|_| Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) }));
2685
2686 mocks
2687 .tx_repo
2688 .expect_partial_update()
2689 .returning(|_, update| {
2690 let mut updated_tx = create_test_transaction("test-relayer");
2691 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2692 Ok(updated_tx)
2693 });
2694
2695 mocks
2696 .job_producer
2697 .expect_produce_send_notification_job()
2698 .returning(|_, _| Box::pin(async { Ok(()) }));
2699
2700 mocks
2701 .tx_repo
2702 .expect_find_by_status_paginated()
2703 .returning(|_, _, _, _| {
2704 Ok(PaginatedResult {
2705 items: vec![],
2706 total: 0,
2707 page: 1,
2708 per_page: 1,
2709 })
2710 });
2711
2712 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2713 let ctx = create_safe_context();
2714
2715 let result = handler
2716 .handle_transaction_status_impl(tx_to_handle, Some(ctx))
2717 .await;
2718
2719 assert!(result.is_ok());
2720 let tx = result.unwrap();
2721 assert_eq!(tx.status, TransactionStatus::Confirmed);
2723 }
2724
2725 #[tokio::test]
2726 async fn test_circuit_breaker_final_state_early_return() {
2727 let relayer = create_test_relayer();
2728 let mocks = default_test_mocks();
2729
2730 let mut tx_to_handle = create_test_transaction(&relayer.id);
2732 tx_to_handle.status = TransactionStatus::Confirmed;
2733
2734 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2735 let ctx = create_triggered_context();
2736
2737 let result = handler
2739 .handle_transaction_status_impl(tx_to_handle.clone(), Some(ctx))
2740 .await;
2741
2742 assert!(result.is_ok());
2743 assert_eq!(result.unwrap().id, tx_to_handle.id);
2744 }
2745
2746 #[tokio::test]
2747 async fn test_circuit_breaker_no_context_continues() {
2748 let relayer = create_test_relayer();
2749 let mut mocks = default_test_mocks();
2750
2751 let mut tx_to_handle = create_test_transaction(&relayer.id);
2752 tx_to_handle.status = TransactionStatus::Submitted;
2753 tx_to_handle.created_at = (Utc::now() - Duration::minutes(1)).to_rfc3339();
2754 let tx_hash_bytes = [1u8; 32];
2755 let tx_hash_hex = hex::encode(tx_hash_bytes);
2756 if let NetworkTransactionData::Stellar(ref mut stellar_data) = tx_to_handle.network_data
2757 {
2758 stellar_data.hash = Some(tx_hash_hex.clone());
2759 }
2760
2761 mocks
2763 .provider
2764 .expect_get_transaction()
2765 .returning(|_| Box::pin(async { Ok(dummy_get_transaction_response("SUCCESS")) }));
2766
2767 mocks
2768 .tx_repo
2769 .expect_partial_update()
2770 .returning(|_, update| {
2771 let mut updated_tx = create_test_transaction("test-relayer");
2772 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2773 Ok(updated_tx)
2774 });
2775
2776 mocks
2777 .job_producer
2778 .expect_produce_send_notification_job()
2779 .returning(|_, _| Box::pin(async { Ok(()) }));
2780
2781 mocks
2782 .tx_repo
2783 .expect_find_by_status_paginated()
2784 .returning(|_, _, _, _| {
2785 Ok(PaginatedResult {
2786 items: vec![],
2787 total: 0,
2788 page: 1,
2789 per_page: 1,
2790 })
2791 });
2792
2793 let handler = make_stellar_tx_handler(relayer.clone(), mocks);
2794
2795 let result = handler
2797 .handle_transaction_status_impl(tx_to_handle, None)
2798 .await;
2799
2800 assert!(result.is_ok());
2801 let tx = result.unwrap();
2802 assert_eq!(tx.status, TransactionStatus::Confirmed);
2803 }
2804 }
2805
2806 mod failure_detail_helper_tests {
2807 use super::*;
2808 use soroban_rs::xdr::{InvokeHostFunctionResult, OperationResult, OperationResultTr, VecM};
2809
2810 #[test]
2811 fn first_failing_op_finds_trapped() {
2812 let ops: VecM<OperationResult> = vec![OperationResult::OpInner(
2813 OperationResultTr::InvokeHostFunction(InvokeHostFunctionResult::Trapped),
2814 )]
2815 .try_into()
2816 .unwrap();
2817 assert_eq!(first_failing_op(ops.as_slice()), Some("Trapped"));
2818 }
2819
2820 #[test]
2821 fn first_failing_op_skips_success() {
2822 let ops: VecM<OperationResult> = vec![
2823 OperationResult::OpInner(OperationResultTr::InvokeHostFunction(
2824 InvokeHostFunctionResult::Success(soroban_rs::xdr::Hash([0u8; 32])),
2825 )),
2826 OperationResult::OpInner(OperationResultTr::InvokeHostFunction(
2827 InvokeHostFunctionResult::ResourceLimitExceeded,
2828 )),
2829 ]
2830 .try_into()
2831 .unwrap();
2832 assert_eq!(
2833 first_failing_op(ops.as_slice()),
2834 Some("ResourceLimitExceeded")
2835 );
2836 }
2837
2838 #[test]
2839 fn first_failing_op_all_success_returns_none() {
2840 let ops: VecM<OperationResult> = vec![OperationResult::OpInner(
2841 OperationResultTr::InvokeHostFunction(InvokeHostFunctionResult::Success(
2842 soroban_rs::xdr::Hash([0u8; 32]),
2843 )),
2844 )]
2845 .try_into()
2846 .unwrap();
2847 assert_eq!(first_failing_op(ops.as_slice()), None);
2848 }
2849
2850 #[test]
2851 fn first_failing_op_empty_returns_none() {
2852 assert_eq!(first_failing_op(&[]), None);
2853 }
2854
2855 #[test]
2856 fn first_failing_op_op_bad_auth() {
2857 let ops: VecM<OperationResult> = vec![OperationResult::OpBadAuth].try_into().unwrap();
2858 assert_eq!(first_failing_op(ops.as_slice()), Some("OpBadAuth"));
2859 }
2860 }
2861}