openzeppelin_relayer/services/plugins/
runner.rs

1//! This module is the orchestrator of the plugin execution.
2//!
3//! 1. Initiates connection to shared socket service - shared_socket.rs
4//! 2. Executes the plugin script - script_executor.rs OR pool_executor.rs
5//! 3. Collects traces via ExecutionGuard - shared_socket.rs
6//! 4. Returns the output of the script - script_executor.rs
7//!
8//! ## Execution Modes
9//!
10//! - **Pool mode** (default): Uses persistent Piscina worker pool.
11//!   Faster execution with precompilation and worker reuse.
12//! - **ts-node mode** (`PLUGIN_USE_POOL=false`): Spawns ts-node per request.
13//!   Simple but slower. Uses the shared socket for bidirectional communication.
14//!
15use std::{collections::HashMap, sync::Arc, time::Duration};
16
17use crate::services::plugins::{
18    ensure_shared_socket_started, get_pool_manager, get_shared_socket_service, ScriptExecutor,
19    ScriptResult,
20};
21use crate::{
22    jobs::JobProducerTrait,
23    models::{
24        NetworkRepoModel, NotificationRepoModel, RelayerRepoModel, SignerRepoModel,
25        ThinDataAppState, TransactionRepoModel,
26    },
27    repositories::{
28        ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
29        Repository, TransactionCounterTrait, TransactionRepository,
30    },
31};
32
33use super::{config::get_config, PluginError};
34use async_trait::async_trait;
35use tokio::time::timeout;
36use tracing::debug;
37use uuid::Uuid;
38
39#[cfg(test)]
40use mockall::automock;
41
42/// Check if pool-based execution is enabled via environment variable
43/// Pool mode is enabled by default for better performance
44fn use_pool_executor() -> bool {
45    std::env::var("PLUGIN_USE_POOL")
46        .map(|v| v.eq_ignore_ascii_case("true") || v == "1")
47        .unwrap_or(true) // Pool mode is now the default
48}
49
50/// Get trace timeout duration from centralized config
51fn get_trace_timeout() -> Duration {
52    Duration::from_millis(get_config().trace_timeout_ms)
53}
54
55#[cfg_attr(test, automock)]
56#[async_trait]
57pub trait PluginRunnerTrait {
58    #[allow(clippy::type_complexity, clippy::too_many_arguments)]
59    async fn run<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
60        &self,
61        plugin_id: String,
62        socket_path: &str,
63        script_path: String,
64        timeout_duration: Duration,
65        script_params: String,
66        http_request_id: Option<String>,
67        headers_json: Option<String>,
68        route: Option<String>,
69        config_json: Option<String>,
70        method: Option<String>,
71        query_json: Option<String>,
72        emit_traces: bool,
73        state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
74    ) -> Result<ScriptResult, PluginError>
75    where
76        J: JobProducerTrait + Send + Sync + 'static,
77        RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
78        TR: TransactionRepository
79            + Repository<TransactionRepoModel, String>
80            + Send
81            + Sync
82            + 'static,
83        NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
84        NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
85        SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
86        TCR: TransactionCounterTrait + Send + Sync + 'static,
87        PR: PluginRepositoryTrait + Send + Sync + 'static,
88        AKR: ApiKeyRepositoryTrait + Send + Sync + 'static;
89}
90
91#[derive(Default)]
92pub struct PluginRunner;
93
94#[async_trait]
95impl PluginRunnerTrait for PluginRunner {
96    async fn run<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
97        &self,
98        plugin_id: String,
99        socket_path: &str,
100        script_path: String,
101        timeout_duration: Duration,
102        script_params: String,
103        http_request_id: Option<String>,
104        headers_json: Option<String>,
105        route: Option<String>,
106        config_json: Option<String>,
107        method: Option<String>,
108        query_json: Option<String>,
109        emit_traces: bool,
110        state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
111    ) -> Result<ScriptResult, PluginError>
112    where
113        J: JobProducerTrait + Send + Sync + 'static,
114        RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
115        TR: TransactionRepository
116            + Repository<TransactionRepoModel, String>
117            + Send
118            + Sync
119            + 'static,
120        NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
121        NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
122        SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
123        TCR: TransactionCounterTrait + Send + Sync + 'static,
124        PR: PluginRepositoryTrait + Send + Sync + 'static,
125        AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
126    {
127        // Choose execution mode based on environment variable
128        if use_pool_executor() {
129            return self
130                .run_with_pool(
131                    plugin_id,
132                    socket_path,
133                    script_path,
134                    timeout_duration,
135                    script_params,
136                    http_request_id,
137                    headers_json,
138                    route,
139                    config_json,
140                    method,
141                    query_json,
142                    emit_traces,
143                    state,
144                )
145                .await;
146        }
147
148        // Default: ts-node execution
149        self.run_with_tsnode(
150            plugin_id,
151            socket_path,
152            script_path,
153            timeout_duration,
154            script_params,
155            http_request_id,
156            headers_json,
157            route,
158            config_json,
159            method,
160            query_json,
161            emit_traces,
162            state,
163        )
164        .await
165    }
166}
167
168impl PluginRunner {
169    /// Execute plugin using ts-node with shared socket
170    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
171    async fn run_with_tsnode<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
172        &self,
173        plugin_id: String,
174        _socket_path: &str, // Unused - kept for signature compatibility
175        script_path: String,
176        timeout_duration: Duration,
177        script_params: String,
178        http_request_id: Option<String>,
179        headers_json: Option<String>,
180        route: Option<String>,
181        config_json: Option<String>,
182        method: Option<String>,
183        query_json: Option<String>,
184        emit_traces: bool,
185        state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
186    ) -> Result<ScriptResult, PluginError>
187    where
188        J: JobProducerTrait + Send + Sync + 'static,
189        RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
190        TR: TransactionRepository
191            + Repository<TransactionRepoModel, String>
192            + Send
193            + Sync
194            + 'static,
195        NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
196        NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
197        SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
198        TCR: TransactionCounterTrait + Send + Sync + 'static,
199        PR: PluginRepositoryTrait + Send + Sync + 'static,
200        AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
201    {
202        // Ensure shared socket is started
203        ensure_shared_socket_started(Arc::clone(&state)).await?;
204
205        // Get the shared socket service
206        let shared_socket = get_shared_socket_service()?;
207        let shared_socket_path = shared_socket.socket_path().to_string();
208
209        // Generate execution_id from http_request_id or plugin_id
210        let execution_id = http_request_id
211            .clone()
212            .unwrap_or_else(|| format!("{}-{}", plugin_id, uuid::Uuid::new_v4()));
213
214        // Register execution (RAII guard auto-unregisters on drop)
215        let guard = shared_socket
216            .register_execution(execution_id.clone(), emit_traces)
217            .await;
218
219        let exec_outcome = match timeout(
220            timeout_duration,
221            ScriptExecutor::execute_typescript(
222                plugin_id,
223                script_path,
224                shared_socket_path, // Use shared socket path
225                script_params,
226                Some(execution_id),
227                headers_json,
228                route,
229                config_json,
230                method,
231                query_json,
232            ),
233        )
234        .await
235        {
236            Ok(result) => result,
237            Err(_) => {
238                return Err(PluginError::ScriptTimeout(timeout_duration.as_secs()));
239            }
240        };
241
242        // Collect traces from the guard
243        let traces = if emit_traces {
244            match guard.into_receiver() {
245                Some(mut traces_rx) => {
246                    match tokio::time::timeout(get_trace_timeout(), traces_rx.recv()).await {
247                        Ok(Some(traces)) => traces,
248                        Ok(None) => Vec::new(),
249                        Err(_) => {
250                            debug!("Timeout waiting for traces");
251                            Vec::new()
252                        }
253                    }
254                }
255                None => Vec::new(),
256            }
257        } else {
258            Vec::new()
259        };
260
261        match exec_outcome {
262            Ok(mut script_result) => {
263                // attach traces on success
264                script_result.trace = traces;
265                Ok(script_result)
266            }
267            Err(err) => Err(err.with_traces(traces)),
268        }
269    }
270
271    /// Execute plugin using worker pool (new high-performance mode)
272    /// Uses shared socket service for better scalability
273    #[allow(clippy::too_many_arguments, clippy::type_complexity)]
274    async fn run_with_pool<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
275        &self,
276        plugin_id: String,
277        _socket_path: &str, // Unused - we use shared socket instead
278        script_path: String,
279        timeout_duration: Duration,
280        script_params: String,
281        http_request_id: Option<String>,
282        headers_json: Option<String>,
283        route: Option<String>,
284        config_json: Option<String>,
285        method: Option<String>,
286        query_json: Option<String>,
287        emit_traces: bool,
288        state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
289    ) -> Result<ScriptResult, PluginError>
290    where
291        J: JobProducerTrait + Send + Sync + 'static,
292        RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
293        TR: TransactionRepository
294            + Repository<TransactionRepoModel, String>
295            + Send
296            + Sync
297            + 'static,
298        NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
299        NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
300        SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
301        TCR: TransactionCounterTrait + Send + Sync + 'static,
302        PR: PluginRepositoryTrait + Send + Sync + 'static,
303        AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
304    {
305        // Ensure shared socket service is started
306        ensure_shared_socket_started(Arc::clone(&state)).await?;
307
308        // Get shared socket service
309        let shared_socket = get_shared_socket_service()?;
310        let shared_socket_path = shared_socket.socket_path().to_string();
311
312        // Generate execution ID (use http_request_id if available, otherwise generate one)
313        let execution_id = http_request_id
314            .clone()
315            .unwrap_or_else(|| format!("exec-{}", Uuid::new_v4()));
316
317        // Always register execution so API calls from plugin can be validated
318        // ExecutionGuard will auto-unregister on drop (RAII pattern)
319        let execution_guard = shared_socket
320            .register_execution(execution_id.clone(), emit_traces)
321            .await;
322
323        // Execute via pool manager (using shared socket path)
324        let pool_manager = get_pool_manager();
325
326        // Parse params as JSON Value
327        let params: serde_json::Value = serde_json::from_str(&script_params)
328            .unwrap_or(serde_json::Value::String(script_params.clone()));
329
330        // Parse headers if present
331        let headers: Option<HashMap<String, Vec<String>>> = headers_json
332            .as_ref()
333            .and_then(|h| serde_json::from_str(h).ok());
334
335        // Parse config if present
336        let config: Option<serde_json::Value> = config_json
337            .as_ref()
338            .and_then(|c| serde_json::from_str(c).ok());
339
340        // Parse query if present
341        let query: Option<serde_json::Value> = query_json
342            .as_ref()
343            .and_then(|q| serde_json::from_str(q).ok());
344
345        let exec_outcome = match timeout(
346            timeout_duration,
347            pool_manager.execute_plugin(
348                plugin_id.clone(),
349                None,              // compiled_code - will be fetched from cache
350                Some(script_path), // plugin_path
351                params,
352                headers,
353                shared_socket_path, // Use shared socket path instead of unique one
354                Some(execution_id.clone()), // Pass the registered execution_id
355                Some(timeout_duration.as_secs()),
356                route,
357                config,
358                method,
359                query,
360            ),
361        )
362        .await
363        {
364            Ok(result) => result,
365            Err(_) => {
366                // No need to manually unregister - ExecutionGuard handles it
367                return Err(PluginError::ScriptTimeout(timeout_duration.as_secs()));
368            }
369        };
370
371        // Collect traces only if emit_traces is enabled
372        let traces = if emit_traces {
373            // Convert guard to receiver only now, keeping guard alive during execution
374            match execution_guard.into_receiver() {
375                Some(mut rx) => {
376                    // Wait for traces with short timeout - they arrive immediately if the plugin used the API
377                    let trace_timeout = get_trace_timeout().min(timeout_duration);
378                    match timeout(trace_timeout, rx.recv()).await {
379                        Ok(Some(traces)) => traces,
380                        Ok(None) | Err(_) => Vec::new(),
381                    }
382                }
383                None => Vec::new(),
384            }
385        } else {
386            // Drop the guard without waiting for traces
387            drop(execution_guard);
388            Vec::new()
389        };
390
391        // ExecutionGuard auto-unregisters when guard is dropped (after trace collection)
392
393        match exec_outcome {
394            Ok(mut script_result) => {
395                script_result.trace = traces;
396                Ok(script_result)
397            }
398            Err(e) => Err(e.with_traces(traces)),
399        }
400    }
401}
402
403#[cfg(test)]
404mod tests {
405    use actix_web::web;
406    use std::fs;
407
408    use crate::{
409        jobs::MockJobProducerTrait,
410        repositories::{
411            ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
412            PluginRepositoryStorage, RelayerRepositoryStorage, SignerRepositoryStorage,
413            TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
414        },
415        services::plugins::LogLevel,
416        utils::mocks::mockutils::create_mock_app_state,
417    };
418    use tempfile::tempdir;
419
420    use super::*;
421
422    static TS_CONFIG: &str = r#"
423        {
424            "compilerOptions": {
425              "target": "es2016",
426              "module": "commonjs",
427              "esModuleInterop": true,
428              "forceConsistentCasingInFileNames": true,
429              "strict": true,
430              "skipLibCheck": true
431            }
432          }
433    "#;
434
435    // ============================================
436    // use_pool_executor() tests
437    // ============================================
438
439    #[test]
440    fn test_use_pool_executor_default_true() {
441        // Clear the env var to test default behavior
442        std::env::remove_var("PLUGIN_USE_POOL");
443
444        // Default should be true (pool mode is the default)
445        assert!(use_pool_executor());
446    }
447
448    #[test]
449    fn test_use_pool_executor_explicit_true() {
450        std::env::set_var("PLUGIN_USE_POOL", "true");
451        assert!(use_pool_executor());
452
453        std::env::set_var("PLUGIN_USE_POOL", "TRUE");
454        assert!(use_pool_executor());
455
456        std::env::set_var("PLUGIN_USE_POOL", "True");
457        assert!(use_pool_executor());
458
459        std::env::set_var("PLUGIN_USE_POOL", "1");
460        assert!(use_pool_executor());
461
462        std::env::remove_var("PLUGIN_USE_POOL");
463    }
464
465    #[test]
466    fn test_use_pool_executor_explicit_false() {
467        std::env::set_var("PLUGIN_USE_POOL", "false");
468        assert!(!use_pool_executor());
469
470        std::env::set_var("PLUGIN_USE_POOL", "FALSE");
471        assert!(!use_pool_executor());
472
473        std::env::set_var("PLUGIN_USE_POOL", "0");
474        assert!(!use_pool_executor());
475
476        std::env::set_var("PLUGIN_USE_POOL", "no");
477        assert!(!use_pool_executor());
478
479        std::env::set_var("PLUGIN_USE_POOL", "anything_else");
480        assert!(!use_pool_executor());
481
482        std::env::remove_var("PLUGIN_USE_POOL");
483    }
484
485    // ============================================
486    // get_trace_timeout() tests
487    // ============================================
488
489    #[test]
490    fn test_get_trace_timeout_returns_duration() {
491        let timeout = get_trace_timeout();
492        // Should return a duration from config
493        assert!(timeout.as_millis() > 0);
494    }
495
496    // ============================================
497    // PluginRunner tests
498    // ============================================
499
500    #[test]
501    fn test_plugin_runner_default() {
502        let runner = PluginRunner;
503        // Just verify it can be created
504        let _runner = runner;
505    }
506
507    #[tokio::test]
508    async fn test_run() {
509        // Use ts-node mode for this test since temp files are outside plugins directory
510        std::env::set_var("PLUGIN_USE_POOL", "false");
511
512        let temp_dir = tempdir().unwrap();
513        let ts_config = temp_dir.path().join("tsconfig.json");
514        let script_path = temp_dir.path().join("test_run.ts");
515        let socket_path = temp_dir.path().join("test_run.sock");
516
517        let content = r#"
518            export async function handler(api: any, params: any) {
519                console.log('test');
520                console.error('test-error');
521                return 'test-result';
522            }
523        "#;
524        fs::write(script_path.clone(), content).unwrap();
525        fs::write(ts_config.clone(), TS_CONFIG.as_bytes()).unwrap();
526
527        let state = create_mock_app_state(None, None, None, None, None, None).await;
528
529        let plugin_runner = PluginRunner;
530        let plugin_id = "test-plugin".to_string();
531        let socket_path_str = socket_path.display().to_string();
532        let script_path_str = script_path.display().to_string();
533        let result = plugin_runner
534            .run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>(
535                plugin_id,
536                &socket_path_str,
537                script_path_str,
538                Duration::from_secs(10),
539                "{ \"test\": \"test\" }".to_string(),
540                None,
541                None,
542                None,
543                None,
544                None,
545                None,
546                false, // emit_traces
547                Arc::new(web::ThinData(state)),
548            )
549            .await;
550
551        // Cleanup env var
552        std::env::remove_var("PLUGIN_USE_POOL");
553
554        if matches!(
555            result,
556            Err(PluginError::SocketError(ref msg)) if msg.contains("Operation not permitted")
557        ) {
558            eprintln!("skipping test_run due to sandbox socket restrictions");
559            return;
560        }
561
562        let result = result.expect("runner should complete without error");
563        assert_eq!(result.logs[0].level, LogLevel::Log);
564        assert_eq!(result.logs[0].message, "test");
565        assert_eq!(result.logs[1].level, LogLevel::Error);
566        assert_eq!(result.logs[1].message, "test-error");
567        assert_eq!(result.return_value, "test-result");
568    }
569
570    #[tokio::test]
571    async fn test_run_timeout() {
572        // Use ts-node mode for this test since temp files are outside plugins directory
573        std::env::set_var("PLUGIN_USE_POOL", "false");
574
575        let temp_dir = tempdir().unwrap();
576        let ts_config = temp_dir.path().join("tsconfig.json");
577        let script_path = temp_dir.path().join("test_simple_timeout.ts");
578        let socket_path = temp_dir.path().join("test_simple_timeout.sock");
579
580        // Script that takes 200ms
581        let content = r#"
582            function sleep(ms) {
583                return new Promise(resolve => setTimeout(resolve, ms));
584            }
585
586            async function main() {
587                await sleep(200); // 200ms
588                console.log(JSON.stringify({ level: 'result', message: 'Should not reach here' }));
589            }
590
591            main();
592        "#;
593
594        fs::write(script_path.clone(), content).unwrap();
595        fs::write(ts_config.clone(), TS_CONFIG.as_bytes()).unwrap();
596
597        let state = create_mock_app_state(None, None, None, None, None, None).await;
598        let plugin_runner = PluginRunner;
599
600        // Use 100ms timeout for a 200ms script
601        let plugin_id = "test-plugin".to_string();
602        let socket_path_str = socket_path.display().to_string();
603        let script_path_str = script_path.display().to_string();
604        let result = plugin_runner
605            .run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>(
606                plugin_id,
607                &socket_path_str,
608                script_path_str,
609                Duration::from_millis(100), // 100ms timeout
610                "{}".to_string(),
611                None,
612                None,
613                None,
614                None,
615                None,
616                None,
617                false, // emit_traces
618                Arc::new(web::ThinData(state)),
619            )
620            .await;
621
622        // Cleanup env var
623        std::env::remove_var("PLUGIN_USE_POOL");
624
625        // Should timeout
626        if matches!(
627            result,
628            Err(PluginError::SocketError(ref msg)) if msg.contains("Operation not permitted")
629        ) {
630            eprintln!("skipping test_run_timeout due to sandbox socket restrictions");
631            return;
632        }
633
634        let err = result.expect_err("runner should timeout");
635        assert!(err.to_string().contains("Script execution timed out after"));
636    }
637
638    #[tokio::test]
639    async fn test_run_with_emit_traces_true() {
640        // Use ts-node mode for this test since temp files are outside plugins directory
641        std::env::set_var("PLUGIN_USE_POOL", "false");
642
643        let temp_dir = tempdir().unwrap();
644        let ts_config = temp_dir.path().join("tsconfig.json");
645        let script_path = temp_dir.path().join("test_traces.ts");
646        let socket_path = temp_dir.path().join("test_traces.sock");
647
648        let content = r#"
649            export async function handler(api: any, params: any) {
650                console.log('trace test');
651                return 'trace-result';
652            }
653        "#;
654        fs::write(script_path.clone(), content).unwrap();
655        fs::write(ts_config.clone(), TS_CONFIG.as_bytes()).unwrap();
656
657        let state = create_mock_app_state(None, None, None, None, None, None).await;
658
659        let plugin_runner = PluginRunner;
660        let plugin_id = "test-plugin-traces".to_string();
661        let socket_path_str = socket_path.display().to_string();
662        let script_path_str = script_path.display().to_string();
663        let result = plugin_runner
664            .run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>(
665                plugin_id,
666                &socket_path_str,
667                script_path_str,
668                Duration::from_secs(10),
669                "{ \"test\": \"test\" }".to_string(),
670                Some("http-req-123".to_string()), // Test with http_request_id
671                Some(r#"{"content-type": ["application/json"]}"#.to_string()), // Test with headers
672                Some("/api/test".to_string()), // Test with route
673                Some(r#"{"key": "value"}"#.to_string()), // Test with config
674                Some("POST".to_string()), // Test with method
675                Some(r#"{"page": "1"}"#.to_string()), // Test with query
676                true, // emit_traces = true
677                Arc::new(web::ThinData(state)),
678            )
679            .await;
680
681        // Cleanup env var
682        std::env::remove_var("PLUGIN_USE_POOL");
683
684        if matches!(
685            result,
686            Err(PluginError::SocketError(ref msg)) if msg.contains("Operation not permitted")
687        ) {
688            eprintln!("skipping test_run_with_emit_traces_true due to sandbox socket restrictions");
689            return;
690        }
691
692        let result = result.expect("runner should complete without error");
693        assert_eq!(result.logs[0].level, LogLevel::Log);
694        assert_eq!(result.logs[0].message, "trace test");
695        assert_eq!(result.return_value, "trace-result");
696    }
697
698    #[tokio::test]
699    async fn test_run_with_generated_execution_id() {
700        // Use ts-node mode for this test
701        std::env::set_var("PLUGIN_USE_POOL", "false");
702
703        let temp_dir = tempdir().unwrap();
704        let ts_config = temp_dir.path().join("tsconfig.json");
705        let script_path = temp_dir.path().join("test_gen_exec_id.ts");
706        let socket_path = temp_dir.path().join("test_gen_exec_id.sock");
707
708        let content = r#"
709            export async function handler(api: any, params: any) {
710                return 'generated-id-test';
711            }
712        "#;
713        fs::write(script_path.clone(), content).unwrap();
714        fs::write(ts_config.clone(), TS_CONFIG.as_bytes()).unwrap();
715
716        let state = create_mock_app_state(None, None, None, None, None, None).await;
717
718        let plugin_runner = PluginRunner;
719        let plugin_id = "test-plugin-gen-id".to_string();
720        let socket_path_str = socket_path.display().to_string();
721        let script_path_str = script_path.display().to_string();
722
723        // Test with http_request_id = None (should generate one)
724        let result = plugin_runner
725            .run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>(
726                plugin_id,
727                &socket_path_str,
728                script_path_str,
729                Duration::from_secs(10),
730                "{}".to_string(),
731                None, // No http_request_id - will be generated
732                None,
733                None,
734                None,
735                None,
736                None,
737                false,
738                Arc::new(web::ThinData(state)),
739            )
740            .await;
741
742        // Cleanup env var
743        std::env::remove_var("PLUGIN_USE_POOL");
744
745        if matches!(
746            result,
747            Err(PluginError::SocketError(ref msg)) if msg.contains("Operation not permitted")
748        ) {
749            eprintln!(
750                "skipping test_run_with_generated_execution_id due to sandbox socket restrictions"
751            );
752            return;
753        }
754
755        let result = result.expect("runner should complete without error");
756        assert_eq!(result.return_value, "generated-id-test");
757    }
758
759    #[tokio::test]
760    async fn test_run_script_error() {
761        // Use ts-node mode for this test
762        std::env::set_var("PLUGIN_USE_POOL", "false");
763
764        let temp_dir = tempdir().unwrap();
765        let ts_config = temp_dir.path().join("tsconfig.json");
766        let script_path = temp_dir.path().join("test_error.ts");
767        let socket_path = temp_dir.path().join("test_error.sock");
768
769        // Script that throws an error
770        let content = r#"
771            export async function handler(api: any, params: any) {
772                throw new Error('Intentional test error');
773            }
774        "#;
775        fs::write(script_path.clone(), content).unwrap();
776        fs::write(ts_config.clone(), TS_CONFIG.as_bytes()).unwrap();
777
778        let state = create_mock_app_state(None, None, None, None, None, None).await;
779
780        let plugin_runner = PluginRunner;
781        let plugin_id = "test-plugin-error".to_string();
782        let socket_path_str = socket_path.display().to_string();
783        let script_path_str = script_path.display().to_string();
784
785        let result = plugin_runner
786            .run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>(
787                plugin_id,
788                &socket_path_str,
789                script_path_str,
790                Duration::from_secs(10),
791                "{}".to_string(),
792                None,
793                None,
794                None,
795                None,
796                None,
797                None,
798                true, // emit_traces to test error path with traces
799                Arc::new(web::ThinData(state)),
800            )
801            .await;
802
803        // Cleanup env var
804        std::env::remove_var("PLUGIN_USE_POOL");
805
806        if matches!(
807            result,
808            Err(PluginError::SocketError(ref msg)) if msg.contains("Operation not permitted")
809        ) {
810            eprintln!("skipping test_run_script_error due to sandbox socket restrictions");
811            return;
812        }
813
814        // Should return an error
815        assert!(result.is_err());
816        let err = result.unwrap_err();
817        let err_str = format!("{err:?}");
818        assert!(
819            err_str.contains("Intentional test error") || err_str.contains("Error"),
820            "Expected error message, got: {err_str}"
821        );
822    }
823}