1use crate::{
7 constants::DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS,
8 domain::transaction::{stellar::fetch_next_sequence_from_chain, Transaction},
9 jobs::{JobProducer, JobProducerTrait, StatusCheckContext, TransactionRequest},
10 models::{
11 produce_transaction_update_notification_payload, NetworkTransactionRequest,
12 PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, TransactionError,
13 TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
14 },
15 repositories::{
16 RelayerRepositoryStorage, Repository, TransactionCounterRepositoryStorage,
17 TransactionCounterTrait, TransactionRepository, TransactionRepositoryStorage,
18 },
19 services::{
20 provider::{StellarProvider, StellarProviderTrait},
21 signer::{Signer, StellarSignTrait, StellarSigner},
22 stellar_dex::{StellarDexService, StellarDexServiceTrait},
23 },
24 utils::calculate_scheduled_timestamp,
25};
26use async_trait::async_trait;
27use std::sync::Arc;
28use tracing::{error, info, warn};
29
30use super::lane_gate;
31
32#[allow(dead_code)]
33pub struct StellarRelayerTransaction<R, T, J, S, P, C, D>
34where
35 R: Repository<RelayerRepoModel, String>,
36 T: TransactionRepository,
37 J: JobProducerTrait,
38 S: Signer + StellarSignTrait,
39 P: StellarProviderTrait,
40 C: TransactionCounterTrait,
41 D: StellarDexServiceTrait + Send + Sync + 'static,
42{
43 relayer: RelayerRepoModel,
44 relayer_repository: Arc<R>,
45 transaction_repository: Arc<T>,
46 job_producer: Arc<J>,
47 signer: Arc<S>,
48 provider: P,
49 transaction_counter_service: Arc<C>,
50 dex_service: Arc<D>,
51}
52
53#[allow(dead_code)]
54impl<R, T, J, S, P, C, D> StellarRelayerTransaction<R, T, J, S, P, C, D>
55where
56 R: Repository<RelayerRepoModel, String>,
57 T: TransactionRepository,
58 J: JobProducerTrait,
59 S: Signer + StellarSignTrait,
60 P: StellarProviderTrait,
61 C: TransactionCounterTrait,
62 D: StellarDexServiceTrait + Send + Sync + 'static,
63{
64 #[allow(clippy::too_many_arguments)]
81 pub fn new(
82 relayer: RelayerRepoModel,
83 relayer_repository: Arc<R>,
84 transaction_repository: Arc<T>,
85 job_producer: Arc<J>,
86 signer: Arc<S>,
87 provider: P,
88 transaction_counter_service: Arc<C>,
89 dex_service: Arc<D>,
90 ) -> Result<Self, TransactionError> {
91 Ok(Self {
92 relayer,
93 relayer_repository,
94 transaction_repository,
95 job_producer,
96 signer,
97 provider,
98 transaction_counter_service,
99 dex_service,
100 })
101 }
102
103 pub fn provider(&self) -> &P {
104 &self.provider
105 }
106
107 pub fn relayer(&self) -> &RelayerRepoModel {
108 &self.relayer
109 }
110
111 pub fn job_producer(&self) -> &J {
112 &self.job_producer
113 }
114
115 pub fn transaction_repository(&self) -> &T {
116 &self.transaction_repository
117 }
118
119 pub fn signer(&self) -> &S {
120 &self.signer
121 }
122
123 pub fn transaction_counter_service(&self) -> &C {
124 &self.transaction_counter_service
125 }
126
127 pub fn dex_service(&self) -> &D {
128 &self.dex_service
129 }
130
131 pub fn concurrent_transactions_enabled(&self) -> bool {
132 if let RelayerNetworkPolicy::Stellar(policy) = &self.relayer().policies {
133 policy
134 .concurrent_transactions
135 .unwrap_or(DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS)
136 } else {
137 DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
138 }
139 }
140
141 pub async fn send_transaction_request_job(
143 &self,
144 tx: &TransactionRepoModel,
145 delay_seconds: Option<i64>,
146 ) -> Result<(), TransactionError> {
147 let job = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
148 let scheduled_on = delay_seconds.map(calculate_scheduled_timestamp);
149 self.job_producer()
150 .produce_transaction_request_job(job, scheduled_on)
151 .await?;
152 Ok(())
153 }
154
155 pub(super) async fn send_transaction_update_notification(&self, tx: &TransactionRepoModel) {
160 if let Some(notification_id) = &self.relayer().notification_id {
161 if let Err(e) = self
162 .job_producer()
163 .produce_send_notification_job(
164 produce_transaction_update_notification_payload(notification_id, tx),
165 None,
166 )
167 .await
168 {
169 error!(error = %e, "failed to produce notification job");
170 }
171 }
172 }
173
174 pub async fn finalize_transaction_state(
176 &self,
177 tx_id: String,
178 update_req: TransactionUpdateRequest,
179 ) -> Result<TransactionRepoModel, TransactionError> {
180 let updated_tx = self
181 .transaction_repository()
182 .partial_update(tx_id, update_req)
183 .await?;
184
185 self.send_transaction_update_notification(&updated_tx).await;
186 Ok(updated_tx)
187 }
188
189 pub async fn enqueue_next_pending_transaction(
190 &self,
191 finished_tx_id: &str,
192 ) -> Result<(), TransactionError> {
193 if !self.concurrent_transactions_enabled() {
194 if let Some(next) = self
195 .find_oldest_pending_for_relayer(&self.relayer().id)
196 .await?
197 {
198 info!(to_tx_id = %next.id, finished_tx_id = %finished_tx_id, "handing over lane");
200 lane_gate::pass_to(&self.relayer().id, finished_tx_id, &next.id);
201 self.send_transaction_request_job(&next, None).await?;
202 } else {
203 info!(finished_tx_id = %finished_tx_id, "releasing relayer lane");
204 lane_gate::free(&self.relayer().id, finished_tx_id);
205 }
206 }
207 Ok(())
208 }
209
210 async fn find_oldest_pending_for_relayer(
215 &self,
216 relayer_id: &str,
217 ) -> Result<Option<TransactionRepoModel>, TransactionError> {
218 let result = self
219 .transaction_repository()
220 .find_by_status_paginated(
221 relayer_id,
222 &[TransactionStatus::Pending],
223 PaginationQuery {
224 page: 1,
225 per_page: 1,
226 },
227 true, )
229 .await
230 .map_err(TransactionError::from)?;
231
232 Ok(result.items.into_iter().next())
234 }
235
236 pub async fn sync_sequence_from_chain(
239 &self,
240 relayer_address: &str,
241 ) -> Result<(), TransactionError> {
242 info!(address = %relayer_address, "syncing sequence number from chain");
243
244 let next_usable_seq = fetch_next_sequence_from_chain(self.provider(), relayer_address)
246 .await
247 .map_err(|e| {
248 warn!(
249 address = %relayer_address,
250 error = %e,
251 "failed to fetch sequence from chain in sync_sequence_from_chain"
252 );
253 TransactionError::UnexpectedError(format!(
254 "Failed to sync sequence from chain: {e}"
255 ))
256 })?;
257
258 self.transaction_counter_service()
260 .set(&self.relayer().id, relayer_address, next_usable_seq)
261 .await
262 .map_err(|e| {
263 TransactionError::UnexpectedError(format!("Failed to update sequence counter: {e}"))
264 })?;
265
266 info!(sequence = %next_usable_seq, "updated local sequence counter");
267 Ok(())
268 }
269
270 pub async fn reset_transaction_for_retry(
273 &self,
274 tx: TransactionRepoModel,
275 ) -> Result<TransactionRepoModel, TransactionError> {
276 info!("resetting transaction for retry through pipeline");
277
278 let update_req = tx.create_reset_update_request()?;
280
281 let reset_tx = self
283 .transaction_repository()
284 .partial_update(tx.id.clone(), update_req)
285 .await?;
286
287 info!("transaction reset successfully to pre-prepare state");
288 Ok(reset_tx)
289 }
290}
291
292#[async_trait]
293impl<R, T, J, S, P, C, D> Transaction for StellarRelayerTransaction<R, T, J, S, P, C, D>
294where
295 R: Repository<RelayerRepoModel, String> + Send + Sync,
296 T: TransactionRepository + Send + Sync,
297 J: JobProducerTrait + Send + Sync,
298 S: Signer + StellarSignTrait + Send + Sync,
299 P: StellarProviderTrait + Send + Sync,
300 C: TransactionCounterTrait + Send + Sync,
301 D: StellarDexServiceTrait + Send + Sync + 'static,
302{
303 async fn prepare_transaction(
304 &self,
305 tx: TransactionRepoModel,
306 ) -> Result<TransactionRepoModel, TransactionError> {
307 self.prepare_transaction_impl(tx).await
308 }
309
310 async fn submit_transaction(
311 &self,
312 tx: TransactionRepoModel,
313 ) -> Result<TransactionRepoModel, TransactionError> {
314 self.submit_transaction_impl(tx).await
315 }
316
317 async fn resubmit_transaction(
318 &self,
319 tx: TransactionRepoModel,
320 ) -> Result<TransactionRepoModel, TransactionError> {
321 Ok(tx)
322 }
323
324 async fn handle_transaction_status(
325 &self,
326 tx: TransactionRepoModel,
327 context: Option<StatusCheckContext>,
328 ) -> Result<TransactionRepoModel, TransactionError> {
329 self.handle_transaction_status_impl(tx, context).await
330 }
331
332 async fn cancel_transaction(
333 &self,
334 tx: TransactionRepoModel,
335 ) -> Result<TransactionRepoModel, TransactionError> {
336 Ok(tx)
337 }
338
339 async fn replace_transaction(
340 &self,
341 _old_tx: TransactionRepoModel,
342 _new_tx_request: NetworkTransactionRequest,
343 ) -> Result<TransactionRepoModel, TransactionError> {
344 Ok(_old_tx)
345 }
346
347 async fn sign_transaction(
348 &self,
349 tx: TransactionRepoModel,
350 ) -> Result<TransactionRepoModel, TransactionError> {
351 Ok(tx)
352 }
353
354 async fn validate_transaction(
355 &self,
356 _tx: TransactionRepoModel,
357 ) -> Result<bool, TransactionError> {
358 Ok(true)
359 }
360}
361
362pub type DefaultStellarTransaction = StellarRelayerTransaction<
363 RelayerRepositoryStorage,
364 TransactionRepositoryStorage,
365 JobProducer,
366 StellarSigner,
367 StellarProvider,
368 TransactionCounterRepositoryStorage,
369 StellarDexService<StellarProvider, StellarSigner>,
370>;
371
#[cfg(test)]
mod tests {
    use super::*;
    use crate::repositories::transaction::RedisTransactionRepository;
    use crate::utils::RedisConnections;
    use crate::{
        models::{NetworkTransactionData, RepositoryError},
        services::provider::ProviderError,
    };
    use deadpool_redis::{Config, Runtime};
    use std::sync::Arc;
    use uuid::Uuid;

