1use std::{fmt, sync::Arc};
4
5use crate::observability::request_id::get_request_id;
6use crate::{
7 jobs::JobProducerTrait,
8 models::{
9 AppState, NetworkRepoModel, NotificationRepoModel, PluginCallRequest, PluginMetadata,
10 PluginModel, RelayerRepoModel, SignerRepoModel, ThinDataAppState, TransactionRepoModel,
11 },
12 repositories::{
13 ApiKeyRepositoryTrait, NetworkRepository, PluginRepositoryTrait, RelayerRepository,
14 Repository, TransactionCounterTrait, TransactionRepository,
15 },
16};
17use actix_web::web;
18use async_trait::async_trait;
19use serde::{Deserialize, Serialize};
20use thiserror::Error;
21use uuid::Uuid;
22
23pub mod config;
24pub use config::*;
25
26pub mod health;
27pub use health::*;
28
29pub mod protocol;
30pub use protocol::*;
31
32pub mod connection;
33pub use connection::*;
34
35pub mod runner;
36pub use runner::*;
37
38pub mod relayer_api;
39pub use relayer_api::*;
40
41pub mod script_executor;
42pub use script_executor::*;
43
44pub mod pool_executor;
45pub use pool_executor::*;
46
47pub mod shared_socket;
48pub use shared_socket::*;
49
50#[cfg(test)]
51use mockall::automock;
52
53#[derive(Error, Debug, Serialize)]
54pub enum PluginError {
55 #[error("Socket error: {0}")]
56 SocketError(String),
57 #[error("Plugin error: {0}")]
58 PluginError(String),
59 #[error("Relayer error: {0}")]
60 RelayerError(String),
61 #[error("Plugin execution error: {0}")]
62 PluginExecutionError(String),
63 #[error("Script execution timed out after {0} seconds")]
64 ScriptTimeout(u64),
65 #[error("Invalid method: {0}")]
66 InvalidMethod(String),
67 #[error("Invalid payload: {0}")]
68 InvalidPayload(String),
69 #[error("{0}")]
70 HandlerError(Box<PluginHandlerPayload>),
71}
72
73impl PluginError {
74 pub fn with_traces(self, traces: Vec<serde_json::Value>) -> Self {
77 match self {
78 PluginError::HandlerError(mut payload) => {
79 payload.append_traces(traces);
80 PluginError::HandlerError(payload)
81 }
82 other => other,
83 }
84 }
85}
86
87impl From<PluginError> for String {
88 fn from(error: PluginError) -> Self {
89 error.to_string()
90 }
91}
92
93#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
94pub struct PluginCallResponse {
95 pub result: serde_json::Value,
97 #[serde(skip_serializing_if = "Option::is_none")]
99 pub metadata: Option<PluginMetadata>,
100}
101
102#[derive(Debug, Serialize, Deserialize, utoipa::ToSchema)]
103pub struct PluginHandlerError {
104 #[serde(skip_serializing_if = "Option::is_none")]
105 pub code: Option<String>,
106 #[serde(skip_serializing_if = "Option::is_none")]
107 pub details: Option<serde_json::Value>,
108}
109
110#[derive(Debug)]
111pub struct PluginHandlerResponse {
112 pub status: u16,
113 pub message: String,
114 pub error: PluginHandlerError,
115 pub metadata: Option<PluginMetadata>,
116}
117
118#[derive(Debug, Serialize)]
119pub struct PluginHandlerPayload {
120 pub status: u16,
121 pub message: String,
122 #[serde(skip_serializing_if = "Option::is_none")]
123 pub code: Option<String>,
124 #[serde(skip_serializing_if = "Option::is_none")]
125 pub details: Option<serde_json::Value>,
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub logs: Option<Vec<LogEntry>>,
128 #[serde(skip_serializing_if = "Option::is_none")]
129 pub traces: Option<Vec<serde_json::Value>>,
130}
131
132impl PluginHandlerPayload {
133 fn append_traces(&mut self, traces: Vec<serde_json::Value>) {
134 match &mut self.traces {
135 Some(existing) => existing.extend(traces),
136 None => self.traces = Some(traces),
137 }
138 }
139
140 fn into_response(self, emit_logs: bool, emit_traces: bool) -> PluginHandlerResponse {
141 let logs = if emit_logs { self.logs } else { None };
142 let traces = if emit_traces { self.traces } else { None };
143 let message = derive_handler_message(&self.message, logs.as_deref());
144 let metadata = build_metadata(logs, traces);
145
146 PluginHandlerResponse {
147 status: self.status,
148 message,
149 error: PluginHandlerError {
150 code: self.code,
151 details: self.details,
152 },
153 metadata,
154 }
155 }
156}
157
158impl fmt::Display for PluginHandlerPayload {
159 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
160 f.write_str(&self.message)
161 }
162}
163
164fn derive_handler_message(message: &str, logs: Option<&[LogEntry]>) -> String {
165 if !message.trim().is_empty() {
166 return message.to_string();
167 }
168
169 if let Some(logs) = logs {
170 if let Some(entry) = logs
171 .iter()
172 .rev()
173 .find(|entry| matches!(entry.level, LogLevel::Error | LogLevel::Warn))
174 {
175 return entry.message.clone();
176 }
177
178 if let Some(entry) = logs.last() {
179 return entry.message.clone();
180 }
181 }
182
183 "Plugin execution failed".to_string()
184}
185
186fn build_metadata(
187 logs: Option<Vec<LogEntry>>,
188 traces: Option<Vec<serde_json::Value>>,
189) -> Option<PluginMetadata> {
190 if logs.is_some() || traces.is_some() {
191 Some(PluginMetadata { logs, traces })
192 } else {
193 None
194 }
195}
196
197fn forward_logs_to_tracing(plugin_id: &str, logs: &[LogEntry], request_id: &str) {
198 for entry in logs {
199 match entry.level {
200 LogLevel::Error => {
201 tracing::error!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
202 }
203 LogLevel::Warn => {
204 tracing::warn!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
205 }
206 LogLevel::Info | LogLevel::Log => {
207 tracing::info!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
208 }
209 LogLevel::Debug => {
210 tracing::debug!(target: "plugin", plugin_id = %plugin_id, request_id = %request_id, "{}", entry.message)
211 }
212 LogLevel::Result => {}
213 }
214 }
215}
216
217#[derive(Debug)]
218pub enum PluginCallResult {
219 Success(PluginCallResponse),
220 Handler(PluginHandlerResponse),
221 Fatal(PluginError),
222}
223
224#[derive(Default)]
225pub struct PluginService<R: PluginRunnerTrait> {
226 runner: R,
227}
228
229impl<R: PluginRunnerTrait> PluginService<R> {
230 pub fn new(runner: R) -> Self {
231 Self { runner }
232 }
233
234 pub fn resolve_plugin_path(plugin_path: &str) -> String {
235 if plugin_path.starts_with("plugins/") {
236 plugin_path.to_string()
237 } else {
238 format!("plugins/{plugin_path}")
239 }
240 }
241
242 #[allow(clippy::type_complexity)]
243 async fn call_plugin<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>(
244 &self,
245 plugin: PluginModel,
246 plugin_call_request: PluginCallRequest,
247 state: Arc<ThinDataAppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>,
248 ) -> PluginCallResult
249 where
250 J: JobProducerTrait + Send + Sync + 'static,
251 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
252 TR: TransactionRepository
253 + Repository<TransactionRepoModel, String>
254 + Send
255 + Sync
256 + 'static,
257 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
258 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
259 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
260 TCR: TransactionCounterTrait + Send + Sync + 'static,
261 PR: PluginRepositoryTrait + Send + Sync + 'static,
262 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
263 {
264 let socket_path = format!("/tmp/{}.sock", Uuid::new_v4());
265 let script_path = Self::resolve_plugin_path(&plugin.path);
266 let script_params = plugin_call_request.params.to_string();
267 let headers_json = plugin_call_request
268 .headers
269 .map(|h| serde_json::to_string(&h).unwrap_or_default());
270 let route = plugin_call_request.route;
271 let config_json = plugin
272 .config
273 .map(|c| serde_json::to_string(&c).unwrap_or_default());
274 let method = plugin_call_request.method;
275 let query_json = plugin_call_request
276 .query
277 .map(|q| serde_json::to_string(&q).unwrap_or_default());
278
279 let request_id = get_request_id();
280 let request_id_for_logs: String = request_id
281 .as_deref()
282 .filter(|id| !id.trim().is_empty())
283 .map(str::to_owned)
284 .unwrap_or_else(|| Uuid::new_v4().to_string());
285 let result = self
286 .runner
287 .run(
288 plugin.id.clone(),
289 &socket_path,
290 script_path,
291 plugin.timeout,
292 script_params,
293 request_id,
294 headers_json,
295 route,
296 config_json,
297 method,
298 query_json,
299 plugin.emit_traces,
300 state,
301 )
302 .await;
303
304 match result {
305 Ok(script_result) => {
306 if plugin.forward_logs {
307 forward_logs_to_tracing(&plugin.id, &script_result.logs, &request_id_for_logs);
308 }
309 let logs = if plugin.emit_logs {
311 Some(script_result.logs)
312 } else {
313 None
314 };
315 let traces = if plugin.emit_traces {
316 Some(script_result.trace)
317 } else {
318 None
319 };
320 let metadata = build_metadata(logs, traces);
321
322 let result = if script_result.return_value.trim() == "undefined" {
324 serde_json::Value::Null
325 } else {
326 serde_json::from_str::<serde_json::Value>(&script_result.return_value)
327 .unwrap_or(serde_json::Value::String(script_result.return_value))
328 };
329
330 PluginCallResult::Success(PluginCallResponse { result, metadata })
331 }
332 Err(e) => match e {
333 PluginError::HandlerError(payload) => {
334 if plugin.forward_logs {
335 if let Some(logs) = payload.logs.as_deref() {
336 forward_logs_to_tracing(&plugin.id, logs, &request_id_for_logs);
337 }
338 }
339 let failure = payload.into_response(plugin.emit_logs, plugin.emit_traces);
340 let has_logs = failure
341 .metadata
342 .as_ref()
343 .and_then(|meta| meta.logs.as_ref())
344 .is_some();
345 let has_traces = failure
346 .metadata
347 .as_ref()
348 .and_then(|meta| meta.traces.as_ref())
349 .is_some();
350
351 tracing::debug!(
352 status = failure.status,
353 message = %failure.message,
354 code = ?failure.error.code.as_ref(),
355 details = ?failure.error.details.as_ref(),
356 has_logs,
357 has_traces,
358 "Plugin handler returned error"
359 );
360
361 PluginCallResult::Handler(failure)
362 }
363 other => {
364 tracing::error!("Plugin execution failed: {:?}", other);
366 PluginCallResult::Fatal(other)
367 }
368 },
369 }
370 }
371}
372
373#[async_trait]
374#[cfg_attr(test, automock)]
375pub trait PluginServiceTrait<J, TR, RR, NR, NFR, SR, TCR, PR, AKR>: Send + Sync
376where
377 J: JobProducerTrait + 'static,
378 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
379 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
380 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
381 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
382 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
383 TCR: TransactionCounterTrait + Send + Sync + 'static,
384 PR: PluginRepositoryTrait + Send + Sync + 'static,
385 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
386{
387 fn new(runner: PluginRunner) -> Self;
388 async fn call_plugin(
389 &self,
390 plugin: PluginModel,
391 plugin_call_request: PluginCallRequest,
392 state: Arc<web::ThinData<AppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>>,
393 ) -> PluginCallResult;
394}
395
396#[async_trait]
397impl<J, TR, RR, NR, NFR, SR, TCR, PR, AKR> PluginServiceTrait<J, TR, RR, NR, NFR, SR, TCR, PR, AKR>
398 for PluginService<PluginRunner>
399where
400 J: JobProducerTrait + 'static,
401 TR: TransactionRepository + Repository<TransactionRepoModel, String> + Send + Sync + 'static,
402 RR: RelayerRepository + Repository<RelayerRepoModel, String> + Send + Sync + 'static,
403 NR: NetworkRepository + Repository<NetworkRepoModel, String> + Send + Sync + 'static,
404 NFR: Repository<NotificationRepoModel, String> + Send + Sync + 'static,
405 SR: Repository<SignerRepoModel, String> + Send + Sync + 'static,
406 TCR: TransactionCounterTrait + Send + Sync + 'static,
407 PR: PluginRepositoryTrait + Send + Sync + 'static,
408 AKR: ApiKeyRepositoryTrait + Send + Sync + 'static,
409{
410 fn new(runner: PluginRunner) -> Self {
411 Self::new(runner)
412 }
413
414 async fn call_plugin(
415 &self,
416 plugin: PluginModel,
417 plugin_call_request: PluginCallRequest,
418 state: Arc<web::ThinData<AppState<J, RR, TR, NR, NFR, SR, TCR, PR, AKR>>>,
419 ) -> PluginCallResult {
420 self.call_plugin(plugin, plugin_call_request, state).await
421 }
422}
423
424#[cfg(test)]
425mod tests {
426 use std::{io::Write, time::Duration};
427
428 use crate::{
429 constants::DEFAULT_PLUGIN_TIMEOUT_SECONDS,
430 jobs::MockJobProducerTrait,
431 models::PluginModel,
432 repositories::{
433 ApiKeyRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage,
434 PluginRepositoryStorage, RelayerRepositoryStorage, SignerRepositoryStorage,
435 TransactionCounterRepositoryStorage, TransactionRepositoryStorage,
436 },
437 utils::mocks::mockutils::create_mock_app_state,
438 };
439
440 use super::*;
441
442 #[test]
443 fn test_resolve_plugin_path() {
444 assert_eq!(
445 PluginService::<MockPluginRunnerTrait>::resolve_plugin_path("plugins/examples/test.ts"),
446 "plugins/examples/test.ts"
447 );
448
449 assert_eq!(
450 PluginService::<MockPluginRunnerTrait>::resolve_plugin_path("examples/test.ts"),
451 "plugins/examples/test.ts"
452 );
453
454 assert_eq!(
455 PluginService::<MockPluginRunnerTrait>::resolve_plugin_path("test.ts"),
456 "plugins/test.ts"
457 );
458 }
459
460 #[tokio::test]
461 async fn test_call_plugin() {
462 let plugin = PluginModel {
463 id: "test-plugin".to_string(),
464 path: "test-path".to_string(),
465 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
466 emit_logs: true,
467 emit_traces: false,
468 raw_response: false,
469 allow_get_invocation: false,
470 config: None,
471 forward_logs: false,
472 };
473 let app_state =
474 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
475
476 let mut plugin_runner = MockPluginRunnerTrait::default();
477
478 plugin_runner
479 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
480 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
481 Ok(ScriptResult {
482 logs: vec![LogEntry {
483 level: LogLevel::Log,
484 message: "test-log".to_string(),
485 }],
486 error: "test-error".to_string(),
487 return_value: "test-result".to_string(),
488 trace: Vec::new(),
489 })
490 });
491
492 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
493 let outcome = plugin_service
494 .call_plugin(
495 plugin,
496 PluginCallRequest {
497 params: serde_json::Value::Null,
498 headers: None,
499 route: None,
500 method: Some("POST".to_string()),
501 query: None,
502 },
503 Arc::new(web::ThinData(app_state)),
504 )
505 .await;
506 match outcome {
507 PluginCallResult::Success(result) => {
508 assert_eq!(
510 result.result,
511 serde_json::Value::String("test-result".to_string())
512 );
513 assert!(result.metadata.and_then(|meta| meta.logs).is_some());
515 }
516 PluginCallResult::Handler(_) | PluginCallResult::Fatal(_) => {
517 panic!("expected success outcome")
518 }
519 }
520 }
521
522 #[tokio::test]
523 async fn test_from_plugin_error_to_string() {
524 let error = PluginError::PluginExecutionError("test-error".to_string());
525 let result: String = error.into();
526 assert_eq!(result, "Plugin execution error: test-error");
527 }
528
529 #[test]
530 fn test_plugin_error_with_traces_handler_error() {
531 let payload = PluginHandlerPayload {
532 status: 400,
533 message: "test message".to_string(),
534 code: Some("TEST_CODE".to_string()),
535 details: None,
536 logs: None,
537 traces: Some(vec![serde_json::json!({"trace": "1"})]),
538 };
539 let error = PluginError::HandlerError(Box::new(payload));
540 let new_traces = vec![
541 serde_json::json!({"trace": "2"}),
542 serde_json::json!({"trace": "3"}),
543 ];
544
545 let enriched_error = error.with_traces(new_traces);
546
547 match enriched_error {
548 PluginError::HandlerError(payload) => {
549 let traces = payload.traces.unwrap();
550 assert_eq!(traces.len(), 3);
551 assert_eq!(traces[0], serde_json::json!({"trace": "1"}));
552 assert_eq!(traces[1], serde_json::json!({"trace": "2"}));
553 assert_eq!(traces[2], serde_json::json!({"trace": "3"}));
554 }
555 _ => panic!("Expected HandlerError variant"),
556 }
557 }
558
559 #[test]
560 fn test_plugin_error_with_traces_other_variants() {
561 let error = PluginError::PluginExecutionError("test".to_string());
562 let new_traces = vec![serde_json::json!({"trace": "1"})];
563
564 let result = error.with_traces(new_traces);
565
566 match result {
567 PluginError::PluginExecutionError(msg) => assert_eq!(msg, "test"),
568 _ => panic!("Expected PluginExecutionError variant"),
569 }
570 }
571
572 #[test]
573 fn test_derive_handler_message_with_message() {
574 let result = derive_handler_message("Custom error message", None);
575 assert_eq!(result, "Custom error message");
576 }
577
578 #[test]
579 fn test_derive_handler_message_with_error_log() {
580 let logs = vec![
581 LogEntry {
582 level: LogLevel::Log,
583 message: "info log".to_string(),
584 },
585 LogEntry {
586 level: LogLevel::Error,
587 message: "error log".to_string(),
588 },
589 ];
590 let result = derive_handler_message("", Some(&logs));
591 assert_eq!(result, "error log");
592 }
593
594 #[test]
595 fn test_derive_handler_message_with_warn_log() {
596 let logs = vec![
597 LogEntry {
598 level: LogLevel::Log,
599 message: "info log".to_string(),
600 },
601 LogEntry {
602 level: LogLevel::Warn,
603 message: "warn log".to_string(),
604 },
605 ];
606 let result = derive_handler_message("", Some(&logs));
607 assert_eq!(result, "warn log");
608 }
609
610 #[test]
611 fn test_derive_handler_message_with_only_info_logs() {
612 let logs = vec![
613 LogEntry {
614 level: LogLevel::Log,
615 message: "first log".to_string(),
616 },
617 LogEntry {
618 level: LogLevel::Info,
619 message: "last log".to_string(),
620 },
621 ];
622 let result = derive_handler_message("", Some(&logs));
623 assert_eq!(result, "last log");
624 }
625
626 #[test]
627 fn test_derive_handler_message_no_logs() {
628 let result = derive_handler_message("", None);
629 assert_eq!(result, "Plugin execution failed");
630 }
631
632 #[test]
633 fn test_build_metadata_with_logs_and_traces() {
634 let logs = vec![LogEntry {
635 level: LogLevel::Log,
636 message: "test".to_string(),
637 }];
638 let traces = vec![serde_json::json!({"trace": "1"})];
639
640 let result = build_metadata(Some(logs.clone()), Some(traces.clone()));
641
642 assert!(result.is_some());
643 let metadata = result.unwrap();
644 assert_eq!(metadata.logs.unwrap(), logs);
645 assert_eq!(metadata.traces.unwrap(), traces);
646 }
647
648 #[test]
649 fn test_build_metadata_with_only_logs() {
650 let logs = vec![LogEntry {
651 level: LogLevel::Log,
652 message: "test".to_string(),
653 }];
654
655 let result = build_metadata(Some(logs.clone()), None);
656
657 assert!(result.is_some());
658 let metadata = result.unwrap();
659 assert_eq!(metadata.logs.unwrap(), logs);
660 assert!(metadata.traces.is_none());
661 }
662
663 #[test]
664 fn test_build_metadata_with_only_traces() {
665 let traces = vec![serde_json::json!({"trace": "1"})];
666
667 let result = build_metadata(None, Some(traces.clone()));
668
669 assert!(result.is_some());
670 let metadata = result.unwrap();
671 assert!(metadata.logs.is_none());
672 assert_eq!(metadata.traces.unwrap(), traces);
673 }
674
675 #[test]
676 fn test_build_metadata_with_neither() {
677 let result = build_metadata(None, None);
678 assert!(result.is_none());
679 }
680
681 #[test]
682 fn test_plugin_handler_payload_append_traces_to_existing() {
683 let mut payload = PluginHandlerPayload {
684 status: 400,
685 message: "test".to_string(),
686 code: None,
687 details: None,
688 logs: None,
689 traces: Some(vec![serde_json::json!({"trace": "1"})]),
690 };
691
692 payload.append_traces(vec![serde_json::json!({"trace": "2"})]);
693
694 let traces = payload.traces.unwrap();
695 assert_eq!(traces.len(), 2);
696 assert_eq!(traces[0], serde_json::json!({"trace": "1"}));
697 assert_eq!(traces[1], serde_json::json!({"trace": "2"}));
698 }
699
700 #[test]
701 fn test_plugin_handler_payload_append_traces_to_none() {
702 let mut payload = PluginHandlerPayload {
703 status: 400,
704 message: "test".to_string(),
705 code: None,
706 details: None,
707 logs: None,
708 traces: None,
709 };
710
711 payload.append_traces(vec![serde_json::json!({"trace": "1"})]);
712
713 let traces = payload.traces.unwrap();
714 assert_eq!(traces.len(), 1);
715 assert_eq!(traces[0], serde_json::json!({"trace": "1"}));
716 }
717
718 #[test]
719 fn test_plugin_handler_payload_into_response_with_logs_and_traces() {
720 let logs = vec![LogEntry {
721 level: LogLevel::Error,
722 message: "error message".to_string(),
723 }];
724 let payload = PluginHandlerPayload {
725 status: 400,
726 message: "".to_string(),
727 code: Some("ERR_CODE".to_string()),
728 details: Some(serde_json::json!({"key": "value"})),
729 logs: Some(logs.clone()),
730 traces: Some(vec![serde_json::json!({"trace": "1"})]),
731 };
732
733 let response = payload.into_response(true, true);
734
735 assert_eq!(response.status, 400);
736 assert_eq!(response.message, "error message"); assert_eq!(response.error.code, Some("ERR_CODE".to_string()));
738 assert!(response.metadata.is_some());
739 let metadata = response.metadata.unwrap();
740 assert_eq!(metadata.logs.unwrap(), logs);
741 assert_eq!(metadata.traces.unwrap().len(), 1);
742 }
743
744 #[test]
745 fn test_plugin_handler_payload_into_response_without_logs() {
746 let logs = vec![LogEntry {
747 level: LogLevel::Log,
748 message: "test log".to_string(),
749 }];
750 let payload = PluginHandlerPayload {
751 status: 500,
752 message: "explicit message".to_string(),
753 code: None,
754 details: None,
755 logs: Some(logs),
756 traces: None,
757 };
758
759 let response = payload.into_response(false, false);
760
761 assert_eq!(response.status, 500);
762 assert_eq!(response.message, "explicit message");
763 assert!(response.metadata.is_none()); }
765
766 #[tokio::test]
767 async fn test_call_plugin_handler_error() {
768 let plugin = PluginModel {
769 id: "test-plugin".to_string(),
770 path: "test-path".to_string(),
771 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
772 emit_logs: true,
773 emit_traces: true,
774 raw_response: false,
775 allow_get_invocation: false,
776 config: None,
777 forward_logs: false,
778 };
779 let app_state =
780 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
781
782 let mut plugin_runner = MockPluginRunnerTrait::default();
783
784 plugin_runner
785 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
786 .returning(move |_, _, _, _, _, _, _, _, _, _, _, _, _| {
787 Err(PluginError::HandlerError(Box::new(PluginHandlerPayload {
788 status: 400,
789 message: "Plugin handler error".to_string(),
790 code: Some("VALIDATION_ERROR".to_string()),
791 details: Some(serde_json::json!({"field": "email"})),
792 logs: Some(vec![LogEntry {
793 level: LogLevel::Error,
794 message: "Invalid email".to_string(),
795 }]),
796 traces: Some(vec![serde_json::json!({"step": "validation"})]),
797 })))
798 });
799
800 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
801 let outcome = plugin_service
802 .call_plugin(
803 plugin,
804 PluginCallRequest {
805 params: serde_json::Value::Null,
806 headers: None,
807 route: None,
808 method: Some("POST".to_string()),
809 query: None,
810 },
811 Arc::new(web::ThinData(app_state)),
812 )
813 .await;
814
815 match outcome {
816 PluginCallResult::Handler(response) => {
817 assert_eq!(response.status, 400);
818 assert_eq!(response.error.code, Some("VALIDATION_ERROR".to_string()));
819 assert!(response.metadata.is_some());
820 let metadata = response.metadata.unwrap();
821 assert!(metadata.logs.is_some());
822 assert!(metadata.traces.is_some());
823 }
824 _ => panic!("Expected Handler result"),
825 }
826 }
827
828 #[tokio::test]
829 async fn test_call_plugin_fatal_error() {
830 let plugin = PluginModel {
831 id: "test-plugin".to_string(),
832 path: "test-path".to_string(),
833 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
834 emit_logs: false,
835 emit_traces: false,
836 raw_response: false,
837 allow_get_invocation: false,
838 config: None,
839 forward_logs: false,
840 };
841 let app_state =
842 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
843
844 let mut plugin_runner = MockPluginRunnerTrait::default();
845
846 plugin_runner
847 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
848 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
849 Err(PluginError::PluginExecutionError("Fatal error".to_string()))
850 });
851
852 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
853 let outcome = plugin_service
854 .call_plugin(
855 plugin,
856 PluginCallRequest {
857 params: serde_json::Value::Null,
858 headers: None,
859 route: None,
860 method: Some("POST".to_string()),
861 query: None,
862 },
863 Arc::new(web::ThinData(app_state)),
864 )
865 .await;
866
867 match outcome {
868 PluginCallResult::Fatal(error) => {
869 assert!(matches!(error, PluginError::PluginExecutionError(_)));
870 }
871 _ => panic!("Expected Fatal result"),
872 }
873 }
874
875 #[tokio::test]
876 async fn test_call_plugin_success_with_json_result() {
877 let plugin = PluginModel {
878 id: "test-plugin".to_string(),
879 path: "test-path".to_string(),
880 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
881 emit_logs: true,
882 emit_traces: true,
883 raw_response: false,
884 allow_get_invocation: false,
885 config: None,
886 forward_logs: false,
887 };
888 let app_state =
889 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
890
891 let mut plugin_runner = MockPluginRunnerTrait::default();
892
893 plugin_runner
894 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
895 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
896 Ok(ScriptResult {
897 logs: vec![LogEntry {
898 level: LogLevel::Log,
899 message: "test-log".to_string(),
900 }],
901 error: "".to_string(),
902 return_value: r#"{"result": "success"}"#.to_string(),
903 trace: vec![serde_json::json!({"step": "1"})],
904 })
905 });
906
907 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
908 let outcome = plugin_service
909 .call_plugin(
910 plugin,
911 PluginCallRequest {
912 params: serde_json::Value::Null,
913 headers: None,
914 route: None,
915 method: Some("POST".to_string()),
916 query: None,
917 },
918 Arc::new(web::ThinData(app_state)),
919 )
920 .await;
921
922 match outcome {
923 PluginCallResult::Success(result) => {
924 assert_eq!(result.result, serde_json::json!({"result": "success"}));
926 assert!(result.metadata.is_some());
927 let metadata = result.metadata.unwrap();
928 assert!(metadata.logs.is_some());
929 assert!(metadata.traces.is_some());
930 }
931 _ => panic!("Expected Success result"),
932 }
933 }
934
935 #[derive(Clone)]
936 struct VecWriter {
937 buffer: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
938 }
939
940 impl Write for VecWriter {
941 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
942 let mut buffer = self.buffer.lock().unwrap();
943 buffer.push(String::from_utf8_lossy(buf).to_string());
944 Ok(buf.len())
945 }
946
947 fn flush(&mut self) -> std::io::Result<()> {
948 Ok(())
949 }
950 }
951
952 fn init_capturing_subscriber(
953 buffer: std::sync::Arc<std::sync::Mutex<Vec<String>>>,
954 ) -> tracing::subscriber::DefaultGuard {
955 use tracing_subscriber::filter::LevelFilter;
956 let writer = VecWriter { buffer };
957 let subscriber = tracing_subscriber::fmt()
958 .with_writer(move || writer.clone())
959 .with_ansi(false)
960 .with_target(true)
961 .without_time()
962 .with_max_level(LevelFilter::DEBUG)
963 .finish();
964 tracing::subscriber::set_default(subscriber)
965 }
966
967 #[tokio::test]
968 async fn test_forward_logs_to_tracing_when_enabled_all_levels() {
969 use std::sync::{Arc as StdArc, Mutex};
970
971 let logs_buffer: StdArc<Mutex<Vec<String>>> = StdArc::new(Mutex::new(Vec::new()));
972 let _guard = init_capturing_subscriber(logs_buffer.clone());
973
974 let plugin = PluginModel {
975 id: "test-plugin-levels".to_string(),
976 path: "test-path".to_string(),
977 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
978 emit_logs: false,
979 emit_traces: false,
980 forward_logs: true,
981 raw_response: false,
982 allow_get_invocation: false,
983 config: None,
984 };
985 let app_state =
986 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
987
988 let mut plugin_runner = MockPluginRunnerTrait::default();
989
990 plugin_runner
991 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
992 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
993 Ok(ScriptResult {
994 logs: vec![
995 LogEntry {
996 level: LogLevel::Error,
997 message: "err-log".to_string(),
998 },
999 LogEntry {
1000 level: LogLevel::Warn,
1001 message: "warn-log".to_string(),
1002 },
1003 LogEntry {
1004 level: LogLevel::Info,
1005 message: "info-log".to_string(),
1006 },
1007 LogEntry {
1008 level: LogLevel::Log,
1009 message: "log-log".to_string(),
1010 },
1011 LogEntry {
1012 level: LogLevel::Debug,
1013 message: "debug-log".to_string(),
1014 },
1015 LogEntry {
1016 level: LogLevel::Result,
1017 message: "result-log".to_string(),
1018 },
1019 ],
1020 error: "".to_string(),
1021 return_value: "{}".to_string(),
1022 trace: vec![],
1023 })
1024 });
1025
1026 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1027 let _ = plugin_service
1028 .call_plugin(
1029 plugin,
1030 PluginCallRequest {
1031 params: serde_json::json!({}),
1032 headers: None,
1033 route: None,
1034 method: Some("POST".to_string()),
1035 query: None,
1036 },
1037 Arc::new(web::ThinData(app_state)),
1038 )
1039 .await;
1040
1041 let captured = logs_buffer.lock().unwrap().join("\n");
1042
1043 assert!(captured.contains("err-log"));
1044 assert!(captured.contains("warn-log"));
1045 assert!(captured.contains("info-log"));
1046 assert!(captured.contains("log-log"));
1047 assert!(captured.contains("debug-log"));
1048 assert!(!captured.contains("result-log"));
1049 assert!(captured.contains("plugin_id=test-plugin-levels"));
1050 assert!(captured.contains("ERROR"));
1051 assert!(captured.contains("WARN"));
1052 let request_id_values: Vec<&str> = captured
1053 .match_indices("request_id=")
1054 .filter_map(|(idx, _)| {
1055 let tail = &captured[idx + "request_id=".len()..];
1056 tail.split_whitespace()
1057 .next()
1058 .map(|value| value.trim_matches(|c: char| c == ',' || c == '"' || c == '}'))
1059 })
1060 .collect();
1061 assert!(
1062 !request_id_values.is_empty(),
1063 "expected forwarded plugin logs to include request_id field, captured: {}",
1064 captured
1065 );
1066 assert!(
1067 request_id_values.iter().all(|value| !value.is_empty()),
1068 "expected non-empty request_id values, captured: {}",
1069 captured
1070 );
1071 assert!(
1072 request_id_values
1073 .iter()
1074 .all(|value| uuid::Uuid::parse_str(value).is_ok()),
1075 "expected request_id fallback to be UUID when no span request id is set, captured: {}",
1076 captured
1077 );
1078 }
1079
1080 #[tokio::test]
1081 async fn test_forward_logs_not_emitted_when_disabled() {
1082 use std::sync::{Arc as StdArc, Mutex};
1083
1084 let logs_buffer: StdArc<Mutex<Vec<String>>> = StdArc::new(Mutex::new(Vec::new()));
1085 let _guard = init_capturing_subscriber(logs_buffer.clone());
1086
1087 let plugin = PluginModel {
1088 id: "test-plugin-disabled".to_string(),
1089 path: "test-path".to_string(),
1090 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1091 emit_logs: false,
1092 emit_traces: false,
1093 forward_logs: false,
1094 raw_response: false,
1095 allow_get_invocation: false,
1096 config: None,
1097 };
1098 let app_state =
1099 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1100
1101 let mut plugin_runner = MockPluginRunnerTrait::default();
1102 plugin_runner
1103 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1104 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1105 Ok(ScriptResult {
1106 logs: vec![LogEntry {
1107 level: LogLevel::Warn,
1108 message: "should-not-emit".to_string(),
1109 }],
1110 error: "".to_string(),
1111 return_value: "{}".to_string(),
1112 trace: vec![],
1113 })
1114 });
1115
1116 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1117 let _ = plugin_service
1118 .call_plugin(
1119 plugin,
1120 PluginCallRequest {
1121 params: serde_json::json!({}),
1122 headers: None,
1123 route: None,
1124 method: Some("POST".to_string()),
1125 query: None,
1126 },
1127 Arc::new(web::ThinData(app_state)),
1128 )
1129 .await;
1130
1131 let captured = logs_buffer.lock().unwrap().join("\n");
1132 assert!(
1135 !captured.contains("should-not-emit"),
1136 "plugin logs should not be forwarded when disabled, but found: {captured}"
1137 );
1138 }
1139
1140 #[tokio::test]
1141 async fn test_forward_logs_on_handler_error() {
1142 use std::sync::{Arc as StdArc, Mutex};
1143
1144 let logs_buffer: StdArc<Mutex<Vec<String>>> = StdArc::new(Mutex::new(Vec::new()));
1145 let _guard = init_capturing_subscriber(logs_buffer.clone());
1146
1147 let plugin = PluginModel {
1148 id: "test-plugin-error".to_string(),
1149 path: "test-path".to_string(),
1150 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1151 emit_logs: true,
1152 emit_traces: false,
1153 forward_logs: true,
1154 raw_response: false,
1155 allow_get_invocation: false,
1156 config: None,
1157 };
1158 let app_state =
1159 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1160
1161 let mut plugin_runner = MockPluginRunnerTrait::default();
1162 plugin_runner
1163 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1164 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1165 Err(PluginError::HandlerError(Box::new(PluginHandlerPayload {
1166 status: 400,
1167 message: "handler failed".to_string(),
1168 code: None,
1169 details: None,
1170 logs: Some(vec![LogEntry {
1171 level: LogLevel::Error,
1172 message: "handler-log".to_string(),
1173 }]),
1174 traces: None,
1175 })))
1176 });
1177
1178 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1179 let _ = plugin_service
1180 .call_plugin(
1181 plugin,
1182 PluginCallRequest {
1183 params: serde_json::json!({}),
1184 headers: None,
1185 route: None,
1186 method: Some("POST".to_string()),
1187 query: None,
1188 },
1189 Arc::new(web::ThinData(app_state)),
1190 )
1191 .await;
1192
1193 let captured = logs_buffer.lock().unwrap().join("\n");
1194 assert!(captured.contains("handler-log"));
1195 assert!(captured.contains("plugin_id=test-plugin-error"));
1196 assert!(captured.contains("ERROR"));
1197 }
1198
1199 #[tokio::test]
1200 async fn test_call_plugin_success_with_undefined_result() {
1201 let plugin = PluginModel {
1202 id: "test-plugin".to_string(),
1203 path: "test-path".to_string(),
1204 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1205 emit_logs: false,
1206 emit_traces: false,
1207 raw_response: false,
1208 allow_get_invocation: false,
1209 config: None,
1210 forward_logs: false,
1211 };
1212 let app_state =
1213 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1214
1215 let mut plugin_runner = MockPluginRunnerTrait::default();
1216
1217 plugin_runner
1218 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1219 .returning(|_, _, _, _, _, _, _, _, _, _, _, _, _| {
1220 Ok(ScriptResult {
1221 logs: vec![],
1222 error: "".to_string(),
1223 return_value: "undefined".to_string(),
1224 trace: vec![],
1225 })
1226 });
1227
1228 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1229 let outcome = plugin_service
1230 .call_plugin(
1231 plugin,
1232 PluginCallRequest {
1233 params: serde_json::Value::Null,
1234 headers: None,
1235 route: None,
1236 method: Some("POST".to_string()),
1237 query: None,
1238 },
1239 Arc::new(web::ThinData(app_state)),
1240 )
1241 .await;
1242
1243 match outcome {
1244 PluginCallResult::Success(result) => {
1245 assert_eq!(result.result, serde_json::Value::Null);
1247 assert!(result.metadata.is_none());
1249 }
1250 _ => panic!("Expected Success result"),
1251 }
1252 }
1253
1254 #[tokio::test]
1255 async fn test_call_plugin_with_headers() {
1256 use std::sync::{Arc as StdArc, Mutex};
1257
1258 let plugin = PluginModel {
1259 id: "test-plugin".to_string(),
1260 path: "test-path".to_string(),
1261 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1262 emit_logs: false,
1263 emit_traces: false,
1264 raw_response: false,
1265 allow_get_invocation: false,
1266 config: None,
1267 forward_logs: false,
1268 };
1269 let app_state =
1270 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1271
1272 let captured_headers: StdArc<Mutex<Option<String>>> = StdArc::new(Mutex::new(None));
1274 let captured_headers_clone = captured_headers.clone();
1275
1276 let mut plugin_runner = MockPluginRunnerTrait::default();
1277
1278 plugin_runner
1279 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1280 .returning(move |_, _, _, _, _, _, headers_json, _, _, _, _, _, _| {
1281 *captured_headers_clone.lock().unwrap() = headers_json;
1283 Ok(ScriptResult {
1284 logs: vec![],
1285 error: "".to_string(),
1286 return_value: "{}".to_string(),
1287 trace: vec![],
1288 })
1289 });
1290
1291 let mut headers_map = std::collections::HashMap::new();
1293 headers_map.insert(
1294 "x-custom-header".to_string(),
1295 vec!["custom-value".to_string()],
1296 );
1297 headers_map.insert(
1298 "authorization".to_string(),
1299 vec!["Bearer token123".to_string()],
1300 );
1301
1302 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1303 let _outcome = plugin_service
1304 .call_plugin(
1305 plugin,
1306 PluginCallRequest {
1307 params: serde_json::json!({"test": "data"}),
1308 headers: Some(headers_map.clone()),
1309 route: None,
1310 method: Some("POST".to_string()),
1311 query: None,
1312 },
1313 Arc::new(web::ThinData(app_state)),
1314 )
1315 .await;
1316
1317 let captured = captured_headers.lock().unwrap();
1319 assert!(
1320 captured.is_some(),
1321 "headers_json should be passed to runner"
1322 );
1323
1324 let headers_json = captured.as_ref().unwrap();
1325 let parsed: std::collections::HashMap<String, Vec<String>> =
1326 serde_json::from_str(headers_json).expect("headers_json should be valid JSON");
1327
1328 assert_eq!(
1329 parsed.get("x-custom-header"),
1330 Some(&vec!["custom-value".to_string()])
1331 );
1332 assert_eq!(
1333 parsed.get("authorization"),
1334 Some(&vec!["Bearer token123".to_string()])
1335 );
1336 }
1337
1338 #[tokio::test]
1339 async fn test_call_plugin_without_headers() {
1340 use std::sync::{Arc as StdArc, Mutex};
1341
1342 let plugin = PluginModel {
1343 id: "test-plugin".to_string(),
1344 path: "test-path".to_string(),
1345 timeout: Duration::from_secs(DEFAULT_PLUGIN_TIMEOUT_SECONDS),
1346 emit_logs: false,
1347 emit_traces: false,
1348 raw_response: false,
1349 allow_get_invocation: false,
1350 config: None,
1351 forward_logs: false,
1352 };
1353 let app_state =
1354 create_mock_app_state(None, None, None, None, Some(vec![plugin.clone()]), None).await;
1355
1356 let captured_headers: StdArc<Mutex<Option<String>>> = StdArc::new(Mutex::new(None));
1357 let captured_headers_clone = captured_headers.clone();
1358
1359 let mut plugin_runner = MockPluginRunnerTrait::default();
1360
1361 plugin_runner
1362 .expect_run::<MockJobProducerTrait, RelayerRepositoryStorage, TransactionRepositoryStorage, NetworkRepositoryStorage, NotificationRepositoryStorage, SignerRepositoryStorage, TransactionCounterRepositoryStorage, PluginRepositoryStorage, ApiKeyRepositoryStorage>()
1363 .returning(move |_, _, _, _, _, _, headers_json, _, _, _, _, _, _| {
1364 *captured_headers_clone.lock().unwrap() = headers_json;
1365 Ok(ScriptResult {
1366 logs: vec![],
1367 error: "".to_string(),
1368 return_value: "{}".to_string(),
1369 trace: vec![],
1370 })
1371 });
1372
1373 let plugin_service = PluginService::<MockPluginRunnerTrait>::new(plugin_runner);
1374 let _outcome = plugin_service
1375 .call_plugin(
1376 plugin,
1377 PluginCallRequest {
1378 params: serde_json::json!({}),
1379 headers: None, route: None,
1381 method: Some("POST".to_string()),
1382 query: None,
1383 },
1384 Arc::new(web::ThinData(app_state)),
1385 )
1386 .await;
1387
1388 let captured = captured_headers.lock().unwrap();
1390 assert!(
1391 captured.is_none(),
1392 "headers_json should be None when no headers provided"
1393 );
1394 }
1395}