openzeppelin_relayer/domain/transaction/stellar/
stellar_transaction.rs

//! This module defines the `StellarRelayerTransaction` struct and its associated
//! functionality for handling Stellar transactions.
//! It includes methods for preparing, submitting, handling status, and
//! managing notifications for transactions. The module leverages various
//! services and repositories to perform these operations asynchronously.
6use crate::{
7    constants::DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS,
8    domain::transaction::{stellar::fetch_next_sequence_from_chain, Transaction},
9    jobs::{JobProducer, JobProducerTrait, StatusCheckContext, TransactionRequest},
10    models::{
11        produce_transaction_update_notification_payload, NetworkTransactionRequest,
12        PaginationQuery, RelayerNetworkPolicy, RelayerRepoModel, TransactionError,
13        TransactionRepoModel, TransactionStatus, TransactionUpdateRequest,
14    },
15    repositories::{
16        RelayerRepositoryStorage, Repository, TransactionCounterRepositoryStorage,
17        TransactionCounterTrait, TransactionRepository, TransactionRepositoryStorage,
18    },
19    services::{
20        provider::{StellarProvider, StellarProviderTrait},
21        signer::{Signer, StellarSignTrait, StellarSigner},
22        stellar_dex::{StellarDexService, StellarDexServiceTrait},
23    },
24    utils::calculate_scheduled_timestamp,
25};
26use async_trait::async_trait;
27use std::sync::Arc;
28use tracing::{error, info, warn};
29
30use super::lane_gate;
31
32#[allow(dead_code)]
33pub struct StellarRelayerTransaction<R, T, J, S, P, C, D>
34where
35    R: Repository<RelayerRepoModel, String>,
36    T: TransactionRepository,
37    J: JobProducerTrait,
38    S: Signer + StellarSignTrait,
39    P: StellarProviderTrait,
40    C: TransactionCounterTrait,
41    D: StellarDexServiceTrait + Send + Sync + 'static,
42{
43    relayer: RelayerRepoModel,
44    relayer_repository: Arc<R>,
45    transaction_repository: Arc<T>,
46    job_producer: Arc<J>,
47    signer: Arc<S>,
48    provider: P,
49    transaction_counter_service: Arc<C>,
50    dex_service: Arc<D>,
51}
52
53#[allow(dead_code)]
54impl<R, T, J, S, P, C, D> StellarRelayerTransaction<R, T, J, S, P, C, D>
55where
56    R: Repository<RelayerRepoModel, String>,
57    T: TransactionRepository,
58    J: JobProducerTrait,
59    S: Signer + StellarSignTrait,
60    P: StellarProviderTrait,
61    C: TransactionCounterTrait,
62    D: StellarDexServiceTrait + Send + Sync + 'static,
63{
64    /// Creates a new `StellarRelayerTransaction`.
65    ///
66    /// # Arguments
67    ///
68    /// * `relayer` - The relayer model.
69    /// * `relayer_repository` - Storage for relayer repository.
70    /// * `transaction_repository` - Storage for transaction repository.
71    /// * `job_producer` - Producer for job queue.
72    /// * `signer` - The Stellar signer.
73    /// * `provider` - The Stellar provider.
74    /// * `transaction_counter_service` - Service for managing transaction counters.
75    /// * `dex_service` - The DEX service implementation for swap operations and validations.
76    ///
77    /// # Returns
78    ///
79    /// A result containing the new `StellarRelayerTransaction` or a `TransactionError`.
80    #[allow(clippy::too_many_arguments)]
81    pub fn new(
82        relayer: RelayerRepoModel,
83        relayer_repository: Arc<R>,
84        transaction_repository: Arc<T>,
85        job_producer: Arc<J>,
86        signer: Arc<S>,
87        provider: P,
88        transaction_counter_service: Arc<C>,
89        dex_service: Arc<D>,
90    ) -> Result<Self, TransactionError> {
91        Ok(Self {
92            relayer,
93            relayer_repository,
94            transaction_repository,
95            job_producer,
96            signer,
97            provider,
98            transaction_counter_service,
99            dex_service,
100        })
101    }
102
103    pub fn provider(&self) -> &P {
104        &self.provider
105    }
106
107    pub fn relayer(&self) -> &RelayerRepoModel {
108        &self.relayer
109    }
110
111    pub fn job_producer(&self) -> &J {
112        &self.job_producer
113    }
114
115    pub fn transaction_repository(&self) -> &T {
116        &self.transaction_repository
117    }
118
119    pub fn signer(&self) -> &S {
120        &self.signer
121    }
122
123    pub fn transaction_counter_service(&self) -> &C {
124        &self.transaction_counter_service
125    }
126
127    pub fn dex_service(&self) -> &D {
128        &self.dex_service
129    }
130
131    pub fn concurrent_transactions_enabled(&self) -> bool {
132        if let RelayerNetworkPolicy::Stellar(policy) = &self.relayer().policies {
133            policy
134                .concurrent_transactions
135                .unwrap_or(DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS)
136        } else {
137            DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
138        }
139    }
140
141    /// Send a transaction-request job for the given transaction.
142    pub async fn send_transaction_request_job(
143        &self,
144        tx: &TransactionRepoModel,
145        delay_seconds: Option<i64>,
146    ) -> Result<(), TransactionError> {
147        let job = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
148        let scheduled_on = delay_seconds.map(calculate_scheduled_timestamp);
149        self.job_producer()
150            .produce_transaction_request_job(job, scheduled_on)
151            .await?;
152        Ok(())
153    }
154
155    /// Sends a transaction update notification if a notification ID is configured.
156    ///
157    /// This is a best-effort operation that logs errors but does not propagate them,
158    /// as notification failures should not affect the transaction lifecycle.
159    pub(super) async fn send_transaction_update_notification(&self, tx: &TransactionRepoModel) {
160        if let Some(notification_id) = &self.relayer().notification_id {
161            if let Err(e) = self
162                .job_producer()
163                .produce_send_notification_job(
164                    produce_transaction_update_notification_payload(notification_id, tx),
165                    None,
166                )
167                .await
168            {
169                error!(error = %e, "failed to produce notification job");
170            }
171        }
172    }
173
174    /// Helper function to update transaction status, save it, and send a notification.
175    pub async fn finalize_transaction_state(
176        &self,
177        tx_id: String,
178        update_req: TransactionUpdateRequest,
179    ) -> Result<TransactionRepoModel, TransactionError> {
180        let updated_tx = self
181            .transaction_repository()
182            .partial_update(tx_id, update_req)
183            .await?;
184
185        self.send_transaction_update_notification(&updated_tx).await;
186        Ok(updated_tx)
187    }
188
189    pub async fn enqueue_next_pending_transaction(
190        &self,
191        finished_tx_id: &str,
192    ) -> Result<(), TransactionError> {
193        if !self.concurrent_transactions_enabled() {
194            if let Some(next) = self
195                .find_oldest_pending_for_relayer(&self.relayer().id)
196                .await?
197            {
198                // Atomic hand-over while still owning the lane
199                info!(to_tx_id = %next.id, finished_tx_id = %finished_tx_id, "handing over lane");
200                lane_gate::pass_to(&self.relayer().id, finished_tx_id, &next.id);
201                self.send_transaction_request_job(&next, None).await?;
202            } else {
203                info!(finished_tx_id = %finished_tx_id, "releasing relayer lane");
204                lane_gate::free(&self.relayer().id, finished_tx_id);
205            }
206        }
207        Ok(())
208    }
209
210    /// Finds the oldest pending transaction for a relayer.
211    ///
212    /// Uses optimized paginated query with `oldest_first: true` and `per_page: 1`
213    /// to fetch only the single oldest pending transaction from Redis in O(log N).
214    async fn find_oldest_pending_for_relayer(
215        &self,
216        relayer_id: &str,
217    ) -> Result<Option<TransactionRepoModel>, TransactionError> {
218        let result = self
219            .transaction_repository()
220            .find_by_status_paginated(
221                relayer_id,
222                &[TransactionStatus::Pending],
223                PaginationQuery {
224                    page: 1,
225                    per_page: 1,
226                },
227                true, // oldest_first=true - query returns oldest transaction first
228            )
229            .await
230            .map_err(TransactionError::from)?;
231
232        // oldest_first=true so .next() yields the oldest pending transaction (FIFO order)
233        Ok(result.items.into_iter().next())
234    }
235
236    /// Syncs the sequence number from the blockchain for the relayer's address.
237    /// This fetches the on-chain sequence number and updates the local counter to the next usable value.
238    pub async fn sync_sequence_from_chain(
239        &self,
240        relayer_address: &str,
241    ) -> Result<(), TransactionError> {
242        info!(address = %relayer_address, "syncing sequence number from chain");
243
244        // Use the shared helper to fetch the next sequence
245        let next_usable_seq = fetch_next_sequence_from_chain(self.provider(), relayer_address)
246            .await
247            .map_err(|e| {
248                warn!(
249                    address = %relayer_address,
250                    error = %e,
251                    "failed to fetch sequence from chain in sync_sequence_from_chain"
252                );
253                TransactionError::UnexpectedError(format!(
254                    "Failed to sync sequence from chain: {e}"
255                ))
256            })?;
257
258        // Update the local counter to the next usable sequence
259        self.transaction_counter_service()
260            .set(&self.relayer().id, relayer_address, next_usable_seq)
261            .await
262            .map_err(|e| {
263                TransactionError::UnexpectedError(format!("Failed to update sequence counter: {e}"))
264            })?;
265
266        info!(sequence = %next_usable_seq, "updated local sequence counter");
267        Ok(())
268    }
269
270    /// Resets a transaction to its pre-prepare state for reprocessing through the pipeline.
271    /// This is used when a transaction fails with a bad sequence error and needs to be retried.
272    pub async fn reset_transaction_for_retry(
273        &self,
274        tx: TransactionRepoModel,
275    ) -> Result<TransactionRepoModel, TransactionError> {
276        info!("resetting transaction for retry through pipeline");
277
278        // Use the model's built-in reset method
279        let update_req = tx.create_reset_update_request()?;
280
281        // Update the transaction
282        let reset_tx = self
283            .transaction_repository()
284            .partial_update(tx.id.clone(), update_req)
285            .await?;
286
287        info!("transaction reset successfully to pre-prepare state");
288        Ok(reset_tx)
289    }
290}
291
292#[async_trait]
293impl<R, T, J, S, P, C, D> Transaction for StellarRelayerTransaction<R, T, J, S, P, C, D>
294where
295    R: Repository<RelayerRepoModel, String> + Send + Sync,
296    T: TransactionRepository + Send + Sync,
297    J: JobProducerTrait + Send + Sync,
298    S: Signer + StellarSignTrait + Send + Sync,
299    P: StellarProviderTrait + Send + Sync,
300    C: TransactionCounterTrait + Send + Sync,
301    D: StellarDexServiceTrait + Send + Sync + 'static,
302{
303    async fn prepare_transaction(
304        &self,
305        tx: TransactionRepoModel,
306    ) -> Result<TransactionRepoModel, TransactionError> {
307        self.prepare_transaction_impl(tx).await
308    }
309
310    async fn submit_transaction(
311        &self,
312        tx: TransactionRepoModel,
313    ) -> Result<TransactionRepoModel, TransactionError> {
314        self.submit_transaction_impl(tx).await
315    }
316
317    async fn resubmit_transaction(
318        &self,
319        tx: TransactionRepoModel,
320    ) -> Result<TransactionRepoModel, TransactionError> {
321        Ok(tx)
322    }
323
324    async fn handle_transaction_status(
325        &self,
326        tx: TransactionRepoModel,
327        context: Option<StatusCheckContext>,
328    ) -> Result<TransactionRepoModel, TransactionError> {
329        self.handle_transaction_status_impl(tx, context).await
330    }
331
332    async fn cancel_transaction(
333        &self,
334        tx: TransactionRepoModel,
335    ) -> Result<TransactionRepoModel, TransactionError> {
336        Ok(tx)
337    }
338
339    async fn replace_transaction(
340        &self,
341        _old_tx: TransactionRepoModel,
342        _new_tx_request: NetworkTransactionRequest,
343    ) -> Result<TransactionRepoModel, TransactionError> {
344        Ok(_old_tx)
345    }
346
347    async fn sign_transaction(
348        &self,
349        tx: TransactionRepoModel,
350    ) -> Result<TransactionRepoModel, TransactionError> {
351        Ok(tx)
352    }
353
354    async fn validate_transaction(
355        &self,
356        _tx: TransactionRepoModel,
357    ) -> Result<bool, TransactionError> {
358        Ok(true)
359    }
360}
361
362pub type DefaultStellarTransaction = StellarRelayerTransaction<
363    RelayerRepositoryStorage,
364    TransactionRepositoryStorage,
365    JobProducer,
366    StellarSigner,
367    StellarProvider,
368    TransactionCounterRepositoryStorage,
369    StellarDexService<StellarProvider, StellarSigner>,
370>;
371
372#[cfg(test)]
373mod tests {
374    use super::*;
375    use crate::repositories::transaction::RedisTransactionRepository;
376    use crate::utils::RedisConnections;
377    use crate::{
378        models::{NetworkTransactionData, RepositoryError},
379        services::provider::ProviderError,
380    };
381    use deadpool_redis::{Config, Runtime};
382    use std::sync::Arc;
383    use uuid::Uuid;
384
385    use crate::domain::transaction::stellar::test_helpers::*;
386
387    #[test]
388    fn new_returns_ok() {
389        let relayer = create_test_relayer();
390        let mocks = default_test_mocks();
391        let result = StellarRelayerTransaction::new(
392            relayer,
393            Arc::new(mocks.relayer_repo),
394            Arc::new(mocks.tx_repo),
395            Arc::new(mocks.job_producer),
396            Arc::new(mocks.signer),
397            mocks.provider,
398            Arc::new(mocks.counter),
399            Arc::new(mocks.dex_service),
400        );
401        assert!(result.is_ok());
402    }
403
404    #[test]
405    fn accessor_methods_return_correct_references() {
406        let relayer = create_test_relayer();
407        let mocks = default_test_mocks();
408        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
409
410        // Test all accessor methods
411        assert_eq!(handler.relayer().id, "relayer-1");
412        assert_eq!(handler.relayer().address, TEST_PK);
413
414        // These should not panic and return valid references
415        let _ = handler.provider();
416        let _ = handler.job_producer();
417        let _ = handler.transaction_repository();
418        let _ = handler.signer();
419        let _ = handler.transaction_counter_service();
420    }
421
422    #[tokio::test]
423    async fn send_transaction_request_job_success() {
424        let relayer = create_test_relayer();
425        let mut mocks = default_test_mocks();
426
427        mocks
428            .job_producer
429            .expect_produce_transaction_request_job()
430            .withf(|job, delay| {
431                job.transaction_id == "tx-1" && job.relayer_id == "relayer-1" && delay.is_none()
432            })
433            .times(1)
434            .returning(|_, _| Box::pin(async { Ok(()) }));
435
436        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
437        let tx = create_test_transaction(&relayer.id);
438
439        let result = handler.send_transaction_request_job(&tx, None).await;
440        assert!(result.is_ok());
441    }
442
443    #[tokio::test]
444    async fn send_transaction_request_job_with_delay() {
445        let relayer = create_test_relayer();
446        let mut mocks = default_test_mocks();
447
448        mocks
449            .job_producer
450            .expect_produce_transaction_request_job()
451            .withf(|job, delay| {
452                job.transaction_id == "tx-1"
453                    && job.relayer_id == "relayer-1"
454                    && delay.is_some()
455                    && delay.unwrap() > chrono::Utc::now().timestamp()
456            })
457            .times(1)
458            .returning(|_, _| Box::pin(async { Ok(()) }));
459
460        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
461        let tx = create_test_transaction(&relayer.id);
462
463        let result = handler.send_transaction_request_job(&tx, Some(60)).await;
464        assert!(result.is_ok());
465    }
466
467    #[tokio::test]
468    async fn finalize_transaction_state_success() {
469        let relayer = create_test_relayer();
470        let mut mocks = default_test_mocks();
471
472        // Mock repository update
473        mocks
474            .tx_repo
475            .expect_partial_update()
476            .withf(|tx_id, update| {
477                tx_id == "tx-1"
478                    && update.status == Some(TransactionStatus::Confirmed)
479                    && update.status_reason == Some("Transaction confirmed".to_string())
480            })
481            .times(1)
482            .returning(|tx_id, update| {
483                let mut tx = create_test_transaction("relayer-1");
484                tx.id = tx_id;
485                tx.status = update.status.unwrap();
486                tx.status_reason = update.status_reason;
487                tx.confirmed_at = update.confirmed_at;
488                Ok::<_, RepositoryError>(tx)
489            });
490
491        // Mock notification
492        mocks
493            .job_producer
494            .expect_produce_send_notification_job()
495            .times(1)
496            .returning(|_, _| Box::pin(async { Ok(()) }));
497
498        let handler = make_stellar_tx_handler(relayer, mocks);
499
500        let update_request = TransactionUpdateRequest {
501            status: Some(TransactionStatus::Confirmed),
502            status_reason: Some("Transaction confirmed".to_string()),
503            confirmed_at: Some("2023-01-01T00:00:00Z".to_string()),
504            ..Default::default()
505        };
506
507        let result = handler
508            .finalize_transaction_state("tx-1".to_string(), update_request)
509            .await;
510
511        assert!(result.is_ok());
512        let updated_tx = result.unwrap();
513        assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
514        assert_eq!(
515            updated_tx.status_reason,
516            Some("Transaction confirmed".to_string())
517        );
518    }
519
520    #[tokio::test]
521    async fn enqueue_next_pending_transaction_with_pending_tx() {
522        let relayer = create_test_relayer();
523        let mut mocks = default_test_mocks();
524
525        // Mock finding a pending transaction
526        let mut pending_tx = create_test_transaction(&relayer.id);
527        pending_tx.id = "pending-tx-1".to_string();
528
529        mocks
530            .tx_repo
531            .expect_find_by_status_paginated()
532            .withf(|_relayer_id, statuses, query, oldest_first| {
533                statuses == [TransactionStatus::Pending]
534                    && query.page == 1
535                    && query.per_page == 1
536                    && *oldest_first
537            })
538            .times(1)
539            .returning(move |_, _, _, _| {
540                let mut tx = create_test_transaction("relayer-1");
541                tx.id = "pending-tx-1".to_string();
542                Ok(crate::repositories::PaginatedResult {
543                    items: vec![tx],
544                    total: 1,
545                    page: 1,
546                    per_page: 1,
547                })
548            });
549
550        // Mock job production for the next transaction
551        mocks
552            .job_producer
553            .expect_produce_transaction_request_job()
554            .withf(|job, delay| job.transaction_id == "pending-tx-1" && delay.is_none())
555            .times(1)
556            .returning(|_, _| Box::pin(async { Ok(()) }));
557
558        let handler = make_stellar_tx_handler(relayer, mocks);
559
560        let result = handler
561            .enqueue_next_pending_transaction("finished-tx")
562            .await;
563        assert!(result.is_ok());
564    }
565
566    #[tokio::test]
567    async fn enqueue_next_pending_transaction_no_pending_tx() {
568        let relayer = create_test_relayer();
569        let mut mocks = default_test_mocks();
570
571        // Mock finding no pending transactions
572        mocks
573            .tx_repo
574            .expect_find_by_status_paginated()
575            .withf(|_relayer_id, statuses, query, oldest_first| {
576                statuses == [TransactionStatus::Pending]
577                    && query.page == 1
578                    && query.per_page == 1
579                    && *oldest_first
580            })
581            .times(1)
582            .returning(|_, _, _, _| {
583                Ok(crate::repositories::PaginatedResult {
584                    items: vec![],
585                    total: 0,
586                    page: 1,
587                    per_page: 1,
588                })
589            });
590
591        let handler = make_stellar_tx_handler(relayer, mocks);
592
593        let result = handler
594            .enqueue_next_pending_transaction("finished-tx")
595            .await;
596        assert!(result.is_ok());
597    }
598
599    #[tokio::test]
600    async fn test_sync_sequence_from_chain() {
601        let relayer = create_test_relayer();
602        let mut mocks = default_test_mocks();
603
604        // Mock provider to return account with sequence 100
605        mocks
606            .provider
607            .expect_get_account()
608            .withf(|addr| addr == TEST_PK)
609            .times(1)
610            .returning(|_| {
611                Box::pin(async {
612                    use soroban_rs::xdr::{
613                        AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber,
614                        String32, Thresholds, Uint256,
615                    };
616                    use stellar_strkey::ed25519;
617
618                    // Create a dummy public key for account ID
619                    let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
620                    let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
621
622                    Ok(AccountEntry {
623                        account_id,
624                        balance: 1000000,
625                        seq_num: SequenceNumber(100),
626                        num_sub_entries: 0,
627                        inflation_dest: None,
628                        flags: 0,
629                        home_domain: String32::default(),
630                        thresholds: Thresholds([1, 1, 1, 1]),
631                        signers: Default::default(),
632                        ext: AccountEntryExt::V0,
633                    })
634                })
635            });
636
637        // Mock counter set to verify it's called with next usable sequence (101)
638        mocks
639            .counter
640            .expect_set()
641            .withf(|relayer_id, addr, seq| {
642                relayer_id == "relayer-1" && addr == TEST_PK && *seq == 101
643            })
644            .times(1)
645            .returning(|_, _, _| Box::pin(async { Ok(()) }));
646
647        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
648
649        let result = handler.sync_sequence_from_chain(&relayer.address).await;
650        assert!(result.is_ok());
651    }
652
653    #[tokio::test]
654    async fn test_sync_sequence_from_chain_provider_error() {
655        let relayer = create_test_relayer();
656        let mut mocks = default_test_mocks();
657
658        // Mock provider to fail
659        mocks.provider.expect_get_account().times(1).returning(|_| {
660            Box::pin(async { Err(ProviderError::Other("Account not found".to_string())) })
661        });
662
663        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
664
665        let result = handler.sync_sequence_from_chain(&relayer.address).await;
666        assert!(result.is_err());
667        match result.unwrap_err() {
668            TransactionError::UnexpectedError(msg) => {
669                assert!(msg.contains("Failed to fetch account from chain"));
670            }
671            _ => panic!("Expected UnexpectedError"),
672        }
673    }
674
675    #[tokio::test]
676    async fn test_sync_sequence_from_chain_counter_error() {
677        let relayer = create_test_relayer();
678        let mut mocks = default_test_mocks();
679
680        // Mock provider success
681        mocks.provider.expect_get_account().times(1).returning(|_| {
682            Box::pin(async {
683                use soroban_rs::xdr::{
684                    AccountEntry, AccountEntryExt, AccountId, PublicKey, SequenceNumber, String32,
685                    Thresholds, Uint256,
686                };
687                use stellar_strkey::ed25519;
688
689                // Create a dummy public key for account ID
690                let pk = ed25519::PublicKey::from_string(TEST_PK).unwrap();
691                let account_id = AccountId(PublicKey::PublicKeyTypeEd25519(Uint256(pk.0)));
692
693                Ok(AccountEntry {
694                    account_id,
695                    balance: 1000000,
696                    seq_num: SequenceNumber(100),
697                    num_sub_entries: 0,
698                    inflation_dest: None,
699                    flags: 0,
700                    home_domain: String32::default(),
701                    thresholds: Thresholds([1, 1, 1, 1]),
702                    signers: Default::default(),
703                    ext: AccountEntryExt::V0,
704                })
705            })
706        });
707
708        // Mock counter set to fail
709        mocks.counter.expect_set().times(1).returning(|_, _, _| {
710            Box::pin(async {
711                Err(RepositoryError::Unknown(
712                    "Counter update failed".to_string(),
713                ))
714            })
715        });
716
717        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
718
719        let result = handler.sync_sequence_from_chain(&relayer.address).await;
720        assert!(result.is_err());
721        match result.unwrap_err() {
722            TransactionError::UnexpectedError(msg) => {
723                assert!(msg.contains("Failed to update sequence counter"));
724            }
725            _ => panic!("Expected UnexpectedError"),
726        }
727    }
728
729    #[test]
730    fn test_concurrent_transactions_enabled() {
731        // Test with concurrent transactions explicitly enabled
732        let mut relayer = create_test_relayer();
733        if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
734            policy.concurrent_transactions = Some(true);
735        }
736        let mocks = default_test_mocks();
737        let handler = make_stellar_tx_handler(relayer, mocks);
738        assert!(handler.concurrent_transactions_enabled());
739
740        // Test with concurrent transactions explicitly disabled
741        let mut relayer = create_test_relayer();
742        if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
743            policy.concurrent_transactions = Some(false);
744        }
745        let mocks = default_test_mocks();
746        let handler = make_stellar_tx_handler(relayer, mocks);
747        assert!(!handler.concurrent_transactions_enabled());
748
749        // Test with default (None) - should use DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
750        let relayer = create_test_relayer();
751        let mocks = default_test_mocks();
752        let handler = make_stellar_tx_handler(relayer, mocks);
753        assert_eq!(
754            handler.concurrent_transactions_enabled(),
755            DEFAULT_STELLAR_CONCURRENT_TRANSACTIONS
756        );
757    }
758
759    #[tokio::test]
760    async fn test_enqueue_next_pending_transaction_with_concurrency_enabled() {
761        // With concurrent transactions enabled, lane management should be skipped
762        let mut relayer = create_test_relayer();
763        if let RelayerNetworkPolicy::Stellar(ref mut policy) = relayer.policies {
764            policy.concurrent_transactions = Some(true);
765        }
766        let mut mocks = default_test_mocks();
767
768        // Should NOT look for pending transactions when concurrency is enabled
769        mocks.tx_repo.expect_find_by_status_paginated().times(0); // Expect zero calls
770
771        // Should NOT produce any job when concurrency is enabled
772        mocks
773            .job_producer
774            .expect_produce_transaction_request_job()
775            .times(0); // Expect zero calls
776
777        let handler = make_stellar_tx_handler(relayer, mocks);
778
779        let result = handler
780            .enqueue_next_pending_transaction("finished-tx")
781            .await;
782        assert!(result.is_ok());
783    }
784
785    #[tokio::test]
786    async fn test_reset_transaction_for_retry() {
787        let relayer = create_test_relayer();
788        let mut mocks = default_test_mocks();
789
790        // Create a transaction with stellar data that has been prepared
791        let mut tx = create_test_transaction(&relayer.id);
792        if let NetworkTransactionData::Stellar(ref mut data) = tx.network_data {
793            data.sequence_number = Some(42);
794            data.signatures.push(dummy_signature());
795            data.hash = Some("test-hash".to_string());
796            data.signed_envelope_xdr = Some("test-xdr".to_string());
797        }
798
799        // Mock partial_update to reset transaction
800        mocks
801            .tx_repo
802            .expect_partial_update()
803            .withf(|tx_id, upd| {
804                tx_id == "tx-1"
805                    && upd.status == Some(TransactionStatus::Pending)
806                    && upd.sent_at.is_none()
807                    && upd.confirmed_at.is_none()
808            })
809            .times(1)
810            .returning(|id, upd| {
811                let mut tx = create_test_transaction("relayer-1");
812                tx.id = id;
813                tx.status = upd.status.unwrap();
814                if let Some(network_data) = upd.network_data {
815                    tx.network_data = network_data;
816                }
817                Ok::<_, RepositoryError>(tx)
818            });
819
820        let handler = make_stellar_tx_handler(relayer.clone(), mocks);
821
822        let result = handler.reset_transaction_for_retry(tx).await;
823        assert!(result.is_ok());
824
825        let reset_tx = result.unwrap();
826        assert_eq!(reset_tx.status, TransactionStatus::Pending);
827
828        // Verify stellar data was reset
829        if let NetworkTransactionData::Stellar(data) = &reset_tx.network_data {
830            assert!(data.sequence_number.is_none());
831            assert!(data.signatures.is_empty());
832            assert!(data.hash.is_none());
833            assert!(data.signed_envelope_xdr.is_none());
834        } else {
835            panic!("Expected Stellar transaction data");
836        }
837    }
838
839    #[tokio::test]
840    #[ignore = "Requires active Redis instance"]
841    async fn test_find_oldest_pending_for_relayer_with_redis() {
842        // Setup Redis repository
843        let redis_url = std::env::var("REDIS_TEST_URL")
844            .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
845        let pool = Arc::new(
846            Config::from_url(&redis_url)
847                .builder()
848                .expect("Failed to create Redis pool builder")
849                .max_size(16)
850                .runtime(Runtime::Tokio1)
851                .build()
852                .expect("Failed to build Redis pool"),
853        );
854        let connections = Arc::new(RedisConnections::new_single_pool(pool));
855
856        let random_id = Uuid::new_v4().to_string();
857        let key_prefix = format!("test_stellar:{random_id}");
858        let tx_repo = Arc::new(
859            RedisTransactionRepository::new(connections, key_prefix)
860                .expect("Failed to create RedisTransactionRepository"),
861        );
862
863        let relayer_id = format!("relayer-{}", Uuid::new_v4());
864
865        // Create three pending transactions with different created_at timestamps
866        // tx1: oldest (created first)
867        let mut tx1 = create_test_transaction(&relayer_id);
868        tx1.id = format!("tx-1-{}", Uuid::new_v4());
869        tx1.status = TransactionStatus::Pending;
870        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string();
871
872        // tx2: middle
873        let mut tx2 = create_test_transaction(&relayer_id);
874        tx2.id = format!("tx-2-{}", Uuid::new_v4());
875        tx2.status = TransactionStatus::Pending;
876        tx2.created_at = "2025-01-27T11:00:00.000000+00:00".to_string();
877
878        // tx3: newest (created last)
879        let mut tx3 = create_test_transaction(&relayer_id);
880        tx3.id = format!("tx-3-{}", Uuid::new_v4());
881        tx3.status = TransactionStatus::Pending;
882        tx3.created_at = "2025-01-27T12:00:00.000000+00:00".to_string();
883
884        // Create transactions in Redis
885        tx_repo.create(tx1.clone()).await.unwrap();
886        tx_repo.create(tx2.clone()).await.unwrap();
887        tx_repo.create(tx3.clone()).await.unwrap();
888
889        // Create a minimal StellarRelayerTransaction instance to test the method
890        // We'll use mocks for other dependencies since we only need the transaction repository
891        let relayer = create_test_relayer();
892        let mut relayer_model = relayer.clone();
893        relayer_model.id = relayer_id.clone();
894
895        let mocks = default_test_mocks();
896        let handler = StellarRelayerTransaction::new(
897            relayer_model,
898            Arc::new(mocks.relayer_repo),
899            tx_repo.clone(),
900            Arc::new(mocks.job_producer),
901            Arc::new(mocks.signer),
902            mocks.provider,
903            Arc::new(mocks.counter),
904            Arc::new(mocks.dex_service),
905        )
906        .unwrap();
907
908        // Call find_oldest_pending_for_relayer
909        let result = handler
910            .find_oldest_pending_for_relayer(&relayer_id)
911            .await
912            .unwrap();
913
914        // Verify the result
915        assert!(result.is_some(), "Should find a pending transaction");
916        let found_tx = result.unwrap();
917
918        assert_eq!(
919            found_tx.id, tx1.id,
920            "Should get oldest transaction (tx1) - oldest_first=true so .next() yields oldest"
921        );
922        assert_eq!(
923            found_tx.created_at, tx1.created_at,
924            "Should match oldest transaction's created_at"
925        );
926
927        // Cleanup: delete test transactions
928        let _ = tx_repo.delete_by_id(tx1.id.clone()).await;
929        let _ = tx_repo.delete_by_id(tx2.id.clone()).await;
930        let _ = tx_repo.delete_by_id(tx3.id.clone()).await;
931    }
932
933    #[tokio::test]
934    async fn test_find_oldest_pending_for_relayer_with_in_memory() {
935        use crate::repositories::transaction::InMemoryTransactionRepository;
936        use uuid::Uuid;
937
938        // Setup in-memory repository
939        let tx_repo = Arc::new(InMemoryTransactionRepository::new());
940
941        let relayer_id = format!("relayer-{}", Uuid::new_v4());
942
943        // Create three pending transactions with different created_at timestamps
944        // tx1: oldest (created first)
945        let mut tx1 = create_test_transaction(&relayer_id);
946        tx1.id = format!("tx-1-{}", Uuid::new_v4());
947        tx1.status = TransactionStatus::Pending;
948        tx1.created_at = "2025-01-27T10:00:00.000000+00:00".to_string();
949
950        // tx2: middle
951        let mut tx2 = create_test_transaction(&relayer_id);
952        tx2.id = format!("tx-2-{}", Uuid::new_v4());
953        tx2.status = TransactionStatus::Pending;
954        tx2.created_at = "2025-01-27T11:00:00.000000+00:00".to_string();
955
956        // tx3: newest (created last)
957        let mut tx3 = create_test_transaction(&relayer_id);
958        tx3.id = format!("tx-3-{}", Uuid::new_v4());
959        tx3.status = TransactionStatus::Pending;
960        tx3.created_at = "2025-01-27T12:00:00.000000+00:00".to_string();
961
962        // Create transactions in memory store
963        tx_repo.create(tx1.clone()).await.unwrap();
964        tx_repo.create(tx2.clone()).await.unwrap();
965        tx_repo.create(tx3.clone()).await.unwrap();
966
967        // Create a minimal StellarRelayerTransaction instance to test the method
968        // We'll use mocks for other dependencies since we only need the transaction repository
969        let relayer = create_test_relayer();
970        let mut relayer_model = relayer.clone();
971        relayer_model.id = relayer_id.clone();
972
973        let mocks = default_test_mocks();
974        let handler = StellarRelayerTransaction::new(
975            relayer_model,
976            Arc::new(mocks.relayer_repo),
977            tx_repo.clone(),
978            Arc::new(mocks.job_producer),
979            Arc::new(mocks.signer),
980            mocks.provider,
981            Arc::new(mocks.counter),
982            Arc::new(mocks.dex_service),
983        )
984        .unwrap();
985
986        // Call find_oldest_pending_for_relayer
987        let result = handler
988            .find_oldest_pending_for_relayer(&relayer_id)
989            .await
990            .unwrap();
991
992        // Verify the result
993        assert!(result.is_some(), "Should find a pending transaction");
994        let found_tx = result.unwrap();
995
996        // oldest_first=true so .next() yields the oldest pending transaction (FIFO order)
997        assert_eq!(
998            found_tx.id, tx1.id,
999            "Should get oldest transaction (tx1) - oldest_first=true so .next() yields oldest"
1000        );
1001        assert_eq!(
1002            found_tx.created_at, tx1.created_at,
1003            "Should match oldest transaction's created_at"
1004        );
1005    }
1006}