1use crate::constants::{
7 MAXIMUM_SOLANA_TX_ATTEMPTS, SOLANA_DEFAULT_TX_VALID_TIMESPAN,
8 SOLANA_MIN_AGE_FOR_RESUBMIT_CHECK_SECONDS, SOLANA_PENDING_RECOVERY_TRIGGER_SECONDS,
9 SOLANA_PENDING_TIMEOUT_MINUTES, SOLANA_SENT_TIMEOUT_MINUTES,
10};
11use crate::models::{NetworkTransactionData, SolanaTransactionData};
12use crate::services::provider::SolanaProviderError;
13use chrono::{DateTime, Duration, Utc};
14use solana_commitment_config::CommitmentConfig;
15use solana_sdk::{signature::Signature, transaction::Transaction as SolanaTransaction};
16use std::str::FromStr;
17use tracing::{debug, error, info, warn};
18
19use super::{utils::decode_solana_transaction, SolanaRelayerTransaction};
20use crate::domain::transaction::common::is_final_state;
21use crate::domain::transaction::solana::utils::{
22 is_resubmitable, map_solana_status_to_transaction_status, too_many_solana_attempts,
23};
24use crate::{
25 jobs::{JobProducerTrait, StatusCheckContext, TransactionRequest, TransactionSend},
26 models::{
27 RelayerRepoModel, SolanaTransactionStatus, TransactionError, TransactionRepoModel,
28 TransactionStatus, TransactionUpdateRequest,
29 },
30 repositories::{transaction::TransactionRepository, RelayerRepository, Repository},
31 services::{provider::SolanaProviderTrait, signer::SolanaSignTrait},
32};
33
34impl<P, RR, TR, J, S> SolanaRelayerTransaction<P, RR, TR, J, S>
35where
36 P: SolanaProviderTrait + Send + Sync + 'static,
37 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
38 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
39 J: JobProducerTrait + Send + Sync + 'static,
40 S: SolanaSignTrait + Send + Sync + 'static,
41{
42 pub async fn handle_transaction_status_impl(
54 &self,
55 mut tx: TransactionRepoModel,
56 context: Option<StatusCheckContext>,
57 ) -> Result<TransactionRepoModel, TransactionError> {
58 debug!(
59 tx_id = %tx.id,
60 relayer_id = %tx.relayer_id,
61 status = ?tx.status,
62 "handling solana transaction status"
63 );
64
65 if is_final_state(&tx.status) {
67 debug!(
68 tx_id = %tx.id,
69 relayer_id = %tx.relayer_id,
70 status = ?tx.status,
71 "transaction already in final state"
72 );
73 return Ok(tx);
74 }
75
76 if let Some(ref ctx) = context {
78 if ctx.should_force_finalize() {
79 let reason = format!(
80 "Transaction status monitoring failed after {} consecutive errors (total: {}). \
81 Last status: {:?}. Unable to determine final on-chain state.",
82 ctx.consecutive_failures, ctx.total_failures, tx.status
83 );
84 warn!(
85 tx_id = %tx.id,
86 consecutive_failures = ctx.consecutive_failures,
87 total_failures = ctx.total_failures,
88 max_consecutive = ctx.max_consecutive_failures,
89 "circuit breaker triggered, forcing transaction to failed state"
90 );
91 return self.mark_as_failed(tx, reason).await;
95 }
96 }
97
98 let detected_status = self.check_onchain_transaction_status(&tx).await?;
100
101 if tx.status != detected_status {
105 tx = self
106 .transaction_repository()
107 .get_by_id(tx.id.clone())
108 .await?;
109 }
110
111 match detected_status {
113 TransactionStatus::Pending => {
114 self.handle_pending_status(tx).await
116 }
117 TransactionStatus::Sent | TransactionStatus::Submitted => {
118 self.handle_resubmit_or_expiration(tx).await
120 }
121 TransactionStatus::Mined
122 | TransactionStatus::Confirmed
123 | TransactionStatus::Failed
124 | TransactionStatus::Canceled
125 | TransactionStatus::Expired => {
126 self.update_transaction_status_if_needed(tx, detected_status)
127 .await
128 }
129 }
130 }
131
132 async fn check_onchain_transaction_status(
138 &self,
139 tx: &TransactionRepoModel,
140 ) -> Result<TransactionStatus, TransactionError> {
141 match tx.status {
143 TransactionStatus::Pending | TransactionStatus::Sent => {
144 return Ok(tx.status.clone());
145 }
146 _ => {}
147 }
148
149 let solana_data = tx.network_data.get_solana_transaction_data()?;
151 let signature_str = solana_data.signature.as_ref().ok_or_else(|| {
152 TransactionError::ValidationError("Transaction signature is missing".to_string())
153 })?;
154
155 let signature = Signature::from_str(signature_str).map_err(|e| {
156 TransactionError::ValidationError(format!("Invalid signature format: {e}"))
157 })?;
158
159 match self.provider().get_transaction_status(&signature).await {
161 Ok(solana_status) => {
162 Ok(map_solana_status_to_transaction_status(solana_status))
164 }
165 Err(e) => {
166 warn!(
168 tx_id = %tx.id,
169 signature = %signature_str,
170 error = %e,
171 "error getting transaction status from chain"
172 );
173 Ok(tx.status.clone())
175 }
176 }
177 }
178
179 async fn update_transaction_status_and_send_notification(
187 &self,
188 tx: TransactionRepoModel,
189 new_status: TransactionStatus,
190 network_data: Option<crate::models::NetworkTransactionData>,
191 ) -> Result<TransactionRepoModel, TransactionError> {
192 let update_request = TransactionUpdateRequest {
193 status: Some(new_status.clone()),
194 network_data,
195 confirmed_at: if matches!(new_status, TransactionStatus::Confirmed) {
196 Some(Utc::now().to_rfc3339())
197 } else {
198 None
199 },
200 ..Default::default()
201 };
202
203 let updated_tx = self
205 .transaction_repository()
206 .partial_update(tx.id.clone(), update_request)
207 .await
208 .map_err(|e| TransactionError::UnexpectedError(e.to_string()))?;
209
210 if let Err(e) = self.send_transaction_update_notification(&updated_tx).await {
213 error!(
214 tx_id = %updated_tx.id,
215 status = ?new_status,
216 "sending transaction update notification failed: {:?}",
217 e
218 );
219 }
220
221 Ok(updated_tx)
222 }
223
224 async fn update_transaction_status_if_needed(
228 &self,
229 tx: TransactionRepoModel,
230 new_status: TransactionStatus,
231 ) -> Result<TransactionRepoModel, TransactionError> {
232 if tx.status != new_status {
233 return self
234 .update_transaction_status_and_send_notification(tx, new_status, None)
235 .await;
236 }
237 Ok(tx)
238 }
239
240 async fn handle_pending_status(
245 &self,
246 tx: TransactionRepoModel,
247 ) -> Result<TransactionRepoModel, TransactionError> {
248 if self.is_valid_until_expired(&tx) {
250 info!(
251 tx_id = %tx.id,
252 valid_until = ?tx.valid_until,
253 "pending transaction valid_until has expired"
254 );
255 return self
256 .mark_as_expired(
257 tx,
258 "Transaction valid_until timestamp has expired".to_string(),
259 )
260 .await;
261 }
262
263 if self.has_exceeded_timeout(&tx)? {
266 warn!(
267 tx_id = %tx.id,
268 timeout_minutes = SOLANA_PENDING_TIMEOUT_MINUTES,
269 "pending transaction has exceeded timeout, marking as failed"
270 );
271 return self
272 .mark_as_failed(
273 tx,
274 format!(
275 "Transaction stuck in Pending status for more than {SOLANA_PENDING_TIMEOUT_MINUTES} minutes"
276 ),
277 )
278 .await;
279 }
280
281 let age = self.get_time_since_sent_or_created_at(&tx).ok_or_else(|| {
284 TransactionError::UnexpectedError(
285 "Both sent_at and created_at are missing or invalid".to_string(),
286 )
287 })?;
288
289 if age.num_seconds() >= SOLANA_PENDING_RECOVERY_TRIGGER_SECONDS {
292 info!(
293 tx_id = %tx.id,
294 age_seconds = age.num_seconds(),
295 "pending transaction may be stuck, scheduling recovery job"
296 );
297
298 let transaction_request = TransactionRequest::new(tx.id.clone(), tx.relayer_id.clone());
299
300 self.job_producer()
301 .produce_transaction_request_job(transaction_request, None)
302 .await
303 .map_err(|e| {
304 TransactionError::UnexpectedError(format!(
305 "Failed to enqueue transaction request job: {e}"
306 ))
307 })?;
308 } else {
309 debug!(
310 tx_id = %tx.id,
311 age_seconds = age.num_seconds(),
312 "pending transaction too young for recovery check"
313 );
314 }
315
316 Ok(tx)
317 }
318
319 fn get_time_since_sent_or_created_at(&self, tx: &TransactionRepoModel) -> Option<Duration> {
324 let timestamp = tx.sent_at.as_ref().or(Some(&tx.created_at))?;
326 match DateTime::parse_from_rfc3339(timestamp) {
327 Ok(dt) => Some(Utc::now().signed_duration_since(dt.with_timezone(&Utc))),
328 Err(e) => {
329 warn!(tx_id = %tx.id, ts = %timestamp, error = %e, "failed to parse timestamp");
330 None
331 }
332 }
333 }
334
335 async fn check_any_signature_on_chain(
349 &self,
350 tx: &TransactionRepoModel,
351 ) -> Result<Option<(String, SolanaTransactionStatus)>, TransactionError> {
352 for (idx, sig_str) in tx.hashes.iter().enumerate() {
354 let signature = match Signature::from_str(sig_str) {
355 Ok(sig) => sig,
356 Err(e) => {
357 warn!(
358 tx_id = %tx.id,
359 signature = %sig_str,
360 error = %e,
361 "invalid signature format in hashes, skipping"
362 );
363 continue;
364 }
365 };
366
367 match self.provider().get_transaction_status(&signature).await {
368 Ok(solana_status) => {
369 info!(
371 tx_id = %tx.id,
372 signature = %sig_str,
373 signature_idx = idx,
374 on_chain_status = ?solana_status,
375 "found transaction on-chain with previous signature"
376 );
377 return Ok(Some((sig_str.clone(), solana_status)));
378 }
379 Err(e) => {
380 debug!(
382 tx_id = %tx.id,
383 signature = %sig_str,
384 signature_idx = idx,
385 error = %e,
386 "signature not found on-chain or RPC error"
387 );
388 continue;
389 }
390 }
391 }
392
393 Ok(None)
395 }
396
397 async fn is_blockhash_valid(
401 &self,
402 transaction: &SolanaTransaction,
403 tx_id: &str,
404 ) -> Result<bool, TransactionError> {
405 let blockhash = transaction.message.recent_blockhash;
406
407 match self
408 .provider()
409 .is_blockhash_valid(&blockhash, CommitmentConfig::confirmed())
410 .await
411 {
412 Ok(is_valid) => Ok(is_valid),
413 Err(e) => {
414 if matches!(e, SolanaProviderError::BlockhashNotFound(_)) {
416 info!(
417 tx_id = %tx_id,
418 "blockhash not found on chain, treating as expired"
419 );
420 return Ok(false);
421 }
422
423 warn!(
425 tx_id = %tx_id,
426 error = %e,
427 "error checking blockhash validity, propagating error for retry"
428 );
429 Err(TransactionError::UnderlyingSolanaProvider(e))
430 }
431 }
432 }
433
434 async fn mark_as_expired(
436 &self,
437 tx: TransactionRepoModel,
438 reason: String,
439 ) -> Result<TransactionRepoModel, TransactionError> {
440 warn!(tx_id = %tx.id, reason = %reason, "marking transaction as expired");
441
442 let update_request = TransactionUpdateRequest {
443 status: Some(TransactionStatus::Expired),
444 status_reason: Some(reason),
445 ..Default::default()
446 };
447
448 self.transaction_repository()
449 .partial_update(tx.id.clone(), update_request)
450 .await
451 .map_err(|e| TransactionError::UnexpectedError(e.to_string()))
452 }
453
454 async fn mark_as_failed(
456 &self,
457 tx: TransactionRepoModel,
458 reason: String,
459 ) -> Result<TransactionRepoModel, TransactionError> {
460 warn!(tx_id = %tx.id, reason = %reason, "marking transaction as failed");
461
462 let update_request = TransactionUpdateRequest {
463 status: Some(TransactionStatus::Failed),
464 status_reason: Some(reason),
465 ..Default::default()
466 };
467
468 let updated_tx = self
469 .transaction_repository()
470 .partial_update(tx.id.clone(), update_request)
471 .await
472 .map_err(|e| TransactionError::UnexpectedError(e.to_string()))?;
473
474 if let Err(e) = self.send_transaction_update_notification(&updated_tx).await {
476 error!(
477 tx_id = %updated_tx.id,
478 error = %e,
479 "failed to send notification for failed transaction"
480 );
481 }
482
483 Ok(updated_tx)
484 }
485
486 fn is_valid_until_expired(&self, tx: &TransactionRepoModel) -> bool {
492 if let Some(valid_until_str) = &tx.valid_until {
494 if let Ok(valid_until) = DateTime::parse_from_rfc3339(valid_until_str) {
495 return Utc::now() > valid_until.with_timezone(&Utc);
496 }
497 }
498
499 if let Ok(created_at) = DateTime::parse_from_rfc3339(&tx.created_at) {
501 let default_valid_until = created_at.with_timezone(&Utc)
502 + Duration::milliseconds(SOLANA_DEFAULT_TX_VALID_TIMESPAN);
503 return Utc::now() > default_valid_until;
504 }
505
506 false
509 }
510
511 fn has_exceeded_timeout(&self, tx: &TransactionRepoModel) -> Result<bool, TransactionError> {
513 let age = self.get_time_since_sent_or_created_at(tx).ok_or_else(|| {
514 TransactionError::UnexpectedError(
515 "Both sent_at and created_at are missing or invalid".to_string(),
516 )
517 })?;
518
519 let timeout = match tx.status {
520 TransactionStatus::Pending => Duration::minutes(SOLANA_PENDING_TIMEOUT_MINUTES),
521 TransactionStatus::Sent => Duration::minutes(SOLANA_SENT_TIMEOUT_MINUTES),
522 _ => return Ok(false), };
525
526 Ok(age >= timeout)
527 }
528
529 async fn handle_resubmit_or_expiration(
541 &self,
542 tx: TransactionRepoModel,
543 ) -> Result<TransactionRepoModel, TransactionError> {
544 if self.is_valid_until_expired(&tx) {
546 info!(
547 tx_id = %tx.id,
548 valid_until = ?tx.valid_until,
549 "transaction valid_until has expired"
550 );
551 return self
552 .mark_as_expired(
553 tx,
554 "Transaction valid_until timestamp has expired".to_string(),
555 )
556 .await;
557 }
558
559 if tx.status == TransactionStatus::Submitted {
561 if too_many_solana_attempts(&tx) {
563 let attempt_count = tx.hashes.len();
564 warn!(
565 tx_id = %tx.id,
566 attempt_count = attempt_count,
567 max_attempts = MAXIMUM_SOLANA_TX_ATTEMPTS,
568 "transaction has exceeded maximum resubmission attempts"
569 );
570 return self
571 .mark_as_failed(
572 tx,
573 format!(
574 "Transaction exceeded maximum resubmission attempts ({attempt_count} > {MAXIMUM_SOLANA_TX_ATTEMPTS})"
575 ),
576 )
577 .await;
578 }
579 } else if self.has_exceeded_timeout(&tx)? {
580 let timeout_minutes = match tx.status {
582 TransactionStatus::Pending => SOLANA_PENDING_TIMEOUT_MINUTES,
583 TransactionStatus::Sent => SOLANA_SENT_TIMEOUT_MINUTES,
584 _ => 0,
585 };
586 let status = tx.status.clone();
587 warn!(
588 tx_id = %tx.id,
589 status = ?status,
590 timeout_minutes = timeout_minutes,
591 "transaction has exceeded timeout for status"
592 );
593 return self
594 .mark_as_failed(
595 tx,
596 format!(
597 "Transaction stuck in {status:?} status for more than {timeout_minutes} minutes"
598 ),
599 )
600 .await;
601 }
602
603 let time_since_sent = match self.get_time_since_sent_or_created_at(&tx) {
605 Some(duration) => duration,
606 None => {
607 debug!(tx_id = %tx.id, "both sent_at and created_at are missing or invalid, skipping resubmit check");
608 return Ok(tx);
609 }
610 };
611
612 if time_since_sent.num_seconds() < SOLANA_MIN_AGE_FOR_RESUBMIT_CHECK_SECONDS {
613 debug!(
614 tx_id = %tx.id,
615 time_since_sent_secs = time_since_sent.num_seconds(),
616 min_age = SOLANA_MIN_AGE_FOR_RESUBMIT_CHECK_SECONDS,
617 "transaction too young for blockhash expiration check"
618 );
619 return Ok(tx);
620 }
621
622 if let Some((found_signature, solana_status)) =
629 self.check_any_signature_on_chain(&tx).await?
630 {
631 info!(
632 tx_id = %tx.id,
633 signature = %found_signature,
634 on_chain_status = ?solana_status,
635 "transaction found on-chain with previous signature, updating to final state"
636 );
637
638 let new_status = map_solana_status_to_transaction_status(solana_status);
640
641 let solana_data = tx.network_data.get_solana_transaction_data()?;
643 let updated_solana_data = SolanaTransactionData {
644 signature: Some(found_signature),
645 ..solana_data
646 };
647 let updated_network_data = NetworkTransactionData::Solana(updated_solana_data);
648
649 return self
651 .update_transaction_status_and_send_notification(
652 tx,
653 new_status,
654 Some(updated_network_data),
655 )
656 .await;
657 }
658
659 let transaction = decode_solana_transaction(&tx)?;
661
662 let blockhash_valid = self.is_blockhash_valid(&transaction, &tx.id).await?;
664
665 if blockhash_valid {
666 debug!(
667 tx_id = %tx.id,
668 "blockhash still valid, no action needed"
669 );
670 return Ok(tx);
671 }
672
673 info!(
674 tx_id = %tx.id,
675 "blockhash has expired, checking if transaction can be resubmitted"
676 );
677
678 if is_resubmitable(&transaction) {
680 info!(
681 tx_id = %tx.id,
682 "transaction is resubmitable, enqueuing resubmit job"
683 );
684
685 self.job_producer()
687 .produce_submit_transaction_job(
688 TransactionSend::resubmit(tx.id.clone(), tx.relayer_id.clone()),
689 None,
690 )
691 .await
692 .map_err(|e| {
693 TransactionError::UnexpectedError(format!(
694 "Failed to enqueue resubmit job: {e}"
695 ))
696 })?;
697
698 info!(tx_id = %tx.id, "resubmit job enqueued successfully");
699 Ok(tx)
700 } else {
701 warn!(
703 tx_id = %tx.id,
704 num_signatures = transaction.message.header.num_required_signatures,
705 "transaction has expired blockhash but cannot be resubmitted (multi-sig)"
706 );
707
708 self.mark_as_expired(
709 tx,
710 format!(
711 "Blockhash expired and transaction requires {} signatures (cannot resubmit)",
712 transaction.message.header.num_required_signatures
713 ),
714 )
715 .await
716 }
717 }
718}
719
720#[cfg(test)]
721mod tests {
722 use super::*;
723 use crate::{
724 jobs::{JobProducerError, MockJobProducerTrait, TransactionCommand},
725 models::{NetworkTransactionData, SolanaTransactionData},
726 repositories::{MockRelayerRepository, MockTransactionRepository},
727 services::{
728 provider::{MockSolanaProviderTrait, SolanaProviderError},
729 signer::MockSolanaSignTrait,
730 },
731 utils::{
732 base64_encode,
733 mocks::mockutils::{create_mock_solana_relayer, create_mock_solana_transaction},
734 },
735 };
736 use eyre::Result;
737 use mockall::predicate::*;
738 use solana_sdk::{hash::Hash, message::Message, pubkey::Pubkey};
739 use solana_system_interface::instruction as system_instruction;
740 use std::sync::Arc;
741
742 fn create_tx_with_signature(
744 status: TransactionStatus,
745 signature: Option<&str>,
746 ) -> TransactionRepoModel {
747 let mut tx = create_mock_solana_transaction();
748 tx.status = status;
749 if let Some(sig) = signature {
750 tx.network_data = NetworkTransactionData::Solana(SolanaTransactionData {
751 transaction: Some("test".to_string()),
752 instructions: None,
753 signature: Some(sig.to_string()),
754 });
755 }
756 tx
757 }
758
759 #[tokio::test]
760 async fn test_handle_status_already_final() {
761 let provider = MockSolanaProviderTrait::new();
762 let relayer_repo = Arc::new(MockRelayerRepository::new());
763 let tx_repo = Arc::new(MockTransactionRepository::new());
764 let job_producer = Arc::new(MockJobProducerTrait::new());
765 let relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
766
767 let handler = SolanaRelayerTransaction::new(
768 relayer,
769 relayer_repo,
770 Arc::new(provider),
771 tx_repo,
772 job_producer,
773 Arc::new(MockSolanaSignTrait::new()),
774 )
775 .unwrap();
776
777 let tx_confirmed = create_tx_with_signature(TransactionStatus::Confirmed, None);
779 let result = handler
780 .handle_transaction_status_impl(tx_confirmed.clone(), None)
781 .await;
782 assert!(result.is_ok());
783 assert_eq!(result.unwrap().id, tx_confirmed.id);
784
785 let tx_failed = create_tx_with_signature(TransactionStatus::Failed, None);
787 let result = handler
788 .handle_transaction_status_impl(tx_failed.clone(), None)
789 .await;
790 assert!(result.is_ok());
791 assert_eq!(result.unwrap().id, tx_failed.id);
792
793 let tx_expired = create_tx_with_signature(TransactionStatus::Expired, None);
795 let result = handler
796 .handle_transaction_status_impl(tx_expired.clone(), None)
797 .await;
798 assert!(result.is_ok());
799 assert_eq!(result.unwrap().id, tx_expired.id);
800 }
801
802 #[tokio::test]
803 async fn test_handle_status_processed() -> Result<()> {
804 let mut provider = MockSolanaProviderTrait::new();
805 let relayer_repo = Arc::new(MockRelayerRepository::new());
806 let mut tx_repo = MockTransactionRepository::new();
807 let job_producer = MockJobProducerTrait::new();
808
809 let signature_str =
810 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
811 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
813
814 provider
816 .expect_get_transaction_status()
817 .with(eq(Signature::from_str(signature_str)?))
818 .times(1)
819 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Processed) }));
820
821 let tx_id = tx.id.clone();
822 let tx_id_clone = tx_id.clone();
823
824 tx_repo
826 .expect_get_by_id()
827 .with(eq(tx_id.clone()))
828 .times(1)
829 .returning(move |_| {
830 Ok(create_tx_with_signature(
831 TransactionStatus::Submitted, Some(signature_str),
833 ))
834 });
835
836 tx_repo
838 .expect_partial_update()
839 .withf(move |tx_id_param, update_req| {
840 tx_id_param == &tx_id_clone && update_req.status == Some(TransactionStatus::Mined)
841 })
842 .times(1)
843 .returning(move |_, _| {
844 Ok(create_tx_with_signature(
845 TransactionStatus::Mined,
846 Some(signature_str),
847 ))
848 });
849
850 let handler = SolanaRelayerTransaction::new(
851 create_mock_solana_relayer("test-relayer".to_string(), false),
852 relayer_repo,
853 Arc::new(provider),
854 Arc::new(tx_repo),
855 Arc::new(job_producer),
856 Arc::new(MockSolanaSignTrait::new()),
857 )?;
858
859 let result = handler
860 .handle_transaction_status_impl(tx.clone(), None)
861 .await;
862
863 assert!(result.is_ok());
864 let updated_tx = result.unwrap();
865 assert_eq!(updated_tx.id, tx.id);
866 assert_eq!(updated_tx.status, TransactionStatus::Mined);
868 Ok(())
869 }
870
871 #[tokio::test]
872 async fn test_handle_status_confirmed() -> Result<()> {
873 let mut provider = MockSolanaProviderTrait::new();
874 let relayer_repo = Arc::new(MockRelayerRepository::new());
875 let mut tx_repo = MockTransactionRepository::new();
876 let job_producer = MockJobProducerTrait::new();
877
878 let signature_str =
879 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
880 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
881
882 provider
883 .expect_get_transaction_status()
884 .with(eq(Signature::from_str(signature_str)?))
885 .times(1)
886 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Confirmed) }));
887
888 let tx_id = tx.id.clone();
889 let tx_id_clone = tx_id.clone();
890
891 tx_repo
893 .expect_get_by_id()
894 .with(eq(tx_id.clone()))
895 .times(1)
896 .returning(move |_| {
897 Ok(create_tx_with_signature(
898 TransactionStatus::Submitted,
899 Some(signature_str),
900 ))
901 });
902
903 tx_repo
904 .expect_partial_update()
905 .withf(move |tx_id_param, update_req| {
906 tx_id_param == &tx_id_clone && update_req.status == Some(TransactionStatus::Mined)
907 })
908 .times(1)
909 .returning(move |_, _| {
910 Ok(create_tx_with_signature(
911 TransactionStatus::Mined,
912 Some(signature_str),
913 ))
914 });
915
916 let handler = SolanaRelayerTransaction::new(
917 create_mock_solana_relayer("test-relayer".to_string(), false),
918 relayer_repo,
919 Arc::new(provider),
920 Arc::new(tx_repo),
921 Arc::new(job_producer),
922 Arc::new(MockSolanaSignTrait::new()),
923 )?;
924
925 let result = handler
926 .handle_transaction_status_impl(tx.clone(), None)
927 .await;
928
929 assert!(result.is_ok());
930 let updated_tx = result.unwrap();
931 assert_eq!(updated_tx.id, tx.id);
932 assert_eq!(updated_tx.status, TransactionStatus::Mined);
933 Ok(())
934 }
935
936 #[tokio::test]
937 async fn test_handle_status_finalized() -> Result<()> {
938 let mut provider = MockSolanaProviderTrait::new();
939 let relayer_repo = Arc::new(MockRelayerRepository::new());
940 let mut tx_repo = MockTransactionRepository::new();
941 let job_producer = MockJobProducerTrait::new();
942
943 let signature_str =
944 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
945 let tx = create_tx_with_signature(TransactionStatus::Mined, Some(signature_str));
946
947 provider
948 .expect_get_transaction_status()
949 .with(eq(Signature::from_str(signature_str)?))
950 .times(1)
951 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Finalized) }));
952
953 let tx_id = tx.id.clone();
954 let tx_id_clone = tx_id.clone();
955
956 tx_repo
958 .expect_get_by_id()
959 .with(eq(tx_id.clone()))
960 .times(1)
961 .returning(move |_| {
962 Ok(create_tx_with_signature(
963 TransactionStatus::Mined,
964 Some(signature_str),
965 ))
966 });
967
968 tx_repo
969 .expect_partial_update()
970 .withf(move |tx_id_param, update_req| {
971 tx_id_param == &tx_id_clone
972 && update_req.status == Some(TransactionStatus::Confirmed)
973 })
974 .times(1)
975 .returning(move |_, _| {
976 Ok(create_tx_with_signature(
977 TransactionStatus::Confirmed,
978 Some(signature_str),
979 ))
980 });
981
982 let handler = SolanaRelayerTransaction::new(
983 create_mock_solana_relayer("test-relayer".to_string(), false),
984 relayer_repo,
985 Arc::new(provider),
986 Arc::new(tx_repo),
987 Arc::new(job_producer),
988 Arc::new(MockSolanaSignTrait::new()),
989 )?;
990
991 let result = handler
992 .handle_transaction_status_impl(tx.clone(), None)
993 .await;
994
995 assert!(result.is_ok());
996 let updated_tx = result.unwrap();
997 assert_eq!(updated_tx.id, tx.id);
998 assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
999 Ok(())
1000 }
1001
1002 #[tokio::test]
1003 async fn test_handle_status_provider_error() -> Result<()> {
1004 let mut provider = MockSolanaProviderTrait::new();
1005 let relayer_repo = Arc::new(MockRelayerRepository::new());
1006 let tx_repo = Arc::new(MockTransactionRepository::new());
1007 let job_producer = MockJobProducerTrait::new();
1008
1009 let signature_str = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1010 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
1012 let error_message = "Provider is down";
1013
1014 provider
1017 .expect_get_transaction_status()
1018 .with(eq(Signature::from_str(signature_str)?))
1019 .times(1)
1020 .returning(move |_| {
1021 Box::pin(async { Err(SolanaProviderError::RpcError(error_message.to_string())) })
1022 });
1023
1024 let handler = SolanaRelayerTransaction::new(
1028 create_mock_solana_relayer("test-relayer".to_string(), false),
1029 relayer_repo,
1030 Arc::new(provider),
1031 tx_repo,
1032 Arc::new(job_producer),
1033 Arc::new(MockSolanaSignTrait::new()),
1034 )?;
1035
1036 let result = handler
1037 .handle_transaction_status_impl(tx.clone(), None)
1038 .await;
1039
1040 assert!(result.is_ok());
1043 let updated_tx = result.unwrap();
1044 assert_eq!(updated_tx.status, TransactionStatus::Submitted); Ok(())
1046 }
1047
1048 #[tokio::test]
1049 async fn test_handle_status_failed() -> Result<()> {
1050 let mut provider = MockSolanaProviderTrait::new();
1051 let relayer_repo = Arc::new(MockRelayerRepository::new());
1052 let mut tx_repo = MockTransactionRepository::new();
1053 let job_producer = MockJobProducerTrait::new();
1054
1055 let signature_str =
1056 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1057 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
1058
1059 provider
1060 .expect_get_transaction_status()
1061 .with(eq(Signature::from_str(signature_str)?))
1062 .times(1)
1063 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Failed) }));
1064
1065 let tx_id = tx.id.clone();
1066 let tx_id_clone = tx_id.clone();
1067
1068 tx_repo
1070 .expect_get_by_id()
1071 .with(eq(tx_id.clone()))
1072 .times(1)
1073 .returning(move |_| {
1074 Ok(create_tx_with_signature(
1075 TransactionStatus::Submitted,
1076 Some(signature_str),
1077 ))
1078 });
1079
1080 tx_repo
1081 .expect_partial_update()
1082 .withf(move |tx_id_param, update_req| {
1083 tx_id_param == &tx_id_clone && update_req.status == Some(TransactionStatus::Failed)
1084 })
1085 .times(1)
1086 .returning(move |_, _| {
1087 Ok(create_tx_with_signature(
1088 TransactionStatus::Failed,
1089 Some(signature_str),
1090 ))
1091 });
1092
1093 let handler = SolanaRelayerTransaction::new(
1094 create_mock_solana_relayer("test-relayer".to_string(), false),
1095 relayer_repo,
1096 Arc::new(provider),
1097 Arc::new(tx_repo),
1098 Arc::new(job_producer),
1099 Arc::new(MockSolanaSignTrait::new()),
1100 )?;
1101
1102 let result = handler
1103 .handle_transaction_status_impl(tx.clone(), None)
1104 .await;
1105
1106 assert!(result.is_ok());
1107 let updated_tx = result.unwrap();
1108 assert_eq!(updated_tx.id, tx.id);
1109 assert_eq!(updated_tx.status, TransactionStatus::Failed);
1110 Ok(())
1111 }
1112
1113 #[tokio::test]
1114 async fn test_default_valid_until_expired() -> Result<()> {
1115 let provider = MockSolanaProviderTrait::new();
1116 let relayer_repo = Arc::new(MockRelayerRepository::new());
1117 let mut tx_repo = MockTransactionRepository::new();
1118 let job_producer = MockJobProducerTrait::new();
1119
1120 let old_created_at = (Utc::now()
1122 - Duration::milliseconds(SOLANA_DEFAULT_TX_VALID_TIMESPAN + 60000))
1123 .to_rfc3339();
1124 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1125 tx.created_at = old_created_at;
1126 tx.valid_until = None; let tx_id = tx.id.clone();
1129
1130 tx_repo
1132 .expect_partial_update()
1133 .withf(move |tx_id_param, update_req| {
1134 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Expired)
1135 })
1136 .times(1)
1137 .returning(move |_, _| {
1138 let mut expired_tx = create_tx_with_signature(TransactionStatus::Expired, None);
1139 expired_tx.status = TransactionStatus::Expired;
1140 Ok(expired_tx)
1141 });
1142
1143 let handler = SolanaRelayerTransaction::new(
1144 create_mock_solana_relayer("test-relayer".to_string(), false),
1145 relayer_repo,
1146 Arc::new(provider),
1147 Arc::new(tx_repo),
1148 Arc::new(job_producer),
1149 Arc::new(MockSolanaSignTrait::new()),
1150 )?;
1151
1152 let result = handler.handle_transaction_status_impl(tx, None).await;
1153
1154 assert!(result.is_ok());
1155 let updated_tx = result.unwrap();
1156 assert_eq!(updated_tx.status, TransactionStatus::Expired);
1157 Ok(())
1158 }
1159
1160 #[tokio::test]
1161 async fn test_default_valid_until_not_expired() -> Result<()> {
1162 let mut provider = MockSolanaProviderTrait::new();
1163 let relayer_repo = Arc::new(MockRelayerRepository::new());
1164 let mut tx_repo = MockTransactionRepository::new();
1165 let job_producer = MockJobProducerTrait::new();
1166
1167 let recent_created_at = (Utc::now()
1169 - Duration::milliseconds(SOLANA_DEFAULT_TX_VALID_TIMESPAN - 60000))
1170 .to_rfc3339();
1171 let signature_str =
1172 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1173 let mut tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
1174 tx.created_at = recent_created_at.clone();
1175 tx.valid_until = None; let tx_id = tx.id.clone();
1178 let tx_id_clone = tx_id.clone();
1179 let recent_created_at_clone = recent_created_at.clone();
1180
1181 provider
1183 .expect_get_transaction_status()
1184 .with(eq(Signature::from_str(signature_str)?))
1185 .times(1)
1186 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Processed) }));
1187
1188 tx_repo
1190 .expect_get_by_id()
1191 .with(eq(tx_id.clone()))
1192 .times(1)
1193 .returning(move |_| {
1194 let mut tx =
1195 create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
1196 tx.created_at = recent_created_at_clone.clone();
1197 tx.valid_until = None;
1198 Ok(tx)
1199 });
1200
1201 tx_repo
1203 .expect_partial_update()
1204 .withf(move |tx_id_param, update_req| {
1205 tx_id_param == &tx_id_clone && update_req.status == Some(TransactionStatus::Mined)
1206 })
1207 .times(1)
1208 .returning(move |_, _| {
1209 Ok(create_tx_with_signature(
1210 TransactionStatus::Mined,
1211 Some(signature_str),
1212 ))
1213 });
1214
1215 let handler = SolanaRelayerTransaction::new(
1216 create_mock_solana_relayer("test-relayer".to_string(), false),
1217 relayer_repo,
1218 Arc::new(provider),
1219 Arc::new(tx_repo),
1220 Arc::new(job_producer),
1221 Arc::new(MockSolanaSignTrait::new()),
1222 )?;
1223
1224 let result = handler
1225 .handle_transaction_status_impl(tx.clone(), None)
1226 .await;
1227
1228 assert!(result.is_ok());
1229 let updated_tx = result.unwrap();
1230 assert_eq!(updated_tx.status, TransactionStatus::Mined);
1232 Ok(())
1233 }
1234
1235 #[tokio::test]
1236 async fn test_too_many_resubmission_attempts() -> Result<()> {
1237 let mut provider = MockSolanaProviderTrait::new();
1238 let relayer_repo = Arc::new(MockRelayerRepository::new());
1239 let mut tx_repo = MockTransactionRepository::new();
1240 let job_producer = MockJobProducerTrait::new();
1241
1242 let signature_str =
1244 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1245 let mut tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
1246 tx.hashes = vec!["sig".to_string(); MAXIMUM_SOLANA_TX_ATTEMPTS + 1];
1247 tx.sent_at = Some(Utc::now().to_rfc3339()); let tx_id = tx.id.clone();
1250
1251 provider
1253 .expect_get_transaction_status()
1254 .with(eq(Signature::from_str(signature_str)?))
1255 .times(1)
1256 .returning(|_| {
1257 Box::pin(async {
1258 Err(crate::services::provider::SolanaProviderError::RpcError(
1259 "test error".to_string(),
1260 ))
1261 })
1262 });
1263
1264 tx_repo
1266 .expect_partial_update()
1267 .withf(move |tx_id_param, update_req| {
1268 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Failed)
1269 })
1270 .times(1)
1271 .returning(move |_, _| {
1272 let mut failed_tx = create_tx_with_signature(TransactionStatus::Failed, None);
1273 failed_tx.status = TransactionStatus::Failed;
1274 Ok(failed_tx)
1275 });
1276
1277 let handler = SolanaRelayerTransaction::new(
1278 create_mock_solana_relayer("test-relayer".to_string(), false),
1279 relayer_repo,
1280 Arc::new(provider),
1281 Arc::new(tx_repo),
1282 Arc::new(job_producer),
1283 Arc::new(MockSolanaSignTrait::new()),
1284 )?;
1285
1286 let result = handler.handle_transaction_status_impl(tx, None).await;
1287
1288 assert!(result.is_ok());
1289 let updated_tx = result.unwrap();
1290 assert_eq!(updated_tx.status, TransactionStatus::Failed);
1291 Ok(())
1292 }
1293
1294 #[tokio::test]
1295 async fn test_handle_pending_status_schedules_recovery_job() -> Result<()> {
1296 let provider = MockSolanaProviderTrait::new();
1297 let relayer_repo = Arc::new(MockRelayerRepository::new());
1298 let tx_repo = Arc::new(MockTransactionRepository::new());
1299 let mut job_producer = MockJobProducerTrait::new();
1300
1301 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1303 tx.created_at = (Utc::now()
1304 - Duration::seconds(SOLANA_PENDING_RECOVERY_TRIGGER_SECONDS + 10))
1305 .to_rfc3339();
1306
1307 let tx_id = tx.id.clone();
1308
1309 job_producer
1311 .expect_produce_transaction_request_job()
1312 .withf(move |job, _delay| job.transaction_id == tx_id)
1313 .times(1)
1314 .returning(|_, _| Box::pin(async { Ok(()) }));
1315
1316 let handler = SolanaRelayerTransaction::new(
1317 create_mock_solana_relayer("test-relayer".to_string(), false),
1318 relayer_repo,
1319 Arc::new(provider),
1320 tx_repo,
1321 Arc::new(job_producer),
1322 Arc::new(MockSolanaSignTrait::new()),
1323 )?;
1324
1325 let result = handler.handle_pending_status(tx.clone()).await;
1326
1327 assert!(result.is_ok());
1328 let returned_tx = result.unwrap();
1329 assert_eq!(returned_tx.status, TransactionStatus::Pending); Ok(())
1331 }
1332
1333 #[tokio::test]
1334 async fn test_handle_pending_status_too_young() -> Result<()> {
1335 let provider = MockSolanaProviderTrait::new();
1336 let relayer_repo = Arc::new(MockRelayerRepository::new());
1337 let tx_repo = Arc::new(MockTransactionRepository::new());
1338 let job_producer = Arc::new(MockJobProducerTrait::new());
1339
1340 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1342 tx.created_at = (Utc::now()
1343 - Duration::seconds(SOLANA_PENDING_RECOVERY_TRIGGER_SECONDS - 10))
1344 .to_rfc3339();
1345
1346 let handler = SolanaRelayerTransaction::new(
1347 create_mock_solana_relayer("test-relayer".to_string(), false),
1348 relayer_repo,
1349 Arc::new(provider),
1350 tx_repo,
1351 job_producer,
1352 Arc::new(MockSolanaSignTrait::new()),
1353 )?;
1354
1355 let result = handler.handle_pending_status(tx.clone()).await;
1356
1357 assert!(result.is_ok());
1358 let returned_tx = result.unwrap();
1359 assert_eq!(returned_tx.status, TransactionStatus::Pending); Ok(())
1361 }
1362
1363 #[tokio::test]
1364 async fn test_handle_pending_status_timeout() -> Result<()> {
1365 let provider = MockSolanaProviderTrait::new();
1366 let relayer_repo = Arc::new(MockRelayerRepository::new());
1367 let mut tx_repo = MockTransactionRepository::new();
1368 let job_producer = Arc::new(MockJobProducerTrait::new());
1369
1370 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1372 tx.created_at =
1373 (Utc::now() - Duration::minutes(SOLANA_PENDING_TIMEOUT_MINUTES + 1)).to_rfc3339();
1374
1375 let tx_id = tx.id.clone();
1376
1377 tx_repo
1379 .expect_partial_update()
1380 .withf(move |tx_id_param, update_req| {
1381 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Failed)
1382 })
1383 .times(1)
1384 .returning(move |_, _| {
1385 let mut failed_tx = create_tx_with_signature(TransactionStatus::Failed, None);
1386 failed_tx.status = TransactionStatus::Failed;
1387 Ok(failed_tx)
1388 });
1389
1390 let handler = SolanaRelayerTransaction::new(
1391 create_mock_solana_relayer("test-relayer".to_string(), false),
1392 relayer_repo,
1393 Arc::new(provider),
1394 Arc::new(tx_repo),
1395 job_producer,
1396 Arc::new(MockSolanaSignTrait::new()),
1397 )?;
1398
1399 let result = handler.handle_pending_status(tx).await;
1400
1401 assert!(result.is_ok());
1402 let updated_tx = result.unwrap();
1403 assert_eq!(updated_tx.status, TransactionStatus::Failed);
1404 Ok(())
1405 }
1406
1407 #[tokio::test]
1408 async fn test_handle_resubmit_blockhash_expired_resubmitable() -> Result<()> {
1409 let mut provider = MockSolanaProviderTrait::new();
1410 let relayer_repo = Arc::new(MockRelayerRepository::new());
1411 let tx_repo = Arc::new(MockTransactionRepository::new());
1412 let mut job_producer = MockJobProducerTrait::new();
1413
1414 let payer = Pubkey::new_unique();
1416 let instruction =
1417 solana_system_interface::instruction::transfer(&payer, &Pubkey::new_unique(), 1000);
1418 let mut transaction = SolanaTransaction::new_with_payer(&[instruction], Some(&payer));
1419 transaction.message.recent_blockhash = Hash::from_str("11111111111111111111111111111112")?;
1420 let transaction_bytes = bincode::serialize(&transaction)?;
1421 let transaction_b64 = base64_encode(&transaction_bytes);
1422
1423 let signature_str = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1425 let mut tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
1426 tx.sent_at = Some(
1427 (Utc::now() - Duration::seconds(SOLANA_MIN_AGE_FOR_RESUBMIT_CHECK_SECONDS + 10))
1428 .to_rfc3339(),
1429 );
1430 tx.network_data = NetworkTransactionData::Solana(SolanaTransactionData {
1431 transaction: Some(transaction_b64),
1432 instructions: None,
1433 signature: Some(signature_str.to_string()),
1434 });
1435
1436 let tx_id = tx.id.clone();
1437
1438 provider
1440 .expect_is_blockhash_valid()
1441 .with(
1442 eq(Hash::from_str("11111111111111111111111111111112")?),
1443 eq(CommitmentConfig::confirmed()),
1444 )
1445 .times(1)
1446 .returning(|_, _| Box::pin(async { Ok(false) })); job_producer
1450 .expect_produce_submit_transaction_job()
1451 .withf(move |job, _delay| {
1452 matches!(job.command, TransactionCommand::Resubmit) && job.transaction_id == tx_id
1453 })
1454 .times(1)
1455 .returning(|_, _| Box::pin(async { Ok(()) }));
1456
1457 let handler = SolanaRelayerTransaction::new(
1458 create_mock_solana_relayer("test-relayer".to_string(), false),
1459 relayer_repo,
1460 Arc::new(provider),
1461 tx_repo,
1462 Arc::new(job_producer),
1463 Arc::new(MockSolanaSignTrait::new()),
1464 )?;
1465
1466 let result = handler.handle_resubmit_or_expiration(tx.clone()).await;
1467
1468 assert!(result.is_ok());
1469 let returned_tx = result.unwrap();
1470 assert_eq!(returned_tx.status, TransactionStatus::Submitted); Ok(())
1472 }
1473
1474 #[tokio::test]
1475 async fn test_handle_resubmit_blockhash_expired_not_resubmitable() -> Result<()> {
1476 let mut provider = MockSolanaProviderTrait::new();
1477 let relayer_repo = Arc::new(MockRelayerRepository::new());
1478 let mut tx_repo = MockTransactionRepository::new();
1479 let job_producer = Arc::new(MockJobProducerTrait::new());
1480
1481 let payer = Pubkey::new_unique();
1483 let recipient = Pubkey::new_unique();
1484 let additional_signer = Pubkey::new_unique();
1485 let instruction = system_instruction::transfer(&payer, &recipient, 1000);
1486
1487 let mut message = Message::new(&[instruction], Some(&payer));
1489 message.account_keys.push(additional_signer);
1490 message.header.num_required_signatures = 2; message.recent_blockhash = Hash::from_str("11111111111111111111111111111112")?;
1492
1493 let transaction = SolanaTransaction::new_unsigned(message);
1494 let transaction_bytes = bincode::serialize(&transaction)?;
1495 let transaction_b64 = base64_encode(&transaction_bytes);
1496
1497 let signature_str = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1498 let mut tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
1499 tx.sent_at = Some(
1500 (Utc::now() - Duration::seconds(SOLANA_MIN_AGE_FOR_RESUBMIT_CHECK_SECONDS + 10))
1501 .to_rfc3339(),
1502 );
1503 tx.network_data = NetworkTransactionData::Solana(SolanaTransactionData {
1504 transaction: Some(transaction_b64),
1505 instructions: None,
1506 signature: Some(signature_str.to_string()),
1507 });
1508
1509 let tx_id = tx.id.clone();
1510
1511 provider
1513 .expect_is_blockhash_valid()
1514 .with(
1515 eq(Hash::from_str("11111111111111111111111111111112")?),
1516 eq(CommitmentConfig::confirmed()),
1517 )
1518 .times(1)
1519 .returning(|_, _| Box::pin(async { Ok(false) })); tx_repo
1523 .expect_partial_update()
1524 .withf(move |tx_id_param, update_req| {
1525 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Expired)
1526 })
1527 .times(1)
1528 .returning(move |_, _| {
1529 let mut expired_tx = create_tx_with_signature(TransactionStatus::Expired, None);
1530 expired_tx.status = TransactionStatus::Expired;
1531 Ok(expired_tx)
1532 });
1533
1534 let handler = SolanaRelayerTransaction::new(
1535 create_mock_solana_relayer("test-relayer".to_string(), false),
1536 relayer_repo,
1537 Arc::new(provider),
1538 Arc::new(tx_repo),
1539 job_producer,
1540 Arc::new(MockSolanaSignTrait::new()),
1541 )?;
1542
1543 let result = handler.handle_resubmit_or_expiration(tx).await;
1544
1545 assert!(result.is_ok());
1546 let updated_tx = result.unwrap();
1547 assert_eq!(updated_tx.status, TransactionStatus::Expired);
1548 Ok(())
1549 }
1550
1551 #[tokio::test]
1552 async fn test_check_any_signature_on_chain_found() -> Result<()> {
1553 let mut provider = MockSolanaProviderTrait::new();
1554 let relayer_repo = Arc::new(MockRelayerRepository::new());
1555 let tx_repo = Arc::new(MockTransactionRepository::new());
1556 let job_producer = Arc::new(MockJobProducerTrait::new());
1557
1558 let signature1 = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1559 let signature2 = "3XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1560
1561 let mut tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature1));
1562 tx.hashes = vec![signature1.to_string(), signature2.to_string()];
1563
1564 provider
1566 .expect_get_transaction_status()
1567 .with(eq(Signature::from_str(signature1)?))
1568 .times(1)
1569 .returning(|_| {
1570 Box::pin(async { Err(SolanaProviderError::RpcError("not found".to_string())) })
1571 });
1572
1573 provider
1574 .expect_get_transaction_status()
1575 .with(eq(Signature::from_str(signature2)?))
1576 .times(1)
1577 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Processed) }));
1578
1579 let handler = SolanaRelayerTransaction::new(
1580 create_mock_solana_relayer("test-relayer".to_string(), false),
1581 relayer_repo,
1582 Arc::new(provider),
1583 tx_repo,
1584 job_producer,
1585 Arc::new(MockSolanaSignTrait::new()),
1586 )?;
1587
1588 let result = handler.check_any_signature_on_chain(&tx).await;
1589
1590 assert!(result.is_ok());
1591 let found = result.unwrap();
1592 assert!(found.is_some());
1593 let (found_sig, status) = found.unwrap();
1594 assert_eq!(found_sig, signature2);
1595 assert_eq!(status, SolanaTransactionStatus::Processed);
1596 Ok(())
1597 }
1598
1599 #[tokio::test]
1600 async fn test_check_any_signature_on_chain_not_found() -> Result<()> {
1601 let mut provider = MockSolanaProviderTrait::new();
1602 let relayer_repo = Arc::new(MockRelayerRepository::new());
1603 let tx_repo = Arc::new(MockTransactionRepository::new());
1604 let job_producer = Arc::new(MockJobProducerTrait::new());
1605
1606 let signature1 = "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1607 let signature2 = "3XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
1608
1609 let mut tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature1));
1610 tx.hashes = vec![signature1.to_string(), signature2.to_string()];
1611
1612 provider
1614 .expect_get_transaction_status()
1615 .with(eq(Signature::from_str(signature1)?))
1616 .times(1)
1617 .returning(|_| {
1618 Box::pin(async { Err(SolanaProviderError::RpcError("not found".to_string())) })
1619 });
1620
1621 provider
1622 .expect_get_transaction_status()
1623 .with(eq(Signature::from_str(signature2)?))
1624 .times(1)
1625 .returning(|_| {
1626 Box::pin(async { Err(SolanaProviderError::RpcError("not found".to_string())) })
1627 });
1628
1629 let handler = SolanaRelayerTransaction::new(
1630 create_mock_solana_relayer("test-relayer".to_string(), false),
1631 relayer_repo,
1632 Arc::new(provider),
1633 tx_repo,
1634 job_producer,
1635 Arc::new(MockSolanaSignTrait::new()),
1636 )?;
1637
1638 let result = handler.check_any_signature_on_chain(&tx).await;
1639
1640 assert!(result.is_ok());
1641 let found = result.unwrap();
1642 assert!(found.is_none());
1643 Ok(())
1644 }
1645
1646 #[tokio::test]
1647 async fn test_is_blockhash_valid_true() -> Result<()> {
1648 let mut provider = MockSolanaProviderTrait::new();
1649 let relayer_repo = Arc::new(MockRelayerRepository::new());
1650 let tx_repo = Arc::new(MockTransactionRepository::new());
1651 let job_producer = Arc::new(MockJobProducerTrait::new());
1652
1653 let blockhash = Hash::from_str("11111111111111111111111111111112")?;
1654
1655 provider
1656 .expect_is_blockhash_valid()
1657 .with(eq(blockhash), eq(CommitmentConfig::confirmed()))
1658 .times(1)
1659 .returning(|_, _| Box::pin(async { Ok(true) }));
1660
1661 let handler = SolanaRelayerTransaction::new(
1662 create_mock_solana_relayer("test-relayer".to_string(), false),
1663 relayer_repo,
1664 Arc::new(provider),
1665 tx_repo,
1666 job_producer,
1667 Arc::new(MockSolanaSignTrait::new()),
1668 )?;
1669
1670 let mut transaction =
1671 SolanaTransaction::new_unsigned(Message::new(&[], Some(&Pubkey::new_unique())));
1672 transaction.message.recent_blockhash = blockhash;
1673
1674 let result = handler.is_blockhash_valid(&transaction, "test-tx-id").await;
1675
1676 assert!(result.is_ok());
1677 assert!(result.unwrap());
1678 Ok(())
1679 }
1680
1681 #[tokio::test]
1682 async fn test_is_blockhash_valid_false() -> Result<()> {
1683 let mut provider = MockSolanaProviderTrait::new();
1684 let relayer_repo = Arc::new(MockRelayerRepository::new());
1685 let tx_repo = Arc::new(MockTransactionRepository::new());
1686 let job_producer = Arc::new(MockJobProducerTrait::new());
1687
1688 let blockhash = Hash::from_str("11111111111111111111111111111112")?;
1689
1690 provider
1691 .expect_is_blockhash_valid()
1692 .with(eq(blockhash), eq(CommitmentConfig::confirmed()))
1693 .times(1)
1694 .returning(|_, _| Box::pin(async { Ok(false) }));
1695
1696 let handler = SolanaRelayerTransaction::new(
1697 create_mock_solana_relayer("test-relayer".to_string(), false),
1698 relayer_repo,
1699 Arc::new(provider),
1700 tx_repo,
1701 job_producer,
1702 Arc::new(MockSolanaSignTrait::new()),
1703 )?;
1704
1705 let mut transaction =
1706 SolanaTransaction::new_unsigned(Message::new(&[], Some(&Pubkey::new_unique())));
1707 transaction.message.recent_blockhash = blockhash;
1708
1709 let result = handler.is_blockhash_valid(&transaction, "test-tx-id").await;
1710
1711 assert!(result.is_ok());
1712 assert!(!result.unwrap());
1713 Ok(())
1714 }
1715
1716 #[tokio::test]
1717 async fn test_is_blockhash_valid_error() -> Result<()> {
1718 let mut provider = MockSolanaProviderTrait::new();
1719 let relayer_repo = Arc::new(MockRelayerRepository::new());
1720 let tx_repo = Arc::new(MockTransactionRepository::new());
1721 let job_producer = Arc::new(MockJobProducerTrait::new());
1722
1723 let blockhash = Hash::from_str("11111111111111111111111111111112")?;
1724
1725 provider
1726 .expect_is_blockhash_valid()
1727 .with(eq(blockhash), eq(CommitmentConfig::confirmed()))
1728 .times(1)
1729 .returning(|_, _| {
1730 Box::pin(async { Err(SolanaProviderError::RpcError("test error".to_string())) })
1731 });
1732
1733 let handler = SolanaRelayerTransaction::new(
1734 create_mock_solana_relayer("test-relayer".to_string(), false),
1735 relayer_repo,
1736 Arc::new(provider),
1737 tx_repo,
1738 job_producer,
1739 Arc::new(MockSolanaSignTrait::new()),
1740 )?;
1741
1742 let mut transaction =
1743 SolanaTransaction::new_unsigned(Message::new(&[], Some(&Pubkey::new_unique())));
1744 transaction.message.recent_blockhash = blockhash;
1745
1746 let result = handler.is_blockhash_valid(&transaction, "test-tx-id").await;
1747
1748 assert!(result.is_err());
1749 let error = result.unwrap_err();
1750 match error {
1751 TransactionError::UnderlyingSolanaProvider(_) => {} _ => panic!("Expected UnderlyingSolanaProvider error"),
1753 }
1754 Ok(())
1755 }
1756
1757 #[tokio::test]
1758 async fn test_get_time_since_sent_or_created_at_with_sent_at() {
1759 let provider = MockSolanaProviderTrait::new();
1760 let relayer_repo = Arc::new(MockRelayerRepository::new());
1761 let tx_repo = Arc::new(MockTransactionRepository::new());
1762 let job_producer = Arc::new(MockJobProducerTrait::new());
1763
1764 let handler = SolanaRelayerTransaction::new(
1765 create_mock_solana_relayer("test-relayer".to_string(), false),
1766 relayer_repo,
1767 Arc::new(provider),
1768 tx_repo,
1769 job_producer,
1770 Arc::new(MockSolanaSignTrait::new()),
1771 )
1772 .unwrap();
1773
1774 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1775 let past_time = Utc::now() - Duration::minutes(5);
1776 tx.sent_at = Some(past_time.to_rfc3339());
1777
1778 let result = handler.get_time_since_sent_or_created_at(&tx);
1779
1780 assert!(result.is_some());
1781 let duration = result.unwrap();
1782 assert!(duration.num_minutes() >= 5);
1783 }
1784
1785 #[tokio::test]
1786 async fn test_get_time_since_sent_or_created_at_with_created_at() {
1787 let provider = MockSolanaProviderTrait::new();
1788 let relayer_repo = Arc::new(MockRelayerRepository::new());
1789 let tx_repo = Arc::new(MockTransactionRepository::new());
1790 let job_producer = Arc::new(MockJobProducerTrait::new());
1791
1792 let handler = SolanaRelayerTransaction::new(
1793 create_mock_solana_relayer("test-relayer".to_string(), false),
1794 relayer_repo,
1795 Arc::new(provider),
1796 tx_repo,
1797 job_producer,
1798 Arc::new(MockSolanaSignTrait::new()),
1799 )
1800 .unwrap();
1801
1802 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1803 let past_time = Utc::now() - Duration::minutes(10);
1804 tx.created_at = past_time.to_rfc3339();
1805 tx.sent_at = None; let result = handler.get_time_since_sent_or_created_at(&tx);
1808
1809 assert!(result.is_some());
1810 let duration = result.unwrap();
1811 assert!(duration.num_minutes() >= 10);
1812 }
1813
1814 #[tokio::test]
1815 async fn test_has_exceeded_timeout_pending() {
1816 let provider = MockSolanaProviderTrait::new();
1817 let relayer_repo = Arc::new(MockRelayerRepository::new());
1818 let tx_repo = Arc::new(MockTransactionRepository::new());
1819 let job_producer = Arc::new(MockJobProducerTrait::new());
1820
1821 let handler = SolanaRelayerTransaction::new(
1822 create_mock_solana_relayer("test-relayer".to_string(), false),
1823 relayer_repo,
1824 Arc::new(provider),
1825 tx_repo,
1826 job_producer,
1827 Arc::new(MockSolanaSignTrait::new()),
1828 )
1829 .unwrap();
1830
1831 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1832 tx.created_at =
1833 (Utc::now() - Duration::minutes(SOLANA_PENDING_TIMEOUT_MINUTES + 1)).to_rfc3339();
1834
1835 let result = handler.has_exceeded_timeout(&tx);
1836
1837 assert!(result.is_ok());
1838 assert!(result.unwrap());
1839 }
1840
1841 #[tokio::test]
1842 async fn test_has_exceeded_timeout_sent() {
1843 let provider = MockSolanaProviderTrait::new();
1844 let relayer_repo = Arc::new(MockRelayerRepository::new());
1845 let tx_repo = Arc::new(MockTransactionRepository::new());
1846 let job_producer = Arc::new(MockJobProducerTrait::new());
1847
1848 let handler = SolanaRelayerTransaction::new(
1849 create_mock_solana_relayer("test-relayer".to_string(), false),
1850 relayer_repo,
1851 Arc::new(provider),
1852 tx_repo,
1853 job_producer,
1854 Arc::new(MockSolanaSignTrait::new()),
1855 )
1856 .unwrap();
1857
1858 let mut tx = create_tx_with_signature(TransactionStatus::Sent, None);
1859 tx.sent_at =
1860 Some((Utc::now() - Duration::minutes(SOLANA_SENT_TIMEOUT_MINUTES + 1)).to_rfc3339());
1861
1862 let result = handler.has_exceeded_timeout(&tx);
1863
1864 assert!(result.is_ok());
1865 assert!(result.unwrap());
1866 }
1867
1868 #[tokio::test]
1869 async fn test_is_valid_until_expired_user_provided() {
1870 let provider = MockSolanaProviderTrait::new();
1871 let relayer_repo = Arc::new(MockRelayerRepository::new());
1872 let tx_repo = Arc::new(MockTransactionRepository::new());
1873 let job_producer = Arc::new(MockJobProducerTrait::new());
1874
1875 let handler = SolanaRelayerTransaction::new(
1876 create_mock_solana_relayer("test-relayer".to_string(), false),
1877 relayer_repo,
1878 Arc::new(provider),
1879 tx_repo,
1880 job_producer,
1881 Arc::new(MockSolanaSignTrait::new()),
1882 )
1883 .unwrap();
1884
1885 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1886 let past_time = Utc::now() - Duration::minutes(1);
1887 tx.valid_until = Some(past_time.to_rfc3339());
1888
1889 assert!(handler.is_valid_until_expired(&tx));
1890 }
1891
1892 #[tokio::test]
1893 async fn test_is_valid_until_expired_default() {
1894 let provider = MockSolanaProviderTrait::new();
1895 let relayer_repo = Arc::new(MockRelayerRepository::new());
1896 let tx_repo = Arc::new(MockTransactionRepository::new());
1897 let job_producer = Arc::new(MockJobProducerTrait::new());
1898
1899 let handler = SolanaRelayerTransaction::new(
1900 create_mock_solana_relayer("test-relayer".to_string(), false),
1901 relayer_repo,
1902 Arc::new(provider),
1903 tx_repo,
1904 job_producer,
1905 Arc::new(MockSolanaSignTrait::new()),
1906 )
1907 .unwrap();
1908
1909 let mut tx = create_tx_with_signature(TransactionStatus::Pending, None);
1910 let past_time =
1911 Utc::now() - Duration::milliseconds(SOLANA_DEFAULT_TX_VALID_TIMESPAN + 1000);
1912 tx.created_at = past_time.to_rfc3339();
1913 tx.valid_until = None; assert!(handler.is_valid_until_expired(&tx));
1916 }
1917
1918 #[tokio::test]
1919 async fn test_mark_as_expired() -> Result<()> {
1920 let provider = MockSolanaProviderTrait::new();
1921 let relayer_repo = Arc::new(MockRelayerRepository::new());
1922 let mut tx_repo = MockTransactionRepository::new();
1923 let job_producer = Arc::new(MockJobProducerTrait::new());
1924
1925 let tx = create_tx_with_signature(TransactionStatus::Pending, None);
1926 let tx_id = tx.id.clone();
1927 let reason = "Test expiration";
1928
1929 tx_repo
1930 .expect_partial_update()
1931 .withf(move |tx_id_param, update_req| {
1932 tx_id_param == &tx_id
1933 && update_req.status == Some(TransactionStatus::Expired)
1934 && update_req.status_reason == Some(reason.to_string())
1935 })
1936 .times(1)
1937 .returning(move |_, _| {
1938 let mut expired_tx = create_tx_with_signature(TransactionStatus::Expired, None);
1939 expired_tx.status = TransactionStatus::Expired;
1940 Ok(expired_tx)
1941 });
1942
1943 let handler = SolanaRelayerTransaction::new(
1944 create_mock_solana_relayer("test-relayer".to_string(), false),
1945 relayer_repo,
1946 Arc::new(provider),
1947 Arc::new(tx_repo),
1948 job_producer,
1949 Arc::new(MockSolanaSignTrait::new()),
1950 )?;
1951
1952 let result = handler.mark_as_expired(tx, reason.to_string()).await;
1953
1954 assert!(result.is_ok());
1955 let updated_tx = result.unwrap();
1956 assert_eq!(updated_tx.status, TransactionStatus::Expired);
1957 Ok(())
1958 }
1959
1960 #[tokio::test]
1961 async fn test_mark_as_failed_without_notification() -> Result<()> {
1962 let provider = MockSolanaProviderTrait::new();
1964 let relayer_repo = Arc::new(MockRelayerRepository::new());
1965 let mut tx_repo = MockTransactionRepository::new();
1966 let job_producer = Arc::new(MockJobProducerTrait::new());
1967
1968 let tx = create_tx_with_signature(TransactionStatus::Pending, None);
1969 let tx_id = tx.id.clone();
1970 let reason = "Test failure";
1971
1972 tx_repo
1973 .expect_partial_update()
1974 .withf(move |tx_id_param, update_req| {
1975 tx_id_param == &tx_id
1976 && update_req.status == Some(TransactionStatus::Failed)
1977 && update_req.status_reason == Some(reason.to_string())
1978 })
1979 .times(1)
1980 .returning(move |_, _| {
1981 let mut failed_tx = create_tx_with_signature(TransactionStatus::Failed, None);
1982 failed_tx.status = TransactionStatus::Failed;
1983 Ok(failed_tx)
1984 });
1985
1986 let handler = SolanaRelayerTransaction::new(
1988 create_mock_solana_relayer("test-relayer".to_string(), false),
1989 relayer_repo,
1990 Arc::new(provider),
1991 Arc::new(tx_repo),
1992 job_producer,
1993 Arc::new(MockSolanaSignTrait::new()),
1994 )?;
1995
1996 let result = handler.mark_as_failed(tx, reason.to_string()).await;
1997
1998 assert!(result.is_ok());
1999 let updated_tx = result.unwrap();
2000 assert_eq!(updated_tx.status, TransactionStatus::Failed);
2001 Ok(())
2002 }
2003
2004 #[tokio::test]
2005 async fn test_mark_as_failed_sends_notification() -> Result<()> {
2006 let provider = MockSolanaProviderTrait::new();
2008 let relayer_repo = Arc::new(MockRelayerRepository::new());
2009 let mut tx_repo = MockTransactionRepository::new();
2010 let mut job_producer = MockJobProducerTrait::new();
2011
2012 let mut relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
2014 relayer.notification_id = Some("test-notification".to_string());
2015
2016 let tx = create_tx_with_signature(TransactionStatus::Pending, None);
2017 let tx_id = tx.id.clone();
2018 let reason = "Test failure";
2019
2020 tx_repo
2021 .expect_partial_update()
2022 .withf(move |tx_id_param, update_req| {
2023 tx_id_param == &tx_id
2024 && update_req.status == Some(TransactionStatus::Failed)
2025 && update_req.status_reason == Some(reason.to_string())
2026 })
2027 .times(1)
2028 .returning(move |_, _| {
2029 let mut failed_tx = create_tx_with_signature(TransactionStatus::Failed, None);
2030 failed_tx.status = TransactionStatus::Failed;
2031 Ok(failed_tx)
2032 });
2033
2034 job_producer
2036 .expect_produce_send_notification_job()
2037 .times(1)
2038 .returning(|_, _| Box::pin(async { Ok(()) }));
2039
2040 let handler = SolanaRelayerTransaction::new(
2041 relayer,
2042 relayer_repo,
2043 Arc::new(provider),
2044 Arc::new(tx_repo),
2045 Arc::new(job_producer),
2046 Arc::new(MockSolanaSignTrait::new()),
2047 )?;
2048
2049 let result = handler.mark_as_failed(tx, reason.to_string()).await;
2050
2051 assert!(result.is_ok());
2052 let updated_tx = result.unwrap();
2053 assert_eq!(updated_tx.status, TransactionStatus::Failed);
2054 Ok(())
2055 }
2056
2057 #[tokio::test]
2058 async fn test_mark_as_failed_notification_error_does_not_fail() -> Result<()> {
2059 let provider = MockSolanaProviderTrait::new();
2061 let relayer_repo = Arc::new(MockRelayerRepository::new());
2062 let mut tx_repo = MockTransactionRepository::new();
2063 let mut job_producer = MockJobProducerTrait::new();
2064
2065 let mut relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
2067 relayer.notification_id = Some("test-notification".to_string());
2068
2069 let tx = create_tx_with_signature(TransactionStatus::Pending, None);
2070 let tx_id = tx.id.clone();
2071 let reason = "Test failure";
2072
2073 tx_repo
2074 .expect_partial_update()
2075 .withf(move |tx_id_param, update_req| {
2076 tx_id_param == &tx_id
2077 && update_req.status == Some(TransactionStatus::Failed)
2078 && update_req.status_reason == Some(reason.to_string())
2079 })
2080 .times(1)
2081 .returning(move |_, _| {
2082 let mut failed_tx = create_tx_with_signature(TransactionStatus::Failed, None);
2083 failed_tx.status = TransactionStatus::Failed;
2084 Ok(failed_tx)
2085 });
2086
2087 job_producer
2089 .expect_produce_send_notification_job()
2090 .times(1)
2091 .returning(|_, _| {
2092 Box::pin(async {
2093 Err(JobProducerError::QueueError(
2094 "Notification service unavailable".to_string(),
2095 ))
2096 })
2097 });
2098
2099 let handler = SolanaRelayerTransaction::new(
2100 relayer,
2101 relayer_repo,
2102 Arc::new(provider),
2103 Arc::new(tx_repo),
2104 Arc::new(job_producer),
2105 Arc::new(MockSolanaSignTrait::new()),
2106 )?;
2107
2108 let result = handler.mark_as_failed(tx, reason.to_string()).await;
2109
2110 assert!(result.is_ok());
2112 let updated_tx = result.unwrap();
2113 assert_eq!(updated_tx.status, TransactionStatus::Failed);
2114 Ok(())
2115 }
2116
2117 #[tokio::test]
2118 async fn test_update_transaction_status_and_send_notification() -> Result<()> {
2119 let provider = MockSolanaProviderTrait::new();
2120 let relayer_repo = Arc::new(MockRelayerRepository::new());
2121 let mut tx_repo = MockTransactionRepository::new();
2122 let mut job_producer = MockJobProducerTrait::new();
2123
2124 let mut relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
2126 relayer.notification_id = Some("test-notification".to_string());
2127
2128 let tx = create_tx_with_signature(TransactionStatus::Submitted, None);
2129 let tx_id = tx.id.clone();
2130 let new_status = TransactionStatus::Confirmed;
2131
2132 tx_repo
2133 .expect_partial_update()
2134 .withf(move |tx_id_param, update_req| {
2135 tx_id_param == &tx_id && update_req.status == Some(TransactionStatus::Confirmed)
2136 })
2137 .times(1)
2138 .returning(move |_, _| {
2139 let mut confirmed_tx = create_tx_with_signature(TransactionStatus::Confirmed, None);
2140 confirmed_tx.status = TransactionStatus::Confirmed;
2141 Ok(confirmed_tx)
2142 });
2143
2144 job_producer
2145 .expect_produce_send_notification_job()
2146 .times(1)
2147 .returning(|_, _| Box::pin(async { Ok(()) }));
2148
2149 let handler = SolanaRelayerTransaction::new(
2150 relayer,
2151 relayer_repo,
2152 Arc::new(provider),
2153 Arc::new(tx_repo),
2154 Arc::new(job_producer),
2155 Arc::new(MockSolanaSignTrait::new()),
2156 )?;
2157
2158 let result = handler
2159 .update_transaction_status_and_send_notification(tx, new_status, None)
2160 .await;
2161
2162 assert!(result.is_ok());
2163 let updated_tx = result.unwrap();
2164 assert_eq!(updated_tx.status, TransactionStatus::Confirmed);
2165 Ok(())
2166 }
2167
2168 mod circuit_breaker_tests {
2170 use super::*;
2171 use crate::jobs::StatusCheckContext;
2172 use crate::models::NetworkType;
2173
2174 fn create_triggered_context() -> StatusCheckContext {
2176 StatusCheckContext::new(
2177 45, 60, 70, 38, 115, NetworkType::Solana,
2183 )
2184 }
2185
2186 fn create_safe_context() -> StatusCheckContext {
2188 StatusCheckContext::new(
2189 5, 10, 15, 38, 115, NetworkType::Solana,
2195 )
2196 }
2197
2198 fn create_total_triggered_context() -> StatusCheckContext {
2200 StatusCheckContext::new(
2201 10, 120, 130, 38, 115, NetworkType::Solana,
2207 )
2208 }
2209
2210 #[tokio::test]
2211 async fn test_circuit_breaker_submitted_marks_as_failed() -> Result<()> {
2212 let provider = MockSolanaProviderTrait::new();
2213 let relayer_repo = Arc::new(MockRelayerRepository::new());
2214 let mut tx_repo = MockTransactionRepository::new();
2215 let job_producer = Arc::new(MockJobProducerTrait::new());
2216 let relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
2217
2218 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some("test-sig"));
2219
2220 tx_repo
2222 .expect_partial_update()
2223 .withf(|_, update| update.status == Some(TransactionStatus::Failed))
2224 .times(1)
2225 .returning(|_, update| {
2226 let mut updated_tx =
2227 create_tx_with_signature(TransactionStatus::Submitted, Some("test-sig"));
2228 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2229 updated_tx.status_reason = update.status_reason.clone();
2230 Ok(updated_tx)
2231 });
2232
2233 let handler = SolanaRelayerTransaction::new(
2234 relayer,
2235 relayer_repo,
2236 Arc::new(provider),
2237 Arc::new(tx_repo),
2238 job_producer,
2239 Arc::new(MockSolanaSignTrait::new()),
2240 )?;
2241
2242 let ctx = create_triggered_context();
2243 let result = handler
2244 .handle_transaction_status_impl(tx, Some(ctx))
2245 .await?;
2246
2247 assert_eq!(result.status, TransactionStatus::Failed);
2248 assert!(result.status_reason.is_some());
2249 assert!(result.status_reason.unwrap().contains("consecutive errors"));
2250 Ok(())
2251 }
2252
2253 #[tokio::test]
2254 async fn test_circuit_breaker_pending_marks_as_failed() -> Result<()> {
2255 let provider = MockSolanaProviderTrait::new();
2256 let relayer_repo = Arc::new(MockRelayerRepository::new());
2257 let mut tx_repo = MockTransactionRepository::new();
2258 let job_producer = Arc::new(MockJobProducerTrait::new());
2259 let relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
2260
2261 let tx = create_tx_with_signature(TransactionStatus::Pending, None);
2262
2263 tx_repo
2265 .expect_partial_update()
2266 .withf(|_, update| update.status == Some(TransactionStatus::Failed))
2267 .times(1)
2268 .returning(|_, update| {
2269 let mut updated_tx = create_tx_with_signature(TransactionStatus::Pending, None);
2270 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2271 updated_tx.status_reason = update.status_reason.clone();
2272 Ok(updated_tx)
2273 });
2274
2275 let handler = SolanaRelayerTransaction::new(
2276 relayer,
2277 relayer_repo,
2278 Arc::new(provider),
2279 Arc::new(tx_repo),
2280 job_producer,
2281 Arc::new(MockSolanaSignTrait::new()),
2282 )?;
2283
2284 let ctx = create_triggered_context();
2285 let result = handler
2286 .handle_transaction_status_impl(tx, Some(ctx))
2287 .await?;
2288
2289 assert_eq!(result.status, TransactionStatus::Failed);
2290 Ok(())
2291 }
2292
2293 #[tokio::test]
2294 async fn test_circuit_breaker_total_failures_triggers() -> Result<()> {
2295 let provider = MockSolanaProviderTrait::new();
2296 let relayer_repo = Arc::new(MockRelayerRepository::new());
2297 let mut tx_repo = MockTransactionRepository::new();
2298 let job_producer = Arc::new(MockJobProducerTrait::new());
2299 let relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
2300
2301 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some("test-sig"));
2302
2303 tx_repo
2304 .expect_partial_update()
2305 .withf(|_, update| update.status == Some(TransactionStatus::Failed))
2306 .times(1)
2307 .returning(|_, _| {
2308 let mut updated_tx =
2309 create_tx_with_signature(TransactionStatus::Failed, Some("test-sig"));
2310 updated_tx.status_reason =
2311 Some("Circuit breaker triggered by total failures".to_string());
2312 Ok(updated_tx)
2313 });
2314
2315 let handler = SolanaRelayerTransaction::new(
2316 relayer,
2317 relayer_repo,
2318 Arc::new(provider),
2319 Arc::new(tx_repo),
2320 job_producer,
2321 Arc::new(MockSolanaSignTrait::new()),
2322 )?;
2323
2324 let ctx = create_total_triggered_context();
2326 let result = handler
2327 .handle_transaction_status_impl(tx, Some(ctx))
2328 .await?;
2329
2330 assert_eq!(result.status, TransactionStatus::Failed);
2331 Ok(())
2332 }
2333
2334 #[tokio::test]
2335 async fn test_circuit_breaker_below_threshold_continues() -> Result<()> {
2336 let mut provider = MockSolanaProviderTrait::new();
2337 let relayer_repo = Arc::new(MockRelayerRepository::new());
2338 let mut tx_repo = MockTransactionRepository::new();
2339 let job_producer = Arc::new(MockJobProducerTrait::new());
2340 let relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
2341
2342 let signature_str =
2344 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
2345 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
2346
2347 provider
2350 .expect_get_transaction_status()
2351 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Confirmed) }));
2352
2353 tx_repo.expect_get_by_id().returning(move |_| {
2354 Ok(create_tx_with_signature(
2355 TransactionStatus::Submitted,
2356 Some(signature_str),
2357 ))
2358 });
2359
2360 tx_repo.expect_partial_update().returning(move |_, update| {
2362 let mut updated_tx =
2363 create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
2364 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2365 Ok(updated_tx)
2366 });
2367
2368 let handler = SolanaRelayerTransaction::new(
2369 relayer,
2370 relayer_repo,
2371 Arc::new(provider),
2372 Arc::new(tx_repo),
2373 job_producer,
2374 Arc::new(MockSolanaSignTrait::new()),
2375 )?;
2376
2377 let ctx = create_safe_context();
2378 let result = handler
2379 .handle_transaction_status_impl(tx, Some(ctx))
2380 .await?;
2381
2382 assert_eq!(result.status, TransactionStatus::Mined);
2385 Ok(())
2386 }
2387
2388 #[tokio::test]
2389 async fn test_circuit_breaker_final_state_early_return() -> Result<()> {
2390 let provider = MockSolanaProviderTrait::new();
2391 let relayer_repo = Arc::new(MockRelayerRepository::new());
2392 let tx_repo = MockTransactionRepository::new();
2393 let job_producer = MockJobProducerTrait::new();
2394 let relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
2395
2396 let tx = create_tx_with_signature(TransactionStatus::Confirmed, Some("test-sig"));
2398
2399 let handler = SolanaRelayerTransaction::new(
2400 relayer,
2401 relayer_repo,
2402 Arc::new(provider),
2403 Arc::new(tx_repo),
2404 Arc::new(job_producer),
2405 Arc::new(MockSolanaSignTrait::new()),
2406 )?;
2407
2408 let ctx = create_triggered_context();
2409
2410 let result = handler
2412 .handle_transaction_status_impl(tx, Some(ctx))
2413 .await?;
2414
2415 assert_eq!(result.status, TransactionStatus::Confirmed);
2416 Ok(())
2417 }
2418
2419 #[tokio::test]
2420 async fn test_circuit_breaker_no_context_continues() -> Result<()> {
2421 let mut provider = MockSolanaProviderTrait::new();
2422 let relayer_repo = Arc::new(MockRelayerRepository::new());
2423 let mut tx_repo = MockTransactionRepository::new();
2424 let job_producer = Arc::new(MockJobProducerTrait::new());
2425 let relayer = create_mock_solana_relayer("test-relayer".to_string(), false);
2426
2427 let signature_str =
2429 "4XFPmbPT4TRchFWNmQD2N8BhjxJQKqYdXWQG7kJJtxCBZ8Y9WtNDoPAwQaHFYnVynCjMVyF9TCMrpPFkEpG7LpZr";
2430 let tx = create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
2431
2432 provider
2434 .expect_get_transaction_status()
2435 .returning(|_| Box::pin(async { Ok(SolanaTransactionStatus::Confirmed) }));
2436
2437 tx_repo.expect_get_by_id().returning(move |_| {
2438 Ok(create_tx_with_signature(
2439 TransactionStatus::Submitted,
2440 Some(signature_str),
2441 ))
2442 });
2443
2444 tx_repo.expect_partial_update().returning(move |_, update| {
2445 let mut updated_tx =
2446 create_tx_with_signature(TransactionStatus::Submitted, Some(signature_str));
2447 updated_tx.status = update.status.unwrap_or(updated_tx.status);
2448 Ok(updated_tx)
2449 });
2450
2451 let handler = SolanaRelayerTransaction::new(
2452 relayer,
2453 relayer_repo,
2454 Arc::new(provider),
2455 Arc::new(tx_repo),
2456 job_producer,
2457 Arc::new(MockSolanaSignTrait::new()),
2458 )?;
2459
2460 let result = handler.handle_transaction_status_impl(tx, None).await?;
2462
2463 assert_eq!(result.status, TransactionStatus::Mined);
2465 Ok(())
2466 }
2467 }
2468}