    use crate::domain::transaction::stellar::test_helpers::*;

    /// Builds an account entry for `TEST_PK` whose sequence number is
    /// `sequence`; all other fields are fixed fixture values.
    fn account_entry_with_sequence(sequence: i64) -> soroban_rs::xdr::AccountEntry {
        use soroban_rs::xdr::{
            AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber, String32,
            Thresholds, Uint256,
        };
        use stellar_strkey::ed25519;

        let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
        let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));

        AccountEntry {
            account_id,
            balance: 1000000,
            seq_num: SequenceNumber(sequence),
            num_sub_entries: 0,
            inflation_dest: None,
            flags: 0,
            home_domain: String32::default(),
            thresholds: Thresholds([1, 1, 1, 1]),
            signers: Default::default(),
            ext: AccountEntryExt::V0,
        }
    }

    /// Creates three `Pending` transactions for `relayer_id` with unique ids
    /// and `created_at` values one hour apart, ordered oldest first.
    fn pending_transactions_oldest_first(relayer_id: &str) -> Vec<TransactionRepoModel> {
        ["10", "11", "12"]
            .iter()
            .enumerate()
            .map(|(index, hour)| {
                let mut tx = create_test_transaction(relayer_id);
                tx.id = format!("tx-{}-{}", index + 1, Uuid::new_v4());
                tx.status = TransactionStatus::Pending;
                tx.created_at = format!("2025-01-27T{hour}:00:00.000000+00:00");
                tx
            })
            .collect()
    }

    /// Builds a handler for `relayer_id` on top of `tx_repo` (all other
    /// collaborators are default mocks) and runs
    /// `find_oldest_pending_for_relayer` against it.
    ///
    /// The `Result` is returned as-is so callers can clean up before they
    /// unwrap or assert.
    async fn oldest_pending_via_handler<T>(
        tx_repo: Arc<T>,
        relayer_id: &str,
    ) -> Result<Option<TransactionRepoModel>, TransactionError>
    where
        T: TransactionRepository + Send + Sync + 'static,
    {
        let mut relayer_model = create_test_relayer();
        relayer_model.id = relayer_id.to_string();

        let mocks = default_test_mocks();
        let handler = StellarRelayerTransaction::new(
            relayer_model,
            Arc::new(mocks.relayer_repo),
            tx_repo,
            Arc::new(mocks.job_producer),
            Arc::new(mocks.signer),
            mocks.provider,
            Arc::new(mocks.counter),
            Arc::new(mocks.dex_service),
        )
        .unwrap();

        handler.find_oldest_pending_for_relayer(relayer_id).await
    }

    #[test]
    fn new_returns_ok() {
        let relayer = create_test_relayer();
        let mocks = default_test_mocks();
        let result = StellarRelayerTransaction::new(
            relayer,
            Arc::new(mocks.relayer_repo),
            Arc::new(mocks.tx_repo),
            Arc::new(mocks.job_producer),
            Arc::new(mocks.signer),
            mocks.provider,
            Arc::new(mocks.counter),
            Arc::new(mocks.dex_service),
        );
        assert!(result.is_ok());
    }

    #[test]
    fn accessor_methods_return_correct_references() {
        let relayer = create_test_relayer();
        let mocks = default_test_mocks();
        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        assert_eq!(handler.relayer().id, "relayer-1");
        assert_eq!(handler.relayer().address, TEST_PK);

        // The remaining accessors only need to be callable.
        let _ = handler.provider();
        let _ = handler.job_producer();
        let _ = handler.transaction_repository();
        let _ = handler.signer();
        let _ = handler.transaction_counter_service();
        let _ = handler.dex_service();
    }

    #[tokio::test]
    async fn send_transaction_request_job_success() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| {
                job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay.is_none()
            })
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
        let tx = create_test_transaction(&relayer.id);

        let result = handler.send_transaction_request_job(&tx, None).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn send_transaction_request_job_with_delay() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| {
                job.transaction_id == "tx-1"
                    && job.relayer_id == "relayer-1"
                    && delay.is_some()
                    && delay.unwrap() > chrono::Utc::now().timestamp()
            })
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
        let tx = create_test_transaction(&relayer.id);

        let result = handler.send_transaction_request_job(&tx, Some(60)).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn finalize_transaction_state_success() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // The repository must receive the requested status and reason.
        mocks
            .tx_repo
            .expect_partial_update()
            .withf(|tx_id, update| {
                tx_id == "tx-1"
                    && update.status == Some(TransactionStatus::Confirmed)
                    && update.status_reason == Some("Transaction confirmed".to_string())
            })
            .times(1)
            .returning(|tx_id, update| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = tx_id;
                tx.status = update.status.unwrap();
                tx.status_reason = update.status_reason;
                tx.confirmed_at = update.confirmed_at;
                Ok::<_, RepositoryError>(tx)
            });

        // Exactly one notification job is expected after the update.
        mocks
            .job_producer
            .expect_produce_send_notification_job()
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let update_request = TransactionUpdateRequest {
            status: Some(TransactionStatus::Confirmed),
            status_reason: Some("Transaction confirmed".to_string()),
            confirmed_at: Some("2023-01-01T00:00:00Z".to_string()),
            ..Default::default()
        };

        let result = handler
            .finalize_transaction_state("tx-1".to_string(), update_request)
            .await;

        assert!(result.is_ok());
        let updated_tx = result.unwrap();
        assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
        assert_eq!(
            updated_tx.status_reason,
            Some("Transaction confirmed".to_string())
        );
    }

    #[tokio::test]
    async fn enqueue_next_pending_transaction_with_pending_tx() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .tx_repo
            .expect_find_by_status_paginated()
            .withf(|_relayer_id, statuses, query, oldest_first| {
                statuses == [TransactionStatus::Pending]
                    && query.page == 1
                    && query.per_page == 1
                    && *oldest_first
            })
            .times(1)
            .returning(|_, _, _, _| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = "pending-tx-1".to_string();
                Ok(crate::repositories::PaginatedResult {
                    items: vec![tx],
                    total: 1,
                    page: 1,
                    per_page: 1,
                })
            });

        // The pending transaction must be enqueued without a delay.
        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .withf(|job, delay| job.transaction_id == "pending-tx-1" && delay.is_none())
            .times(1)
            .returning(|_, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn enqueue_next_pending_transaction_no_pending_tx() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .tx_repo
            .expect_find_by_status_paginated()
            .withf(|_relayer_id, statuses, query, oldest_first| {
                statuses == [TransactionStatus::Pending]
                    && query.page == 1
                    && query.per_page == 1
                    && *oldest_first
            })
            .times(1)
            .returning(|_, _, _, _| {
                Ok(crate::repositories::PaginatedResult {
                    items: vec![],
                    total: 0,
                    page: 1,
                    per_page: 1,
                })
            });

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_sync_sequence_from_chain() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .provider
            .expect_get_account()
            .withf(|addr| addr == TEST_PK)
            .times(1)
            .returning(|_| Box::pin(async { Ok(account_entry_with_sequence(100)) }));

        // Chain sequence 100 must be stored as 101 in the counter.
        mocks
            .counter
            .expect_set()
            .withf(|relayer_id, addr, seq| {
                relayer_id == "relayer-1" && addr == TEST_PK && *seq == 101
            })
            .times(1)
            .returning(|_, _, _| Box::pin(async { Ok(()) }));

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.sync_sequence_from_chain(&relayer.address).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_sync_sequence_from_chain_provider_error() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks.provider.expect_get_account().times(1).returning(|_| {
            Box::pin(async { Err(ProviderError::Other("Account not found".to_string())) })
        });

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.sync_sequence_from_chain(&relayer.address).await;
        assert!(result.is_err());
        match result.unwrap_err() {
            TransactionError::UnexpectedError(msg) => {
                // NOTE(review): this text is not produced in this file;
                // presumably it comes from `fetch_next_sequence_from_chain`
                // and survives inside the wrapped message — confirm.
                assert!(msg.contains("Failed to fetch account from chain"));
            }
            _ => panic!("Expected UnexpectedError"),
        }
    }

    #[tokio::test]
    async fn test_sync_sequence_from_chain_counter_error() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        mocks
            .provider
            .expect_get_account()
            .times(1)
            .returning(|_| Box::pin(async { Ok(account_entry_with_sequence(100)) }));

        mocks.counter.expect_set().times(1).returning(|_, _, _| {
            Box::pin(async {
                Err(RepositoryError::Unknown(
                    "Counter update failed".to_string(),
                ))
            })
        });

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.sync_sequence_from_chain(&relayer.address).await;
        assert!(result.is_err());
        match result.unwrap_err() {
            TransactionError::UnexpectedError(msg) => {
                assert!(msg.contains("Failed to update sequence counter"));
            }
            _ => panic!("Expected UnexpectedError"),
        }
    }

    #[test]
    fn test_concurrent_transactions_enabled() {
        // Explicitly enabled in the policy.
        let mut relayer = create_test_relayer();
        if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
            policy.concurrent_transactions = Some(true);
        }
        let mocks = default_test_mocks();
        let handler = make_stellar_tx_handler(relayer, mocks);
        assert!(handler.concurrent_transactions_enabled());

        // Explicitly disabled in the policy.
        let mut relayer = create_test_relayer();
        if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
            policy.concurrent_transactions = Some(false);
        }
        let mocks = default_test_mocks();
        let handler = make_stellar_tx_handler(relayer, mocks);
        assert!(!handler.concurrent_transactions_enabled());

        // Untouched test relayer: expected to match the default constant.
        let relayer = create_test_relayer();
        let mocks = default_test_mocks();
        let handler = make_stellar_tx_handler(relayer, mocks);
        assert_eq!(
            handler.concurrent_transactions_enabled(),
            DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
        );
    }

    #[tokio::test]
    async fn test_enqueue_next_pending_transaction_with_concurrency_enabled() {
        let mut relayer = create_test_relayer();
        if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
            policy.concurrent_transactions = Some(true);
        }
        let mut mocks = default_test_mocks();

        // With concurrency enabled neither the lookup nor the enqueue may run.
        mocks.tx_repo.expect_find_by_status_paginated().times(0);
        mocks
            .job_producer
            .expect_produce_transaction_request_job()
            .times(0);

        let handler = make_stellar_tx_handler(relayer, mocks);

        let result = handler
            .enqueue_next_pending_transaction("finished-tx")
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_reset_transaction_for_retry() {
        let relayer = create_test_relayer();
        let mut mocks = default_test_mocks();

        // Start from a transaction that already carries signing artefacts.
        let mut tx = create_test_transaction(&relayer.id);
        if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
            data.sequence_number = Some(42);
            data.signatures.push(dummy_signature());
            data.hash = Some("test-hash".to_string());
            data.signed_envelope_xdr = Some("test-xdr".to_string());
        }

        mocks
            .tx_repo
            .expect_partial_update()
            .withf(|tx_id, upd| {
                tx_id == "tx-1"
                    && upd.status == Some(TransactionStatus::Pending)
                    && upd.sent_at.is_none()
                    && upd.confirmed_at.is_none()
            })
            .times(1)
            .returning(|id, upd| {
                let mut tx = create_test_transaction("relayer-1");
                tx.id = id;
                tx.status = upd.status.unwrap();
                if let Some(network_data) = upd.network_data {
                    tx.network_data = network_data;
                }
                Ok::<_, RepositoryError>(tx)
            });

        let handler = make_stellar_tx_handler(relayer.clone(), mocks);

        let result = handler.reset_transaction_for_retry(tx).await;
        assert!(result.is_ok());

        let reset_tx = result.unwrap();
        assert_eq!(reset_tx.status, TransactionStatus::Pending);

        if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
            assert!(data.sequence_number.is_none());
            assert!(data.signatures.is_empty());
            assert!(data.hash.is_none());
            assert!(data.signed_envelope_xdr.is_none());
        } else {
            panic!("Expected Stellar transaction data");
        }
    }

    #[tokio::test]
    #[ignore = "Requires active Redis instance"]
    async fn test_find_oldest_pending_for_relayer_with_redis() {
        let redis_url = std::env::var("REDIS_TEST_URL")
            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
        let pool = Arc::new(
            Config::from_url(&redis_url)
                .builder()
                .expect("Failed to create Redis pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build Redis pool"),
        );
        let connections = Arc::new(RedisConnections::new_single_pool(pool));

        // A random key prefix keeps this run separate from other data.
        let random_id = Uuid::new_v4().to_string();
        let key_prefix = format!("test_stellar:{random_id}");
        let tx_repo = Arc::new(
            RedisTransactionRepository::new(connections, key_prefix)
                .expect("Failed to create RedisTransactionRepository"),
        );

        let relayer_id = format!("relayer-{}", Uuid::new_v4());

        let txs = pending_transactions_oldest_first(&relayer_id);
        for tx in &txs {
            tx_repo.create(tx.clone()).await.unwrap();
        }

        let lookup = oldest_pending_via_handler(tx_repo.clone(), &relayer_id).await;

        // Clean up before unwrapping or asserting so a failure does not
        // leave the seeded transactions behind.
        for tx in &txs {
            let _ = tx_repo.delete_by_id(tx.id.clone()).await;
        }

        let found_tx = lookup
            .unwrap()
            .expect("Should find a pending transaction");
        let oldest = &txs[0];

        assert_eq!(
            found_tx.id, oldest.id,
            "Should get oldest transaction (tx1) - oldest_first=true so .next() yields oldest"
        );
        assert_eq!(
            found_tx.created_at, oldest.created_at,
            "Should match oldest transaction's created_at"
        );
    }

    #[tokio::test]
    async fn test_find_oldest_pending_for_relayer_with_in_memory() {
        use crate::repositories::transaction::InMemoryTransactionRepository;

        let tx_repo = Arc::new(InMemoryTransactionRepository::new());

        let relayer_id = format!("relayer-{}", Uuid::new_v4());

        let txs = pending_transactions_oldest_first(&relayer_id);
        for tx in &txs {
            tx_repo.create(tx.clone()).await.unwrap();
        }

        let found_tx = oldest_pending_via_handler(tx_repo.clone(), &relayer_id)
            .await
            .unwrap()
            .expect("Should find a pending transaction");
        let oldest = &txs[0];

        assert_eq!(
            found_tx.id, oldest.id,
            "Should get oldest transaction (tx1) - oldest_first=true so .next() yields oldest"
        );
        assert_eq!(
            found_tx.created_at, oldest.created_at,
            "Should match oldest transaction's created_at"
        );
    }
}