openzeppelin_relayer/utils/redis.rs

use std::sync::Arc;
use std::time::Duration;

use color_eyre::Result;
use deadpool_redis::{Config, Pool, Runtime};
use tracing::{debug, info, warn};

use crate::config::ServerConfig;

#[derive(Clone, Debug)]
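/// Redis connection pools used by the relayer: a primary pool for read/write
/// operations and a reader pool for read-only traffic. When no dedicated
/// reader endpoint is configured, both handles point at the same pool.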
pub struct RedisConnections {
    primary_pool: Arc<Pool>,
    reader_pool: Arc<Pool>,
}

impl RedisConnections {
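    /// Builds a `RedisConnections` where the primary and reader handles share
    /// the same underlying pool.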
    pub fn new_single_pool(pool: Arc<Pool>) -> Self {
        Self {
            primary_pool: pool.clone(),
            reader_pool: pool,
        }
    }

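    /// Pool used for write operations.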
    pub fn primary(&self) -> &Arc<Pool> {
        &self.primary_pool
    }

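    /// Pool used for read operations; this is the same pool as [`Self::primary`]
    /// when no dedicated reader endpoint is configured.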
    pub fn reader(&self) -> &Arc<Pool> {
        &self.reader_pool
    }
}

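/// Creates a Redis connection pool for `url` using timeouts from `config` and
/// verifies connectivity by checking out (and immediately returning) one
/// connection.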
async fn create_pool(url: &str, pool_max_size: usize, config: &ServerConfig) -> Result<Arc<Pool>> {
    let cfg = Config::from_url(url);

    let pool = cfg
        .builder()
        .map_err(|e| eyre::eyre!("Failed to create Redis pool builder for {}: {}", url, e))?
        .max_size(pool_max_size)
        .wait_timeout(Some(Duration::from_millis(config.redis_pool_timeout_ms)))
        .create_timeout(Some(Duration::from_millis(
            config.redis_connection_timeout_ms,
        )))
        .recycle_timeout(Some(Duration::from_millis(
            config.redis_connection_timeout_ms,
        )))
        .runtime(Runtime::Tokio1)
        .build()
        .map_err(|e| eyre::eyre!("Failed to build Redis pool for {}: {}", url, e))?;

    // Check out one connection eagerly so a misconfigured URL fails at startup
    // rather than on first use.
    let conn = pool
        .get()
        .await
        .map_err(|e| eyre::eyre!("Failed to get initial Redis connection from {}: {}", url, e))?;
    drop(conn);

    Ok(Arc::new(pool))
}

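/// Initializes the Redis connection pools from `config`. A separate reader
/// pool is created only when `redis_reader_url` is set and non-empty;
/// otherwise the primary pool serves both roles.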
pub async fn initialize_redis_connections(config: &ServerConfig) -> Result<Arc<RedisConnections>> {
    let primary_pool_size = config.redis_pool_max_size;

    info!(
        primary_url = %config.redis_url,
        primary_pool_size = %primary_pool_size,
        "initializing primary Redis connection pool"
    );
    let primary_pool = create_pool(&config.redis_url, primary_pool_size, config).await?;

    let reader_pool = match &config.redis_reader_url {
        Some(reader_url) if !reader_url.is_empty() => {
            let reader_pool_size = config.redis_reader_pool_max_size;

            info!(
                primary_url = %config.redis_url,
                reader_url = %reader_url,
                primary_pool_size = %primary_pool_size,
                reader_pool_size = %reader_pool_size,
                "using separate reader endpoint for read operations"
            );
            create_pool(reader_url, reader_pool_size, config).await?
        }
        _ => {
            debug!(
                primary_url = %config.redis_url,
                pool_size = %primary_pool_size,
                "no reader URL configured, using primary for all operations"
            );
            primary_pool.clone()
        }
    };

    Ok(Arc::new(RedisConnections {
        primary_pool,
        reader_pool,
    }))
}

#[derive(Clone)]
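/// A best-effort distributed lock backed by Redis `SET NX EX`.
///
/// Each acquisition uses a unique value so that only the owning guard can
/// release the lock; if the holder crashes, the lock expires after `ttl`.
///
/// Example (sketch, not compiled; assumes a Tokio runtime and a connected
/// `deadpool_redis` pool):
///
/// ```ignore
/// let lock = DistributedLock::new(pool, "myprefix:lock:cleanup", Duration::from_secs(60));
/// if let Some(guard) = lock.try_acquire().await? {
///     // ... work that must not run concurrently across instances ...
///     guard.release().await?;
/// }
/// ```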
pub struct DistributedLock {
    pool: Arc<Pool>,
    lock_key: String,
    ttl: Duration,
}

impl DistributedLock {
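    /// Creates a lock handle for `lock_key` with the given time-to-live.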
    pub fn new(pool: Arc<Pool>, lock_key: &str, ttl: Duration) -> Self {
        Self {
            pool,
            lock_key: lock_key.to_string(),
            ttl,
        }
    }

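    /// Attempts to acquire the lock without blocking.
    ///
    /// Returns `Ok(Some(LockGuard))` if the lock was acquired, `Ok(None)` if it
    /// is already held by another instance, and `Err` on Redis errors.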
    pub async fn try_acquire(&self) -> Result<Option<LockGuard>> {
        let lock_value = generate_lock_value();
        let mut conn = self.pool.get().await?;

        // SET key value NX EX ttl: succeeds only if the key does not already exist.
        let result: Option<String> = redis::cmd("SET")
            .arg(&self.lock_key)
            .arg(&lock_value)
            .arg("NX")
            .arg("EX")
            .arg(self.ttl.as_secs())
            .query_async(&mut conn)
            .await?;

        match result {
            Some(_) => {
                debug!(
                    lock_key = %self.lock_key,
                    ttl_secs = %self.ttl.as_secs(),
                    "distributed lock acquired"
                );
                Ok(Some(LockGuard {
                    release_info: Some(LockReleaseInfo {
                        pool: self.pool.clone(),
                        lock_key: self.lock_key.clone(),
                        lock_value,
                    }),
                }))
            }
            None => {
                debug!(
                    lock_key = %self.lock_key,
                    "distributed lock already held by another instance"
                );
                Ok(None)
            }
        }
    }
}

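/// Guard representing ownership of an acquired distributed lock.
///
/// Call [`LockGuard::release`] to release the lock explicitly; if the guard is
/// dropped without an explicit release, a best-effort asynchronous release is
/// spawned and the lock expires after its TTL in any case.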
pub struct LockGuard {
    release_info: Option<LockReleaseInfo>,
}

struct LockReleaseInfo {
    pool: Arc<Pool>,
    lock_key: String,
    lock_value: String,
}

impl LockGuard {
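    /// Releases the lock if it is still owned by this guard.
    ///
    /// Returns `Ok(true)` if the lock was released, `Ok(false)` if it had
    /// already expired or is now owned by another instance.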
    pub async fn release(mut self) -> Result<bool> {
        let info = self
            .release_info
            .take()
            .expect("release_info should always be Some before release");

        Self::do_release(info).await
    }

    async fn do_release(info: LockReleaseInfo) -> Result<bool> {
        let mut conn = info.pool.get().await?;

        // Compare-and-delete: only delete the key if it still holds this
        // guard's unique value, so a lock re-acquired by another instance
        // after our TTL expired is never deleted.
        let script = r#"
            if redis.call("GET", KEYS[1]) == ARGV[1] then
                return redis.call("DEL", KEYS[1])
            else
                return 0
            end
        "#;

        let result: i32 = redis::Script::new(script)
            .key(&info.lock_key)
            .arg(&info.lock_value)
            .invoke_async(&mut conn)
            .await?;

        if result == 1 {
            debug!(lock_key = %info.lock_key, "distributed lock released");
            Ok(true)
        } else {
            warn!(
                lock_key = %info.lock_key,
                "distributed lock release failed - lock already expired or owned by another instance"
            );
            Ok(false)
        }
    }
}

impl Drop for LockGuard {
    fn drop(&mut self) {
        if let Some(info) = self.release_info.take() {
            // Best-effort release on drop (requires a Tokio runtime). If the
            // spawned release fails, the lock simply expires after its TTL.
            tokio::spawn(async move {
                if let Err(e) = LockGuard::do_release(info).await {
                    warn!(error = %e, "failed to release distributed lock in drop, will expire after TTL");
                }
            });
        }
    }
}

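/// Generates a value that is unique per acquisition (`pid:nanos:counter`),
/// used to prove ownership when releasing the lock.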
fn generate_lock_value() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    static LOCK_VALUE_COUNTER: AtomicU64 = AtomicU64::new(0);
    let process_id = std::process::id();
    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let counter = LOCK_VALUE_COUNTER.fetch_add(1, Ordering::Relaxed);

    format!("{process_id}:{timestamp}:{counter}")
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_generate_lock_value_is_unique() {
        let value1 = generate_lock_value();
        let value2 = generate_lock_value();

        assert_ne!(value1, value2);
    }

    #[test]
    fn test_generate_lock_value_contains_process_id() {
        let value = generate_lock_value();

        assert!(value.contains(':'));

        let parts: Vec<&str> = value.split(':').collect();
        assert_eq!(parts.len(), 3);

        assert!(parts[0].parse::<u32>().is_ok());
    }

    #[test]
    fn test_distributed_lock_key_format() {
        let prefix = "myrelayer";
        let lock_name = "transaction_cleanup";
        let lock_key = format!("{prefix}:lock:{lock_name}");
        assert_eq!(lock_key, "myrelayer:lock:transaction_cleanup");
    }

    #[test]
    fn test_distributed_lock_key_format_with_complex_prefix() {
        let prefix = "oz-relayer-prod";
        let lock_name = "transaction_cleanup";
        let lock_key = format!("{prefix}:lock:{lock_name}");
        assert_eq!(lock_key, "oz-relayer-prod:lock:transaction_cleanup");
    }

    #[test]
    fn test_distributed_lock_key_uses_exact_key() {
        let lock_key = "myprefix:lock:myoperation";
        assert_eq!(lock_key, "myprefix:lock:myoperation");
    }

    mod redis_connections_tests {
        use super::*;

        #[test]
        fn test_new_single_pool_returns_same_pool_for_both() {
            let cfg = Config::from_url("redis://127.0.0.1:6379");
            let pool = cfg
                .builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build pool");
            let pool = Arc::new(pool);

            let connections = RedisConnections::new_single_pool(pool.clone());

            assert!(Arc::ptr_eq(connections.primary(), &pool));
            assert!(Arc::ptr_eq(connections.reader(), &pool));
            assert!(Arc::ptr_eq(connections.primary(), connections.reader()));
        }

        #[test]
        fn test_redis_connections_clone() {
            let cfg = Config::from_url("redis://127.0.0.1:6379");
            let pool = cfg
                .builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build pool");
            let pool = Arc::new(pool);

            let connections = RedisConnections::new_single_pool(pool);
            let cloned = connections.clone();

            assert!(Arc::ptr_eq(connections.primary(), cloned.primary()));
            assert!(Arc::ptr_eq(connections.reader(), cloned.reader()));
        }

        #[test]
        fn test_redis_connections_debug() {
            let cfg = Config::from_url("redis://127.0.0.1:6379");
            let pool = cfg
                .builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build pool");
            let pool = Arc::new(pool);

            let connections = RedisConnections::new_single_pool(pool);
            let debug_str = format!("{connections:?}");

            assert!(debug_str.contains("RedisConnections"));
        }
    }

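    /// Builds a pool against a local Redis instance for the integration tests
    /// below; pool creation is lazy, so this does not itself require Redis to
    /// be running.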
    async fn create_test_redis_connection() -> Option<Arc<Pool>> {
        let cfg = Config::from_url("redis://127.0.0.1:6379");
        let pool = cfg
            .builder()
            .ok()?
            .max_size(16)
            .runtime(Runtime::Tokio1)
            .build()
            .ok()?;
        Some(Arc::new(pool))
    }

    mod integration {
        use super::*;

        #[tokio::test]
        #[ignore]
        async fn test_distributed_lock_acquire_and_release() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock =
                DistributedLock::new(conn, "test:lock:acquire_release", Duration::from_secs(60));

            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire lock when not held");

            let released = guard.unwrap().release().await.expect("Redis error");
            assert!(released, "Should successfully release the lock");
        }

        #[tokio::test]
        #[ignore]
        async fn test_distributed_lock_prevents_double_acquisition() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock1 = DistributedLock::new(
                conn.clone(),
                "test:lock:double_acquire",
                Duration::from_secs(60),
            );
            let lock2 =
                DistributedLock::new(conn, "test:lock:double_acquire", Duration::from_secs(60));

            let guard1 = lock1.try_acquire().await.expect("Redis error");
            assert!(guard1.is_some(), "First acquisition should succeed");

            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(
                guard2.is_none(),
                "Second acquisition should fail - lock already held"
            );

            guard1.unwrap().release().await.expect("Redis error");

            let guard2_retry = lock2.try_acquire().await.expect("Redis error");
            assert!(guard2_retry.is_some(), "Should acquire lock after release");

            guard2_retry.unwrap().release().await.expect("Redis error");
        }

        #[tokio::test]
        #[ignore]
        async fn test_distributed_lock_expires_after_ttl() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock =
                DistributedLock::new(conn.clone(), "test:lock:ttl_expiry", Duration::from_secs(1));

            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire lock");

            drop(guard);

            tokio::time::sleep(Duration::from_secs(2)).await;

            let lock2 = DistributedLock::new(conn, "test:lock:ttl_expiry", Duration::from_secs(60));
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(guard2.is_some(), "Should acquire lock after TTL expiry");

            if let Some(g) = guard2 {
                g.release().await.expect("Redis error");
            }
        }

        #[tokio::test]
        #[ignore]
        async fn test_distributed_lock_release_only_own_lock() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock = DistributedLock::new(
                conn.clone(),
                "test:lock:release_own",
                Duration::from_secs(1),
            );

            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire lock");

            // Wait for the 1-second TTL to expire before attempting to release.
            tokio::time::sleep(Duration::from_secs(2)).await;

            let released = guard.unwrap().release().await.expect("Redis error");
            assert!(!released, "Should not release expired lock");
        }

        #[tokio::test]
        #[ignore]
        async fn test_distributed_lock_with_prefix() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let prefix = "test_prefix";
            let lock_name = "cleanup";
            let lock_key = format!("{prefix}:lock:{lock_name}");

            let lock = DistributedLock::new(conn, &lock_key, Duration::from_secs(60));

            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire prefixed lock");

            guard.unwrap().release().await.expect("Redis error");
        }

        #[tokio::test]
        #[ignore]
        async fn test_distributed_lock_drop_releases_lock() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock_key = "test:lock:drop_release";

            {
                let lock = DistributedLock::new(conn.clone(), lock_key, Duration::from_secs(60));
                let guard = lock.try_acquire().await.expect("Redis error");
                assert!(guard.is_some(), "Should acquire lock");
                // Guard is dropped here without an explicit release.
            }

            // Give the spawned release task a moment to run.
            tokio::time::sleep(Duration::from_millis(100)).await;

            let lock2 = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(
                guard2.is_some(),
                "Should acquire lock after previous guard was dropped"
            );

            if let Some(g) = guard2 {
                g.release().await.expect("Redis error");
            }
        }

        #[tokio::test]
        #[ignore]
        async fn test_distributed_lock_explicit_release_prevents_double_release() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock_key = "test:lock:no_double_release";
            let lock = DistributedLock::new(conn.clone(), lock_key, Duration::from_secs(60));

            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire lock");

            let released = guard.unwrap().release().await.expect("Redis error");
            assert!(released, "Should release successfully");

            tokio::time::sleep(Duration::from_millis(50)).await;

            let lock2 = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(
                guard2.is_some(),
                "Should acquire lock - no double-release issue"
            );

            if let Some(g) = guard2 {
                g.release().await.expect("Redis error");
            }
        }

        #[tokio::test]
        #[ignore]
        async fn test_distributed_lock_drop_on_early_return() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock_key = "test:lock:early_return";

            async fn simulated_work_with_early_return(
                conn: Arc<Pool>,
                lock_key: &str,
            ) -> Result<(), &'static str> {
                let lock = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
                let _guard = lock
                    .try_acquire()
                    .await
                    .map_err(|_| "lock error")?
                    .ok_or("lock held")?;

                // Early return; `_guard` is dropped and releases the lock.
                Err("simulated error")
            }

            let result = simulated_work_with_early_return(conn.clone(), lock_key).await;
            assert!(result.is_err(), "Should have returned early with error");

            tokio::time::sleep(Duration::from_millis(100)).await;

            let lock2 = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(
                guard2.is_some(),
                "Should acquire lock after early return released it"
            );

            if let Some(g) = guard2 {
                g.release().await.expect("Redis error");
            }
        }

        #[tokio::test]
        #[ignore]
        async fn test_redis_connections_single_pool_operations() {
            let pool = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let connections = RedisConnections::new_single_pool(pool);

            let mut primary_conn = connections
                .primary()
                .get()
                .await
                .expect("Failed to get primary connection");
            let mut reader_conn = connections
                .reader()
                .get()
                .await
                .expect("Failed to get reader connection");

            let _: () = redis::cmd("SET")
                .arg("test:connections:key")
                .arg("test_value")
                .query_async(&mut primary_conn)
                .await
                .expect("Failed to SET");

            let value: String = redis::cmd("GET")
                .arg("test:connections:key")
                .query_async(&mut reader_conn)
                .await
                .expect("Failed to GET");

            assert_eq!(value, "test_value");

            let _: () = redis::cmd("DEL")
                .arg("test:connections:key")
                .query_async(&mut primary_conn)
                .await
                .expect("Failed to DEL");
        }

        #[tokio::test]
        #[ignore]
        async fn test_redis_connections_backward_compatible() {
            let pool = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let connections = Arc::new(RedisConnections::new_single_pool(pool));

            let conn1 = connections.clone();
            let conn2 = connections.clone();

            // Both handles can check out primary and reader connections concurrently.
            let _primary1 = conn1.primary().get().await.expect("Failed to get primary");
            let _reader1 = conn1.reader().get().await.expect("Failed to get reader");
            let _primary2 = conn2.primary().get().await.expect("Failed to get primary");
            let _reader2 = conn2.reader().get().await.expect("Failed to get reader");
        }
    }
}