1use crate::constants::{
23 CONCURRENT_TASKS_HEADROOM_MULTIPLIER, DEFAULT_POOL_CONCURRENT_TASKS_PER_WORKER,
24 DEFAULT_POOL_CONNECT_RETRIES, DEFAULT_POOL_HEALTH_CHECK_INTERVAL_SECS,
25 DEFAULT_POOL_IDLE_TIMEOUT_MS, DEFAULT_POOL_MAX_CONNECTIONS, DEFAULT_POOL_MAX_THREADS_FLOOR,
26 DEFAULT_POOL_MIN_THREADS, DEFAULT_POOL_QUEUE_SEND_TIMEOUT_MS,
27 DEFAULT_POOL_REQUEST_TIMEOUT_SECS, DEFAULT_POOL_SOCKET_BACKLOG,
28 DEFAULT_SOCKET_IDLE_TIMEOUT_SECS, DEFAULT_SOCKET_READ_TIMEOUT_SECS, DEFAULT_TRACE_TIMEOUT_MS,
29 MAX_CONCURRENT_TASKS_PER_WORKER,
30};
31use std::sync::OnceLock;
32
33static CONFIG: OnceLock<PluginConfig> = OnceLock::new();
35
36#[derive(Debug, Clone)]
38pub struct PluginConfig {
39 pub max_concurrency: usize,
42
43 pub pool_max_connections: usize,
46 pub pool_connect_retries: usize,
48 pub pool_request_timeout_secs: u64,
50
51 pub pool_max_queue_size: usize,
54 pub pool_queue_send_timeout_ms: u64,
56 pub pool_workers: usize,
58
59 pub socket_max_connections: usize,
62 pub socket_idle_timeout_secs: u64,
64 pub socket_read_timeout_secs: u64,
66
67 pub nodejs_pool_min_threads: usize,
70 pub nodejs_pool_max_threads: usize,
72 pub nodejs_pool_concurrent_tasks: usize,
74 pub nodejs_pool_idle_timeout_ms: u64,
76 pub nodejs_worker_heap_mb: usize,
78
79 pub pool_socket_backlog: usize,
82
83 pub health_check_interval_secs: u64,
86 pub trace_timeout_ms: u64,
88}
89
90impl PluginConfig {
91 pub fn from_env() -> Self {
93 let max_concurrency = env_parse("PLUGIN_MAX_CONCURRENCY", DEFAULT_POOL_MAX_CONNECTIONS);
96
97 let pool_max_connections = env_parse("PLUGIN_POOL_MAX_CONNECTIONS", max_concurrency);
101
102 let socket_max_connections = env_parse(
104 "PLUGIN_SOCKET_MAX_CONCURRENT_CONNECTIONS",
105 (max_concurrency as f64 * 1.5) as usize,
106 );
107
108 let pool_max_queue_size = env_parse("PLUGIN_POOL_MAX_QUEUE_SIZE", max_concurrency * 2);
110
111 let cpu_count = std::thread::available_parallelism()
114 .map(|n| n.get())
115 .unwrap_or(4);
116
117 let estimated_memory_budget = 16384_u64 / 2; let estimated_memory_threads = (estimated_memory_budget / 1024).max(4) as usize;
121 let estimated_concurrency_threads = (max_concurrency / 200).max(cpu_count);
122 let estimated_max_threads = estimated_memory_threads
123 .min(estimated_concurrency_threads)
124 .clamp(DEFAULT_POOL_MAX_THREADS_FLOOR, 32); let base_queue_timeout = DEFAULT_POOL_QUEUE_SEND_TIMEOUT_MS;
130 let workload_per_thread = max_concurrency / estimated_max_threads.max(1);
131 let derived_queue_timeout = if workload_per_thread > 100 {
132 base_queue_timeout * 2 } else if workload_per_thread > 50 {
135 base_queue_timeout + 250 } else {
138 base_queue_timeout };
141 let pool_queue_send_timeout_ms =
142 env_parse("PLUGIN_POOL_QUEUE_SEND_TIMEOUT_MS", derived_queue_timeout);
143
144 let pool_connect_retries =
146 env_parse("PLUGIN_POOL_CONNECT_RETRIES", DEFAULT_POOL_CONNECT_RETRIES);
147 let pool_request_timeout_secs = env_parse(
148 "PLUGIN_POOL_REQUEST_TIMEOUT_SECS",
149 DEFAULT_POOL_REQUEST_TIMEOUT_SECS,
150 );
151 let pool_workers = env_parse("PLUGIN_POOL_WORKERS", 0); let socket_idle_timeout_secs = env_parse(
154 "PLUGIN_SOCKET_IDLE_TIMEOUT_SECS",
155 DEFAULT_SOCKET_IDLE_TIMEOUT_SECS,
156 );
157 let socket_read_timeout_secs = env_parse(
158 "PLUGIN_SOCKET_READ_TIMEOUT_SECS",
159 DEFAULT_SOCKET_READ_TIMEOUT_SECS,
160 );
161
162 let health_check_interval_secs = env_parse(
163 "PLUGIN_POOL_HEALTH_CHECK_INTERVAL_SECS",
164 DEFAULT_POOL_HEALTH_CHECK_INTERVAL_SECS,
165 );
166 let trace_timeout_ms = env_parse("PLUGIN_TRACE_TIMEOUT_MS", DEFAULT_TRACE_TIMEOUT_MS);
167
168 let derived_min_threads = DEFAULT_POOL_MIN_THREADS.max(cpu_count / 2);
174 let nodejs_pool_min_threads = env_parse("PLUGIN_POOL_MIN_THREADS", derived_min_threads);
175
176 let total_memory_mb = {
196 #[cfg(target_os = "macos")]
197 {
198 use std::process::Command;
200 Command::new("sysctl")
201 .args(["-n", "hw.memsize"])
202 .output()
203 .ok()
204 .and_then(|o| String::from_utf8(o.stdout).ok())
205 .and_then(|s| s.trim().parse::<u64>().ok())
206 .map(|bytes| bytes / 1024 / 1024)
207 .unwrap_or(16384) }
209 #[cfg(target_os = "linux")]
210 {
211 std::fs::read_to_string("/proc/meminfo")
213 .ok()
214 .and_then(|contents| {
215 contents
216 .lines()
217 .find(|l| l.starts_with("MemTotal:"))
218 .and_then(|l| {
219 l.split_whitespace()
220 .nth(1)
221 .and_then(|s| s.parse::<u64>().ok())
222 })
223 })
224 .map(|kb| kb / 1024)
225 .unwrap_or(16384) }
227 #[cfg(not(any(target_os = "macos", target_os = "linux")))]
228 {
229 16384_u64 }
231 };
232
233 let memory_budget_mb = total_memory_mb / 2;
237 let heap_per_worker_mb = 1024_u64; let memory_based_max_threads = (memory_budget_mb / heap_per_worker_mb).max(4) as usize;
239
240 let concurrency_based_threads = (max_concurrency / 200).max(cpu_count);
244
245 let derived_max_threads = memory_based_max_threads
248 .min(concurrency_based_threads)
249 .clamp(DEFAULT_POOL_MAX_THREADS_FLOOR, 32); tracing::debug!(
252 total_memory_mb = total_memory_mb,
253 memory_based_max = memory_based_max_threads,
254 concurrency_based = concurrency_based_threads,
255 derived_max_threads = derived_max_threads,
256 "Thread scaling calculation"
257 );
258
259 let nodejs_pool_max_threads = env_parse("PLUGIN_POOL_MAX_THREADS", derived_max_threads);
260
261 let base_tasks = max_concurrency / nodejs_pool_max_threads.max(1);
271 let derived_concurrent_tasks =
272 ((base_tasks as f64 * CONCURRENT_TASKS_HEADROOM_MULTIPLIER) as usize).clamp(
273 DEFAULT_POOL_CONCURRENT_TASKS_PER_WORKER,
274 MAX_CONCURRENT_TASKS_PER_WORKER,
275 );
276 let nodejs_pool_concurrent_tasks =
277 env_parse("PLUGIN_POOL_CONCURRENT_TASKS", derived_concurrent_tasks);
278
279 let nodejs_pool_idle_timeout_ms =
280 env_parse("PLUGIN_POOL_IDLE_TIMEOUT", DEFAULT_POOL_IDLE_TIMEOUT_MS);
281
282 let base_worker_heap = 512_usize;
291 let heap_per_task = 5_usize;
292 let derived_worker_heap_mb =
293 (base_worker_heap + (nodejs_pool_concurrent_tasks * heap_per_task)).clamp(1024, 2048); let nodejs_worker_heap_mb = env_parse("PLUGIN_WORKER_HEAP_MB", derived_worker_heap_mb);
295
296 let default_backlog = DEFAULT_POOL_SOCKET_BACKLOG as usize;
304 let pool_socket_backlog = env_parse(
305 "PLUGIN_POOL_SOCKET_BACKLOG",
306 max_concurrency.max(default_backlog),
307 );
308
309 let config = Self {
310 max_concurrency,
311 pool_max_connections,
312 pool_connect_retries,
313 pool_request_timeout_secs,
314 pool_max_queue_size,
315 pool_queue_send_timeout_ms,
316 pool_workers,
317 socket_max_connections,
318 socket_idle_timeout_secs,
319 socket_read_timeout_secs,
320 nodejs_pool_min_threads,
321 nodejs_pool_max_threads,
322 nodejs_pool_concurrent_tasks,
323 nodejs_pool_idle_timeout_ms,
324 nodejs_worker_heap_mb,
325 pool_socket_backlog,
326 health_check_interval_secs,
327 trace_timeout_ms,
328 };
329
330 config.validate();
332
333 config
334 }
335
336 fn validate(&self) {
338 assert!(
340 self.pool_max_connections <= self.socket_max_connections,
341 "pool_max_connections ({}) must be <= socket_max_connections ({})",
342 self.pool_max_connections,
343 self.socket_max_connections
344 );
345 assert!(
346 self.nodejs_pool_min_threads <= self.nodejs_pool_max_threads,
347 "nodejs_pool_min_threads ({}) must be <= nodejs_pool_max_threads ({})",
348 self.nodejs_pool_min_threads,
349 self.nodejs_pool_max_threads
350 );
351 assert!(
352 self.max_concurrency > 0,
353 "max_concurrency must be > 0, got {}",
354 self.max_concurrency
355 );
356 assert!(
357 self.nodejs_pool_max_threads > 0,
358 "nodejs_pool_max_threads must be > 0, got {}",
359 self.nodejs_pool_max_threads
360 );
361
362 if self.pool_max_queue_size < self.max_concurrency {
364 tracing::warn!(
365 "pool_max_queue_size ({}) is less than max_concurrency ({}). \
366 This may cause request rejections under load.",
367 self.pool_max_queue_size,
368 self.max_concurrency
369 );
370 }
371 if self.nodejs_pool_concurrent_tasks > 500 {
372 tracing::warn!(
373 "nodejs_pool_concurrent_tasks ({}) is very high. \
374 This may cause excessive memory usage per worker.",
375 self.nodejs_pool_concurrent_tasks
376 );
377 }
378 }
379
380 pub fn log_config(&self) {
382 let tasks_per_thread = self.max_concurrency / self.nodejs_pool_max_threads.max(1);
383 let socket_ratio = self.socket_max_connections as f64 / self.max_concurrency as f64;
384 let queue_ratio = self.pool_max_queue_size as f64 / self.max_concurrency as f64;
385 let total_worker_heap_mb = self.nodejs_pool_max_threads * self.nodejs_worker_heap_mb;
386
387 tracing::info!(
388 max_concurrency = self.max_concurrency,
389 pool_max_connections = self.pool_max_connections,
390 pool_max_queue_size = self.pool_max_queue_size,
391 queue_timeout_ms = self.pool_queue_send_timeout_ms,
392 socket_max_connections = self.socket_max_connections,
393 socket_backlog = self.pool_socket_backlog,
394 nodejs_min_threads = self.nodejs_pool_min_threads,
395 nodejs_max_threads = self.nodejs_pool_max_threads,
396 nodejs_concurrent_tasks = self.nodejs_pool_concurrent_tasks,
397 nodejs_worker_heap_mb = self.nodejs_worker_heap_mb,
398 total_worker_heap_mb = total_worker_heap_mb,
399 tasks_per_thread = tasks_per_thread,
400 socket_multiplier = %format!("{:.2}x", socket_ratio),
401 queue_multiplier = %format!("{:.2}x", queue_ratio),
402 "Plugin configuration loaded (Rust + Node.js)"
403 );
404 }
405}
406
407impl Default for PluginConfig {
408 fn default() -> Self {
412 let max_concurrency = DEFAULT_POOL_MAX_CONNECTIONS;
415 let cpu_count = std::thread::available_parallelism()
416 .map(|n| n.get())
417 .unwrap_or(4);
418
419 let pool_max_connections = max_concurrency;
421 let socket_max_connections = (max_concurrency as f64 * 1.5) as usize;
422 let pool_max_queue_size = max_concurrency * 2;
423
424 let assumed_memory_mb = 16384_u64;
427 let memory_budget_mb = assumed_memory_mb / 2;
428 let heap_per_worker_mb = 1024_u64; let memory_based_max_threads = (memory_budget_mb / heap_per_worker_mb).max(4) as usize;
430 let concurrency_based_threads = (max_concurrency / 200).max(cpu_count);
431
432 let nodejs_pool_max_threads = memory_based_max_threads
433 .min(concurrency_based_threads)
434 .clamp(DEFAULT_POOL_MAX_THREADS_FLOOR, 32);
435 let nodejs_pool_min_threads = DEFAULT_POOL_MIN_THREADS.max(cpu_count / 2);
436
437 let base_tasks = max_concurrency / nodejs_pool_max_threads.max(1);
438 let nodejs_pool_concurrent_tasks =
439 ((base_tasks as f64 * CONCURRENT_TASKS_HEADROOM_MULTIPLIER) as usize).clamp(
440 DEFAULT_POOL_CONCURRENT_TASKS_PER_WORKER,
441 MAX_CONCURRENT_TASKS_PER_WORKER,
442 );
443
444 let base_worker_heap = 512_usize;
446 let heap_per_task = 5_usize;
447 let nodejs_worker_heap_mb =
448 (base_worker_heap + (nodejs_pool_concurrent_tasks * heap_per_task)).clamp(1024, 2048);
449
450 let default_backlog = DEFAULT_POOL_SOCKET_BACKLOG as usize;
451 let pool_socket_backlog = max_concurrency.max(default_backlog);
452
453 Self {
454 max_concurrency,
455 pool_max_connections,
456 pool_connect_retries: DEFAULT_POOL_CONNECT_RETRIES,
457 pool_request_timeout_secs: DEFAULT_POOL_REQUEST_TIMEOUT_SECS,
458 pool_max_queue_size,
459 pool_queue_send_timeout_ms: DEFAULT_POOL_QUEUE_SEND_TIMEOUT_MS,
460 pool_workers: 0,
461 socket_max_connections,
462 socket_idle_timeout_secs: DEFAULT_SOCKET_IDLE_TIMEOUT_SECS,
463 socket_read_timeout_secs: DEFAULT_SOCKET_READ_TIMEOUT_SECS,
464 nodejs_pool_min_threads,
465 nodejs_pool_max_threads,
466 nodejs_pool_concurrent_tasks,
467 nodejs_pool_idle_timeout_ms: DEFAULT_POOL_IDLE_TIMEOUT_MS,
468 nodejs_worker_heap_mb,
469 pool_socket_backlog,
470 health_check_interval_secs: DEFAULT_POOL_HEALTH_CHECK_INTERVAL_SECS,
471 trace_timeout_ms: DEFAULT_TRACE_TIMEOUT_MS,
472 }
473 }
474}
475
476pub fn get_config() -> &'static PluginConfig {
478 CONFIG.get_or_init(|| {
479 let config = PluginConfig::from_env();
480 config.log_config();
481 config
482 })
483}
484
485fn env_parse<T: std::str::FromStr>(name: &str, default: T) -> T {
487 std::env::var(name)
488 .ok()
489 .and_then(|s| s.parse().ok())
490 .unwrap_or(default)
491}
492
493#[cfg(test)]
494mod tests {
495 use super::*;
496
497 #[test]
498 fn test_default_config() {
499 let config = PluginConfig::default();
500 assert_eq!(config.max_concurrency, DEFAULT_POOL_MAX_CONNECTIONS);
501 assert_eq!(config.pool_max_connections, DEFAULT_POOL_MAX_CONNECTIONS);
502 assert_eq!(config.pool_max_queue_size, config.max_concurrency * 2);
504 assert!(
505 config.socket_max_connections >= config.pool_max_connections,
506 "socket connections should be >= pool connections"
507 );
508 }
509
510 #[test]
511 fn test_auto_derivation_ratios() {
512 let config = PluginConfig {
514 max_concurrency: 1000,
515 pool_max_connections: 1000,
516 socket_max_connections: 1500, pool_max_queue_size: 2000, ..Default::default()
519 };
520
521 assert_eq!(
522 config.socket_max_connections,
523 config.max_concurrency * 3 / 2
524 );
525 assert_eq!(config.pool_max_queue_size, config.max_concurrency * 2);
526 }
527
528 #[test]
529 fn test_very_low_concurrency() {
530 let max_concurrency = 10;
534 let cpu_count = std::thread::available_parallelism()
535 .map(|n| n.get())
536 .unwrap_or(4);
537
538 let pool_max_connections = max_concurrency;
539 let socket_max_connections = (max_concurrency as f64 * 1.5) as usize;
540 let pool_max_queue_size = max_concurrency * 2;
541
542 let memory_budget_mb = 16384 / 2;
544 let memory_based_max = (memory_budget_mb / 1024).max(4);
545 let concurrency_based = (max_concurrency / 200).max(cpu_count);
546 let nodejs_pool_max_threads = memory_based_max
547 .min(concurrency_based)
548 .max(DEFAULT_POOL_MAX_THREADS_FLOOR)
549 .min(32);
550
551 assert_eq!(pool_max_connections, 10);
552 assert_eq!(socket_max_connections, 15); assert_eq!(pool_max_queue_size, 20); assert!(nodejs_pool_max_threads >= DEFAULT_POOL_MAX_THREADS_FLOOR);
557 }
558
559 #[test]
560 fn test_medium_concurrency() {
561 let max_concurrency = 1000;
563 let cpu_count = std::thread::available_parallelism()
564 .map(|n| n.get())
565 .unwrap_or(4);
566
567 let socket_max_connections = (max_concurrency as f64 * 1.5) as usize;
568 let pool_max_queue_size = max_concurrency * 2;
569
570 let memory_budget_mb = 16384 / 2;
572 let memory_based_max = (memory_budget_mb / 1024).max(4);
573 let concurrency_based = (max_concurrency / 200).max(cpu_count);
574 let nodejs_pool_max_threads = memory_based_max
575 .min(concurrency_based)
576 .max(DEFAULT_POOL_MAX_THREADS_FLOOR)
577 .min(32);
578
579 assert_eq!(socket_max_connections, 1500); assert_eq!(pool_max_queue_size, 2000); assert!(nodejs_pool_max_threads <= 16);
586 }
587
588 #[test]
589 fn test_high_concurrency() {
590 let max_concurrency = 10000;
593
594 let socket_max_connections = (max_concurrency as f64 * 1.5) as usize;
595 let pool_max_queue_size = max_concurrency * 2;
596
597 let cpu_count = std::thread::available_parallelism()
598 .map(|n| n.get())
599 .unwrap_or(4);
600
601 let memory_budget_mb = 16384 / 2;
603 let memory_based_max = (memory_budget_mb / 1024).max(4);
604 let concurrency_based = (max_concurrency / 200).max(cpu_count);
605 let nodejs_pool_max_threads = memory_based_max
606 .min(concurrency_based)
607 .max(DEFAULT_POOL_MAX_THREADS_FLOOR)
608 .min(32);
609
610 assert_eq!(socket_max_connections, 15000); assert_eq!(pool_max_queue_size, 20000); assert!(nodejs_pool_max_threads <= 32);
616
617 let base_tasks = max_concurrency / nodejs_pool_max_threads;
619 let derived_concurrent_tasks = ((base_tasks as f64 * CONCURRENT_TASKS_HEADROOM_MULTIPLIER)
620 as usize)
621 .max(DEFAULT_POOL_CONCURRENT_TASKS_PER_WORKER)
622 .min(MAX_CONCURRENT_TASKS_PER_WORKER);
623 assert!(derived_concurrent_tasks <= MAX_CONCURRENT_TASKS_PER_WORKER);
625 }
626
627 #[test]
628 fn test_validation_catches_invalid_config() {
629 let mut config = PluginConfig::default();
630
631 config.pool_max_connections = 1000;
633 config.socket_max_connections = 500;
634
635 let result = std::panic::catch_unwind(|| {
636 config.validate();
637 });
638 assert!(
639 result.is_err(),
640 "Should panic on invalid pool > socket connections"
641 );
642 }
643
644 #[test]
645 fn test_validation_catches_invalid_threads() {
646 let mut config = PluginConfig::default();
647
648 config.nodejs_pool_min_threads = 64;
650 config.nodejs_pool_max_threads = 8;
651
652 let result = std::panic::catch_unwind(|| {
653 config.validate();
654 });
655 assert!(result.is_err(), "Should panic on invalid min > max threads");
656 }
657
658 #[test]
659 fn test_overridden_values_respected() {
660 let max_concurrency = 1000;
663 let pool_max_queue_size = 5000; let pool_max_connections = 1000; assert_eq!(pool_max_connections, max_concurrency); assert_eq!(pool_max_queue_size, 5000); let auto_derived_queue = max_concurrency * 2;
672 assert_eq!(auto_derived_queue, 2000);
673 assert_ne!(pool_max_queue_size, auto_derived_queue); }
675}