openzeppelin_relayer/utils/redis.rs

use std::sync::Arc;
use std::time::Duration;

use color_eyre::Result;
use deadpool_redis::{Config, Pool, Runtime};
use tracing::{debug, info, warn};

use crate::config::ServerConfig;

/// Holds separate connection pools for read and write operations.
///
/// This struct enables optimization for Redis deployments with read replicas,
/// such as AWS ElastiCache. Write operations use the primary pool, while read
/// operations can be distributed across reader replicas.
///
/// When `REDIS_READER_URL` is not configured, both pools point to the same
/// Redis instance (the primary), maintaining backward compatibility.
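///
/// # Example
///
/// A minimal usage sketch (assumes an existing `Arc<Pool>` named `pool`):
///
/// ```ignore
/// let connections = RedisConnections::new_single_pool(pool);
///
/// // Route writes through the primary pool and reads through the reader pool.
/// let mut write_conn = connections.primary().get().await?;
/// let mut read_conn = connections.reader().get().await?;
/// ```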
#[derive(Clone, Debug)]
pub struct RedisConnections {
    /// Pool for write operations (connected to primary endpoint)
    primary_pool: Arc<Pool>,
    /// Pool for read operations (connected to reader endpoint, or primary if not configured)
    reader_pool: Arc<Pool>,
}

impl RedisConnections {
    /// Creates a new `RedisConnections` with a single pool used for both reads and writes.
    ///
    /// This is useful for:
    /// - Testing where read/write separation is not needed
    /// - Simple deployments without read replicas
    /// - Backward compatibility
    pub fn new_single_pool(pool: Arc<Pool>) -> Self {
        Self {
            primary_pool: pool.clone(),
            reader_pool: pool,
        }
    }

    /// Returns the primary pool for write operations.
    ///
    /// Use this for: `create`, `update`, `delete`, and any operation that
    /// modifies data in Redis.
    pub fn primary(&self) -> &Arc<Pool> {
        &self.primary_pool
    }

    /// Returns the reader pool for read operations.
    ///
    /// Use this for: `get_by_id`, `list`, `find_by_*`, `count`, and any
    /// operation that only reads data from Redis.
    ///
    /// If no reader endpoint is configured, this returns the same pool as `primary()`.
    pub fn reader(&self) -> &Arc<Pool> {
        &self.reader_pool
    }
}

/// Creates a Redis connection pool with the specified URL, pool size, and configuration.
async fn create_pool(url: &str, pool_max_size: usize, config: &ServerConfig) -> Result<Arc<Pool>> {
    let cfg = Config::from_url(url);

    let pool = cfg
        .builder()
        .map_err(|e| eyre::eyre!("Failed to create Redis pool builder for {}: {}", url, e))?
        .max_size(pool_max_size)
        .wait_timeout(Some(Duration::from_millis(config.redis_pool_timeout_ms)))
        .create_timeout(Some(Duration::from_millis(
            config.redis_connection_timeout_ms,
        )))
        .recycle_timeout(Some(Duration::from_millis(
            config.redis_connection_timeout_ms,
        )))
        .runtime(Runtime::Tokio1)
        .build()
        .map_err(|e| eyre::eyre!("Failed to build Redis pool for {}: {}", url, e))?;

    // Verify the pool is working by getting a connection
    let conn = pool
        .get()
        .await
        .map_err(|e| eyre::eyre!("Failed to get initial Redis connection from {}: {}", url, e))?;
    drop(conn);

    Ok(Arc::new(pool))
}

/// Initializes Redis connection pools for both primary and reader endpoints.
///
/// # Arguments
///
/// * `config` - The server configuration containing Redis URLs and pool settings.
///
/// # Returns
///
/// A `RedisConnections` struct containing:
/// - `primary_pool`: Connected to `REDIS_URL` (for write operations)
/// - `reader_pool`: Connected to `REDIS_READER_URL` if set, otherwise same as primary
///
/// # Features
///
/// - **Read/Write Separation**: When `REDIS_READER_URL` is configured, read operations
///   can be distributed across read replicas, reducing load on the primary.
/// - **Backward Compatible**: If `REDIS_READER_URL` is not set, both pools use
///   the primary URL, maintaining existing behavior.
/// - **Connection Pooling**: Both pools use deadpool-redis with configurable
///   max size, wait timeout, and connection timeouts.
///
/// # Example Configuration
///
/// ```bash
/// # AWS ElastiCache with read replicas
/// REDIS_URL=redis://my-cluster.xxx.cache.amazonaws.com:6379
/// REDIS_READER_URL=redis://my-cluster-ro.xxx.cache.amazonaws.com:6379
/// ```
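///
/// # Example Usage
///
/// A minimal sketch (assumes `config` is an already-loaded `ServerConfig`):
///
/// ```ignore
/// let connections = initialize_redis_connections(&config).await?;
///
/// // Reads can go through the reader pool; writes should use the primary pool.
/// let mut conn = connections.reader().get().await?;
/// let pong: String = redis::cmd("PING").query_async(&mut conn).await?;
/// ```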
pub async fn initialize_redis_connections(config: &ServerConfig) -> Result<Arc<RedisConnections>> {
    let primary_pool_size = config.redis_pool_max_size;

    info!(
        primary_url = %config.redis_url,
        primary_pool_size = %primary_pool_size,
        "initializing primary Redis connection pool"
    );
    let primary_pool = create_pool(&config.redis_url, primary_pool_size, config).await?;

    let reader_pool = match &config.redis_reader_url {
        Some(reader_url) if !reader_url.is_empty() => {
            let reader_pool_size = config.redis_reader_pool_max_size;

            info!(
                primary_url = %config.redis_url,
                reader_url = %reader_url,
                primary_pool_size = %primary_pool_size,
                reader_pool_size = %reader_pool_size,
                "Using separate reader endpoint for read operations"
            );
            create_pool(reader_url, reader_pool_size, config).await?
        }
        _ => {
            debug!(
                primary_url = %config.redis_url,
                pool_size = %primary_pool_size,
                "No reader URL configured, using primary for all operations"
            );
            primary_pool.clone()
        }
    };

    Ok(Arc::new(RedisConnections {
        primary_pool,
        reader_pool,
    }))
}

/// A distributed lock implementation using the Redis `SET NX EX` pattern.
///
/// This lock is designed for distributed systems where multiple instances
/// need to coordinate exclusive access to shared resources. It uses the
/// Redis SET command with NX (only set if not exists) and EX (expiry time)
/// options to atomically acquire locks.
///
/// # Features
/// - **Atomic acquisition**: Uses Redis SET NX to ensure only one instance can acquire the lock
/// - **Automatic expiry**: Lock automatically expires after TTL to prevent deadlocks
/// - **Unique identifiers**: Each lock acquisition gets a unique ID to prevent accidental release by other instances
/// - **Auto-release on drop**: Lock is released when the guard is dropped (best-effort via spawned task)
///
/// # TTL Considerations
/// The TTL should be set to accommodate the worst-case runtime of the protected operation.
/// If the operation runs longer than TTL, another instance may acquire the lock concurrently.
/// For long-running operations, consider:
/// - Setting a generous TTL (e.g., 2x expected runtime)
/// - Implementing lock refresh/extension during processing (see the sketch below)
/// - Breaking the operation into smaller locked sections
///
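/// A minimal refresh sketch (hypothetical; this module does not expose a refresh helper,
/// and `lock_key`, `lock_value`, `conn`, and `new_ttl_secs` are assumed to be in scope):
///
/// ```ignore
/// // Extend the TTL only if this instance still owns the lock (value matches).
/// let script = r#"
///     if redis.call("GET", KEYS[1]) == ARGV[1] then
///         return redis.call("EXPIRE", KEYS[1], ARGV[2])
///     else
///         return 0
///     end
/// "#;
/// let extended: i32 = redis::Script::new(script)
///     .key(lock_key)
///     .arg(lock_value)
///     .arg(new_ttl_secs)
///     .invoke_async(&mut conn)
///     .await?;
/// ```
///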
/// # Example
/// ```ignore
/// // Key format: {prefix}:lock:{name}
/// let lock_key = format!("{}:lock:{}", prefix, "my-operation");
/// let lock = DistributedLock::new(redis_client, &lock_key, Duration::from_secs(60));
/// if let Some(guard) = lock.try_acquire().await? {
///     // Do exclusive work here
///     // Lock is automatically released when guard is dropped
/// }
/// ```
#[derive(Clone)]
pub struct DistributedLock {
    pool: Arc<Pool>,
    lock_key: String,
    ttl: Duration,
}

impl DistributedLock {
    /// Creates a new distributed lock instance.
    ///
    /// # Arguments
    /// * `pool` - Redis connection pool
    /// * `lock_key` - Full Redis key for this lock (e.g., "myprefix:lock:cleanup")
    /// * `ttl` - Time-to-live for the lock. Lock will automatically expire after this duration
    ///   to prevent deadlocks if the holder crashes.
    pub fn new(pool: Arc<Pool>, lock_key: &str, ttl: Duration) -> Self {
        Self {
            pool,
            lock_key: lock_key.to_string(),
            ttl,
        }
    }

    /// Attempts to acquire the distributed lock.
    ///
    /// This is a non-blocking operation. If the lock is already held by another
    /// instance, it returns `Ok(None)` immediately without waiting.
    ///
    /// # Returns
    /// * `Ok(Some(LockGuard))` - Lock was successfully acquired
    /// * `Ok(None)` - Lock is already held by another instance
    /// * `Err(_)` - Redis communication error
    ///
    /// # Lock Semantics
    /// The lock is implemented using Redis `SET key value NX EX ttl`:
    /// - `NX`: Only set if key does not exist (atomic check-and-set)
    /// - `EX`: Set expiry time in seconds
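    ///
    /// # Example
    ///
    /// A minimal sketch (assumes `pool` is an existing `Arc<Pool>`):
    ///
    /// ```ignore
    /// let lock = DistributedLock::new(pool, "myprefix:lock:cleanup", Duration::from_secs(60));
    /// match lock.try_acquire().await? {
    ///     Some(guard) => {
    ///         // Exclusive work goes here.
    ///         guard.release().await?;
    ///     }
    ///     None => {
    ///         // Another instance holds the lock; skip this run.
    ///     }
    /// }
    /// ```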
    pub async fn try_acquire(&self) -> Result<Option<LockGuard>> {
        let lock_value = generate_lock_value();
        let mut conn = self.pool.get().await?;

        // Use SET NX EX for atomic lock acquisition with expiry
        let result: Option<String> = redis::cmd("SET")
            .arg(&self.lock_key)
            .arg(&lock_value)
            .arg("NX")
            .arg("EX")
            .arg(self.ttl.as_secs())
            .query_async(&mut conn)
            .await?;

        match result {
            Some(_) => {
                debug!(
                    lock_key = %self.lock_key,
                    ttl_secs = %self.ttl.as_secs(),
                    "distributed lock acquired"
                );
                Ok(Some(LockGuard {
                    release_info: Some(LockReleaseInfo {
                        pool: self.pool.clone(),
                        lock_key: self.lock_key.clone(),
                        lock_value,
                    }),
                }))
            }
            None => {
                debug!(
                    lock_key = %self.lock_key,
                    "distributed lock already held by another instance"
                );
                Ok(None)
            }
        }
    }
}

/// A guard that represents an acquired distributed lock.
///
/// The lock is automatically released when this guard is dropped. This ensures
/// the lock is released regardless of how the protected code exits (normal return,
/// early return via `?`, or panic).
///
/// The release is performed via a spawned task to avoid blocking in `Drop`.
/// If you need to confirm the release succeeded, call `release()` explicitly instead.
///
/// # Drop Behavior
/// When dropped, the guard spawns an async task to release the lock. This is
/// best-effort: if the task fails (e.g., Redis unavailable), the lock will
/// still expire after TTL.
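///
/// # Example
///
/// A minimal sketch (assumes `lock` is a `DistributedLock` built elsewhere and
/// `do_protected_work` is a placeholder for the protected operation):
///
/// ```ignore
/// if let Some(guard) = lock.try_acquire().await? {
///     // Exclusive work; if this returns early, Drop releases the lock best-effort.
///     do_protected_work().await?;
///
///     // Or release explicitly to confirm the outcome.
///     let released = guard.release().await?;
///     debug!(released, "lock release outcome");
/// }
/// ```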
pub struct LockGuard {
    /// Release info is wrapped in Option to support both explicit release and Drop.
    /// When `release()` is called, this is taken (set to None) to prevent double-release.
    /// When Drop runs, if this is Some, we spawn a release task.
    release_info: Option<LockReleaseInfo>,
}

/// Internal struct holding the information needed to release a lock.
struct LockReleaseInfo {
    pool: Arc<Pool>,
    lock_key: String,
    lock_value: String,
}

impl LockGuard {
    /// Explicitly releases the lock before TTL expiry.
    ///
    /// This uses a Lua script to ensure we only delete the lock if we still own it.
    /// This prevents accidentally releasing a lock that was already expired and
    /// re-acquired by another instance.
    ///
    /// Calling this method consumes the guard and prevents the Drop-based release.
    ///
    /// # Returns
    /// * `Ok(true)` - Lock was successfully released
    /// * `Ok(false)` - Lock was not released (already expired or owned by another instance)
    /// * `Err(_)` - Redis communication error
    pub async fn release(mut self) -> Result<bool> {
        // Take the release info to prevent Drop from also trying to release
        let info = self
            .release_info
            .take()
            .expect("release_info should always be Some before release");

        Self::do_release(info).await
    }

    /// Internal async release implementation.
    async fn do_release(info: LockReleaseInfo) -> Result<bool> {
        let mut conn = info.pool.get().await?;

        // Lua script to atomically check and delete
        // Only delete if the value matches (we still own the lock)
        let script = r#"
            if redis.call("GET", KEYS[1]) == ARGV[1] then
                return redis.call("DEL", KEYS[1])
            else
                return 0
            end
        "#;

        let result: i32 = redis::Script::new(script)
            .key(&info.lock_key)
            .arg(&info.lock_value)
            .invoke_async(&mut conn)
            .await?;

        if result == 1 {
            debug!(lock_key = %info.lock_key, "distributed lock released");
            Ok(true)
        } else {
            warn!(
                lock_key = %info.lock_key,
                "distributed lock release failed - lock already expired or owned by another instance"
            );
            Ok(false)
        }
    }
}

impl Drop for LockGuard {
    fn drop(&mut self) {
        // If release_info is still Some, we need to release the lock
        if let Some(info) = self.release_info.take() {
            // Spawn a task to release the lock asynchronously
            // This is best-effort; if it fails, the lock will expire after TTL
            tokio::spawn(async move {
                if let Err(e) = LockGuard::do_release(info).await {
                    warn!(error = %e, "failed to release distributed lock in drop, will expire after TTL");
                }
            });
        }
    }
}

/// Generates a unique value for lock ownership verification.
///
/// Uses a combination of process ID, timestamp, and a monotonic counter
/// to create a unique identifier for this lock acquisition. This avoids
/// collisions in the same process even when calls share a timestamp.
fn generate_lock_value() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};

    static LOCK_VALUE_COUNTER: AtomicU64 = AtomicU64::new(0);
    let process_id = std::process::id();
    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let counter = LOCK_VALUE_COUNTER.fetch_add(1, Ordering::Relaxed);

    format!("{process_id}:{timestamp}:{counter}")
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_generate_lock_value_is_unique() {
        let value1 = generate_lock_value();
        let value2 = generate_lock_value();

        // Values should be different due to monotonic counter
        assert_ne!(value1, value2);
    }

    #[test]
    fn test_generate_lock_value_contains_process_id() {
        let value = generate_lock_value();

        // Should contain a colon separator
        assert!(value.contains(':'));

        // Should have three parts: process_id, timestamp, and counter
        let parts: Vec<&str> = value.split(':').collect();
        assert_eq!(parts.len(), 3);

        // First part should be parseable as u32 (process ID)
        assert!(parts[0].parse::<u32>().is_ok());
    }

    #[test]
    fn test_distributed_lock_key_format() {
        // Verify the lock key format: {prefix}:lock:{name}
        let prefix = "myrelayer";
        let lock_name = "transaction_cleanup";
        let lock_key = format!("{prefix}:lock:{lock_name}");
        assert_eq!(lock_key, "myrelayer:lock:transaction_cleanup");
    }

    #[test]
    fn test_distributed_lock_key_format_with_complex_prefix() {
        // Test with a more realistic prefix
        let prefix = "oz-relayer-prod";
        let lock_name = "transaction_cleanup";
        let lock_key = format!("{prefix}:lock:{lock_name}");
        assert_eq!(lock_key, "oz-relayer-prod:lock:transaction_cleanup");
    }

    #[test]
    fn test_distributed_lock_key_uses_exact_key() {
        // DistributedLock now uses the exact key provided (no automatic prefix)
        let lock_key = "myprefix:lock:myoperation";
        // The lock will use this key exactly as provided
        assert_eq!(lock_key, "myprefix:lock:myoperation");
    }

    // =========================================================================
    // RedisConnections tests
    // =========================================================================

    mod redis_connections_tests {
        use super::*;

        #[test]
        fn test_new_single_pool_returns_same_pool_for_both() {
            // This test verifies the backward-compatible single pool mode
            // When no REDIS_READER_URL is set, both pools should be the same
            let cfg = Config::from_url("redis://127.0.0.1:6379");
            let pool = cfg
                .builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build pool");
            let pool = Arc::new(pool);

            let connections = RedisConnections::new_single_pool(pool.clone());

            // Both primary and reader should return the same pool
            assert!(Arc::ptr_eq(connections.primary(), &pool));
            assert!(Arc::ptr_eq(connections.reader(), &pool));
            assert!(Arc::ptr_eq(connections.primary(), connections.reader()));
        }

        #[test]
        fn test_redis_connections_clone() {
            // Verify that RedisConnections can be cloned
            let cfg = Config::from_url("redis://127.0.0.1:6379");
            let pool = cfg
                .builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build pool");
            let pool = Arc::new(pool);

            let connections = RedisConnections::new_single_pool(pool);
            let cloned = connections.clone();

            // Both should point to the same pools
            assert!(Arc::ptr_eq(connections.primary(), cloned.primary()));
            assert!(Arc::ptr_eq(connections.reader(), cloned.reader()));
        }

        #[test]
        fn test_redis_connections_debug() {
            // Verify Debug implementation exists
            let cfg = Config::from_url("redis://127.0.0.1:6379");
            let pool = cfg
                .builder()
                .expect("Failed to create pool builder")
                .max_size(16)
                .runtime(Runtime::Tokio1)
                .build()
                .expect("Failed to build pool");
            let pool = Arc::new(pool);

            let connections = RedisConnections::new_single_pool(pool);
            let debug_str = format!("{connections:?}");

            assert!(debug_str.contains("RedisConnections"));
        }
    }

    // =========================================================================
    // Integration tests - require a running Redis instance
    // Run with: cargo test --lib redis::tests::integration -- --ignored
    // =========================================================================

    /// Helper to create a Redis connection for integration tests.
    /// Expects Redis to be running on localhost:6379.
    async fn create_test_redis_connection() -> Option<Arc<Pool>> {
        let cfg = Config::from_url("redis://127.0.0.1:6379");
        let pool = cfg
            .builder()
            .ok()?
            .max_size(16)
            .runtime(Runtime::Tokio1)
            .build()
            .ok()?;
        Some(Arc::new(pool))
    }

    mod integration {
        use super::*;

        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_acquire_and_release() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock =
                DistributedLock::new(conn, "test:lock:acquire_release", Duration::from_secs(60));

            // Should be able to acquire the lock
            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire lock when not held");

            // Release the lock
            let released = guard.unwrap().release().await.expect("Redis error");
            assert!(released, "Should successfully release the lock");
        }

        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_prevents_double_acquisition() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock1 = DistributedLock::new(
                conn.clone(),
                "test:lock:double_acquire",
                Duration::from_secs(60),
            );
            let lock2 =
                DistributedLock::new(conn, "test:lock:double_acquire", Duration::from_secs(60));

            // First acquisition should succeed
            let guard1 = lock1.try_acquire().await.expect("Redis error");
            assert!(guard1.is_some(), "First acquisition should succeed");

            // Second acquisition should fail (lock already held)
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(
                guard2.is_none(),
                "Second acquisition should fail - lock already held"
            );

            // Release the first lock
            guard1.unwrap().release().await.expect("Redis error");

            // Now second acquisition should succeed
            let guard2_retry = lock2.try_acquire().await.expect("Redis error");
            assert!(guard2_retry.is_some(), "Should acquire lock after release");

            // Cleanup
            guard2_retry.unwrap().release().await.expect("Redis error");
        }

        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_expires_after_ttl() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            // Use a very short TTL for testing
            let lock =
                DistributedLock::new(conn.clone(), "test:lock:ttl_expiry", Duration::from_secs(1));

            // Acquire the lock
            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire lock");

            // Don't release - leak the guard so Drop cannot release it, and let the TTL expire
            std::mem::forget(guard);

            // Wait for TTL to expire
            tokio::time::sleep(Duration::from_secs(2)).await;

            // Should be able to acquire again after expiry
            let lock2 = DistributedLock::new(conn, "test:lock:ttl_expiry", Duration::from_secs(60));
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(guard2.is_some(), "Should acquire lock after TTL expiry");

            // Cleanup
            if let Some(g) = guard2 {
                g.release().await.expect("Redis error");
            }
        }

        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_release_only_own_lock() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            // Use a short TTL
            let lock = DistributedLock::new(
                conn.clone(),
                "test:lock:release_own",
                Duration::from_secs(1),
            );

            // Acquire the lock
            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire lock");

            // Wait for lock to expire
            tokio::time::sleep(Duration::from_secs(2)).await;

            // Try to release expired lock - should return false (not owned anymore)
            let released = guard.unwrap().release().await.expect("Redis error");
            assert!(!released, "Should not release expired lock");
        }

        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_with_prefix() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            // Simulate prefixed lock key like in transaction cleanup
            // Format: {prefix}:lock:{name}
            let prefix = "test_prefix";
            let lock_name = "cleanup";
            let lock_key = format!("{prefix}:lock:{lock_name}");

            let lock = DistributedLock::new(conn, &lock_key, Duration::from_secs(60));

            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire prefixed lock");

            // Cleanup
            guard.unwrap().release().await.expect("Redis error");
        }

        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_drop_releases_lock() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock_key = "test:lock:drop_release";

            // Acquire lock in inner scope
            {
                let lock = DistributedLock::new(conn.clone(), lock_key, Duration::from_secs(60));
                let guard = lock.try_acquire().await.expect("Redis error");
                assert!(guard.is_some(), "Should acquire lock");

                // Guard is dropped here without explicit release()
            }

            // Give the spawned release task time to complete
            tokio::time::sleep(Duration::from_millis(100)).await;

            // Should be able to acquire again because Drop released the lock
            let lock2 = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(
                guard2.is_some(),
                "Should acquire lock after previous guard was dropped"
            );

            // Cleanup
            if let Some(g) = guard2 {
                g.release().await.expect("Redis error");
            }
        }

        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_explicit_release_prevents_double_release() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock_key = "test:lock:no_double_release";
            let lock = DistributedLock::new(conn.clone(), lock_key, Duration::from_secs(60));

            let guard = lock.try_acquire().await.expect("Redis error");
            assert!(guard.is_some(), "Should acquire lock");

            // Explicitly release
            let released = guard.unwrap().release().await.expect("Redis error");
            assert!(released, "Should release successfully");

            // Guard was consumed by release(); its release_info was taken, so Drop has nothing to release
            // Verify lock is still released (not double-locked)
            tokio::time::sleep(Duration::from_millis(50)).await;

            let lock2 = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(
                guard2.is_some(),
                "Should acquire lock - no double-release issue"
            );

            // Cleanup
            if let Some(g) = guard2 {
                g.release().await.expect("Redis error");
            }
        }

        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_distributed_lock_drop_on_early_return() {
            let conn = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let lock_key = "test:lock:early_return";

            // Simulate a function that returns early (like ? operator)
            async fn simulated_work_with_early_return(
                conn: Arc<Pool>,
                lock_key: &str,
            ) -> Result<(), &'static str> {
                let lock = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
                let _guard = lock
                    .try_acquire()
                    .await
                    .map_err(|_| "lock error")?
                    .ok_or("lock held")?;

                // Simulate early return (error path)
                Err("simulated error")

                // _guard is dropped here due to early return
            }

            // Call the function that returns early
            let result = simulated_work_with_early_return(conn.clone(), lock_key).await;
            assert!(result.is_err(), "Should have returned early with error");

            // Give the spawned release task time to complete
            tokio::time::sleep(Duration::from_millis(100)).await;

            // Lock should be released despite the early return
            let lock2 = DistributedLock::new(conn, lock_key, Duration::from_secs(60));
            let guard2 = lock2.try_acquire().await.expect("Redis error");
            assert!(
                guard2.is_some(),
                "Should acquire lock after early return released it"
            );

            // Cleanup
            if let Some(g) = guard2 {
                g.release().await.expect("Redis error");
            }
        }

        // =========================================================================
        // RedisConnections integration tests
        // =========================================================================

        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_redis_connections_single_pool_operations() {
            let pool = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let connections = RedisConnections::new_single_pool(pool);

            // Test that we can get connections from both pools
            let mut primary_conn = connections
                .primary()
                .get()
                .await
                .expect("Failed to get primary connection");
            let mut reader_conn = connections
                .reader()
                .get()
                .await
                .expect("Failed to get reader connection");

            // Test basic operations on both connections
            let _: () = redis::cmd("SET")
                .arg("test:connections:key")
                .arg("test_value")
                .query_async(&mut primary_conn)
                .await
                .expect("Failed to SET");

            let value: String = redis::cmd("GET")
                .arg("test:connections:key")
                .query_async(&mut reader_conn)
                .await
                .expect("Failed to GET");

            assert_eq!(value, "test_value");

            // Cleanup
            let _: () = redis::cmd("DEL")
                .arg("test:connections:key")
                .query_async(&mut primary_conn)
                .await
                .expect("Failed to DEL");
        }

        #[tokio::test]
        #[ignore] // Requires running Redis instance
        async fn test_redis_connections_backward_compatible() {
            // Verify that single pool mode (no reader URL) works correctly
            let pool = create_test_redis_connection()
                .await
                .expect("Redis connection required for this test");

            let connections = Arc::new(RedisConnections::new_single_pool(pool));

            // Multiple repositories should be able to share the connections
            let conn1 = connections.clone();
            let conn2 = connections.clone();

            // Both should be able to get connections
            let _primary1 = conn1.primary().get().await.expect("Failed to get primary");
            let _reader1 = conn1.reader().get().await.expect("Failed to get reader");
            let _primary2 = conn2.primary().get().await.expect("Failed to get primary");
            let _reader2 = conn2.reader().get().await.expect("Failed to get reader");

            // All should work without issues (backward compatible)
        }
    }
}