Implement DynamicConfig with ArcSwap hot-reload and ConfigReloadHandle
Add ConfigReloadHandle with Arc<ArcSwap<DynamicConfig>> for lock-free reads on the request hot path and tokio::sync::Mutex-serialized reload. Add static config change detection via diff_static_config(). Add DynamicConfig validation (rate_limit, body_limit, site checks). Add PartialEq derives to config types. Include unit tests for ArcSwap swap visibility, invalid config rejection, and concurrent reload serialization.
This commit is contained in:
@@ -1,7 +1,14 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use arc_swap::ArcSwap;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
use super::static_config::StaticConfig;
|
||||||
|
use super::validation::validate_config;
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug, Deserialize, Clone)]
|
#[derive(Debug, Deserialize, Clone, PartialEq)]
|
||||||
pub struct DynamicConfig {
|
pub struct DynamicConfig {
|
||||||
pub sites: Vec<SiteConfig>,
|
pub sites: Vec<SiteConfig>,
|
||||||
pub rate_limit: RateLimitConfig,
|
pub rate_limit: RateLimitConfig,
|
||||||
@@ -9,7 +16,7 @@ pub struct DynamicConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug, Deserialize, Clone)]
|
#[derive(Debug, Deserialize, Clone, PartialEq)]
|
||||||
pub struct SiteConfig {
|
pub struct SiteConfig {
|
||||||
pub host: String,
|
pub host: String,
|
||||||
pub upstream: String,
|
pub upstream: String,
|
||||||
@@ -37,14 +44,173 @@ fn default_request_timeout() -> u64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug, Deserialize, Clone)]
|
#[derive(Debug, Deserialize, Clone, PartialEq)]
|
||||||
pub struct RateLimitConfig {
|
pub struct RateLimitConfig {
|
||||||
pub requests_per_second: u32,
|
pub requests_per_second: u32,
|
||||||
pub burst: u32,
|
pub burst: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug, Deserialize, Clone)]
|
#[derive(Debug, Deserialize, Clone, PartialEq)]
|
||||||
pub struct BodyConfig {
|
pub struct BodyConfig {
|
||||||
pub limit_bytes: u64,
|
pub limit_bytes: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub struct ConfigReloadHandle {
|
||||||
|
config: Arc<ArcSwap<DynamicConfig>>,
|
||||||
|
static_config: StaticConfig,
|
||||||
|
reload_mutex: Mutex<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
impl ConfigReloadHandle {
|
||||||
|
pub fn new(config: Arc<ArcSwap<DynamicConfig>>, static_config: StaticConfig) -> Self {
|
||||||
|
Self {
|
||||||
|
config,
|
||||||
|
static_config,
|
||||||
|
reload_mutex: Mutex::new(()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn load(&self) -> Arc<DynamicConfig> {
|
||||||
|
self.config.load_full()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn reload(
|
||||||
|
&self,
|
||||||
|
new_static: StaticConfig,
|
||||||
|
new_dynamic: DynamicConfig,
|
||||||
|
) -> anyhow::Result<Vec<String>> {
|
||||||
|
let _guard = self.reload_mutex.lock().await;
|
||||||
|
|
||||||
|
validate_config(&new_static, &new_dynamic)?;
|
||||||
|
|
||||||
|
let changed_fields = diff_static_config(&self.static_config, &new_static);
|
||||||
|
|
||||||
|
self.config.store(Arc::new(new_dynamic));
|
||||||
|
|
||||||
|
Ok(changed_fields)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn diff_static_config(old: &StaticConfig, new: &StaticConfig) -> Vec<String> {
|
||||||
|
let mut changes = Vec::new();
|
||||||
|
|
||||||
|
if old.listeners != new.listeners {
|
||||||
|
changes.push("listeners".to_string());
|
||||||
|
}
|
||||||
|
if old.allow_wildcard_bind != new.allow_wildcard_bind {
|
||||||
|
changes.push("allow_wildcard_bind".to_string());
|
||||||
|
}
|
||||||
|
if old.health_check_port != new.health_check_port {
|
||||||
|
changes.push("health_check_port".to_string());
|
||||||
|
}
|
||||||
|
if old.admin_socket_path != new.admin_socket_path {
|
||||||
|
changes.push("admin_socket_path".to_string());
|
||||||
|
}
|
||||||
|
if old.shutdown_timeout_secs != new.shutdown_timeout_secs {
|
||||||
|
changes.push("shutdown_timeout_secs".to_string());
|
||||||
|
}
|
||||||
|
if old.logging != new.logging {
|
||||||
|
changes.push("logging".to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
changes
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::config::test_fixtures;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn arcswap_swap_visible_after_reload() {
|
||||||
|
let initial = test_fixtures::test_dynamic_config();
|
||||||
|
let config_arc = Arc::new(ArcSwap::from_pointee(initial.clone()));
|
||||||
|
let static_config = test_fixtures::test_static_config();
|
||||||
|
let handle = ConfigReloadHandle::new(config_arc.clone(), static_config);
|
||||||
|
|
||||||
|
let loaded = handle.load();
|
||||||
|
assert_eq!(loaded.sites.len(), 1);
|
||||||
|
assert_eq!(loaded.rate_limit.requests_per_second, 10);
|
||||||
|
|
||||||
|
let mut new_dynamic = initial.clone();
|
||||||
|
new_dynamic.rate_limit.requests_per_second = 50;
|
||||||
|
new_dynamic.sites.push(SiteConfig {
|
||||||
|
host: "new.test".to_string(),
|
||||||
|
upstream: "127.0.0.1:9090".to_string(),
|
||||||
|
upstream_scheme: "http".to_string(),
|
||||||
|
upstream_connect_timeout_secs: 5,
|
||||||
|
upstream_request_timeout_secs: 60,
|
||||||
|
});
|
||||||
|
|
||||||
|
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||||
|
rt.block_on(handle.reload(test_fixtures::test_static_config(), new_dynamic))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let loaded = handle.load();
|
||||||
|
assert_eq!(loaded.sites.len(), 2);
|
||||||
|
assert_eq!(loaded.rate_limit.requests_per_second, 50);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn reload_rejects_invalid_config() {
|
||||||
|
let initial = test_fixtures::test_dynamic_config();
|
||||||
|
let config_arc = Arc::new(ArcSwap::from_pointee(initial.clone()));
|
||||||
|
let static_config = test_fixtures::test_static_config();
|
||||||
|
let handle = ConfigReloadHandle::new(config_arc.clone(), static_config);
|
||||||
|
|
||||||
|
let mut invalid_dynamic = initial.clone();
|
||||||
|
invalid_dynamic.rate_limit.requests_per_second = 0;
|
||||||
|
|
||||||
|
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||||
|
let result =
|
||||||
|
rt.block_on(handle.reload(test_fixtures::test_static_config(), invalid_dynamic));
|
||||||
|
assert!(result.is_err());
|
||||||
|
|
||||||
|
let loaded = config_arc.load();
|
||||||
|
assert_eq!(loaded.rate_limit.requests_per_second, 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn concurrent_reload_serialization() {
|
||||||
|
let initial = test_fixtures::test_dynamic_config();
|
||||||
|
let config_arc = Arc::new(ArcSwap::from_pointee(initial.clone()));
|
||||||
|
let static_config = test_fixtures::test_static_config();
|
||||||
|
let handle = Arc::new(ConfigReloadHandle::new(config_arc.clone(), static_config));
|
||||||
|
|
||||||
|
let mut handles = Vec::new();
|
||||||
|
for i in 1..=5u32 {
|
||||||
|
let h = handle.clone();
|
||||||
|
let initial = initial.clone();
|
||||||
|
handles.push(tokio::spawn(async move {
|
||||||
|
let mut dynamic = initial.clone();
|
||||||
|
dynamic.rate_limit.requests_per_second = i * 10;
|
||||||
|
h.reload(test_fixtures::test_static_config(), dynamic).await
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
for h in handles {
|
||||||
|
h.await.unwrap().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let loaded = config_arc.load();
|
||||||
|
let rps = loaded.rate_limit.requests_per_second;
|
||||||
|
assert!((rps == 10) || (rps == 20) || (rps == 30) || (rps == 40) || (rps == 50));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn static_config_diff_detects_changes() {
|
||||||
|
let old = test_fixtures::test_static_config();
|
||||||
|
let mut new = old.clone();
|
||||||
|
assert!(diff_static_config(&old, &new).is_empty());
|
||||||
|
|
||||||
|
new.health_check_port = 8080;
|
||||||
|
new.logging.level = "debug".to_string();
|
||||||
|
let changes = diff_static_config(&old, &new);
|
||||||
|
assert!(changes.contains(&"health_check_port".to_string()));
|
||||||
|
assert!(changes.contains(&"logging".to_string()));
|
||||||
|
assert_eq!(changes.len(), 2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -2,3 +2,9 @@ pub mod dynamic_config;
|
|||||||
pub mod static_config;
|
pub mod static_config;
|
||||||
pub mod test_fixtures;
|
pub mod test_fixtures;
|
||||||
pub mod validation;
|
pub mod validation;
|
||||||
|
|
||||||
|
pub use dynamic_config::{
|
||||||
|
BodyConfig, ConfigReloadHandle, DynamicConfig, RateLimitConfig, SiteConfig,
|
||||||
|
};
|
||||||
|
pub use static_config::{ListenerConfig, LoggingConfig, StaticConfig, TlsConfig};
|
||||||
|
pub use validation::validate_config;
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize, PartialEq)]
|
||||||
pub struct StaticConfig {
|
pub struct StaticConfig {
|
||||||
pub listeners: Vec<ListenerConfig>,
|
pub listeners: Vec<ListenerConfig>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
@@ -32,7 +32,7 @@ fn default_shutdown_timeout_secs() -> u64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize, PartialEq)]
|
||||||
pub struct ListenerConfig {
|
pub struct ListenerConfig {
|
||||||
pub bind_addr: String,
|
pub bind_addr: String,
|
||||||
#[serde(default = "default_http_port")]
|
#[serde(default = "default_http_port")]
|
||||||
@@ -55,7 +55,7 @@ fn default_https_port() -> u16 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize, PartialEq)]
|
||||||
pub struct TlsConfig {
|
pub struct TlsConfig {
|
||||||
pub mode: String,
|
pub mode: String,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
@@ -76,7 +76,7 @@ fn default_acme_directory() -> String {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize, PartialEq)]
|
||||||
pub struct LoggingConfig {
|
pub struct LoggingConfig {
|
||||||
#[serde(default = "default_log_level")]
|
#[serde(default = "default_log_level")]
|
||||||
pub level: String,
|
pub level: String,
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use anyhow::Result;
|
use anyhow::{bail, Result};
|
||||||
|
|
||||||
use super::dynamic_config::DynamicConfig;
|
use super::dynamic_config::DynamicConfig;
|
||||||
use super::static_config::StaticConfig;
|
use super::static_config::StaticConfig;
|
||||||
@@ -6,7 +6,61 @@ use super::static_config::StaticConfig;
|
|||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub fn validate_config(
|
pub fn validate_config(
|
||||||
_static_config: &StaticConfig,
|
_static_config: &StaticConfig,
|
||||||
_dynamic_config: &DynamicConfig,
|
dynamic_config: &DynamicConfig,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
|
if dynamic_config.rate_limit.requests_per_second == 0 {
|
||||||
|
bail!("rate_limit.requests_per_second must be > 0");
|
||||||
|
}
|
||||||
|
if dynamic_config.rate_limit.burst == 0 {
|
||||||
|
bail!("rate_limit.burst must be > 0");
|
||||||
|
}
|
||||||
|
if dynamic_config.body.limit_bytes == 0 {
|
||||||
|
bail!("body.limit_bytes must be > 0");
|
||||||
|
}
|
||||||
|
for site in &dynamic_config.sites {
|
||||||
|
if site.host.is_empty() {
|
||||||
|
bail!("site host must not be empty");
|
||||||
|
}
|
||||||
|
if site.upstream.is_empty() {
|
||||||
|
bail!("site upstream must not be empty");
|
||||||
|
}
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::config::test_fixtures;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn valid_config_passes_validation() {
|
||||||
|
let static_config = test_fixtures::test_static_config();
|
||||||
|
let dynamic_config = test_fixtures::test_dynamic_config();
|
||||||
|
assert!(validate_config(&static_config, &dynamic_config).is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zero_requests_per_second_fails() {
|
||||||
|
let static_config = test_fixtures::test_static_config();
|
||||||
|
let mut dynamic_config = test_fixtures::test_dynamic_config();
|
||||||
|
dynamic_config.rate_limit.requests_per_second = 0;
|
||||||
|
assert!(validate_config(&static_config, &dynamic_config).is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zero_burst_fails() {
|
||||||
|
let static_config = test_fixtures::test_static_config();
|
||||||
|
let mut dynamic_config = test_fixtures::test_dynamic_config();
|
||||||
|
dynamic_config.rate_limit.burst = 0;
|
||||||
|
assert!(validate_config(&static_config, &dynamic_config).is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn zero_body_limit_fails() {
|
||||||
|
let static_config = test_fixtures::test_static_config();
|
||||||
|
let mut dynamic_config = test_fixtures::test_dynamic_config();
|
||||||
|
dynamic_config.body.limit_bytes = 0;
|
||||||
|
assert!(validate_config(&static_config, &dynamic_config).is_err());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -207,13 +207,27 @@ mod tests {
|
|||||||
.map(|cs| format!("{cs:?}"))
|
.map(|cs| format!("{cs:?}"))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
assert!(cipher_suites.iter().any(|cs| cs.contains("AES_256_GCM_SHA384")));
|
assert!(cipher_suites
|
||||||
assert!(cipher_suites.iter().any(|cs| cs.contains("AES_128_GCM_SHA256")));
|
.iter()
|
||||||
assert!(cipher_suites.iter().any(|cs| cs.contains("CHACHA20_POLY1305_SHA256")));
|
.any(|cs| cs.contains("AES_256_GCM_SHA384")));
|
||||||
assert!(cipher_suites.iter().any(|cs| cs.contains("ECDHE_ECDSA_WITH_AES_256_GCM_SHA384")));
|
assert!(cipher_suites
|
||||||
assert!(cipher_suites.iter().any(|cs| cs.contains("ECDHE_ECDSA_WITH_AES_128_GCM_SHA256")));
|
.iter()
|
||||||
assert!(cipher_suites.iter().any(|cs| cs.contains("ECDHE_RSA_WITH_AES_256_GCM_SHA384")));
|
.any(|cs| cs.contains("AES_128_GCM_SHA256")));
|
||||||
assert!(cipher_suites.iter().any(|cs| cs.contains("ECDHE_RSA_WITH_AES_128_GCM_SHA256")));
|
assert!(cipher_suites
|
||||||
|
.iter()
|
||||||
|
.any(|cs| cs.contains("CHACHA20_POLY1305_SHA256")));
|
||||||
|
assert!(cipher_suites
|
||||||
|
.iter()
|
||||||
|
.any(|cs| cs.contains("ECDHE_ECDSA_WITH_AES_256_GCM_SHA384")));
|
||||||
|
assert!(cipher_suites
|
||||||
|
.iter()
|
||||||
|
.any(|cs| cs.contains("ECDHE_ECDSA_WITH_AES_128_GCM_SHA256")));
|
||||||
|
assert!(cipher_suites
|
||||||
|
.iter()
|
||||||
|
.any(|cs| cs.contains("ECDHE_RSA_WITH_AES_256_GCM_SHA384")));
|
||||||
|
assert!(cipher_suites
|
||||||
|
.iter()
|
||||||
|
.any(|cs| cs.contains("ECDHE_RSA_WITH_AES_128_GCM_SHA256")));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -310,4 +324,4 @@ mod tests {
|
|||||||
let result = load_private_key("/nonexistent/path/key.pem");
|
let result = load_private_key("/nonexistent/path/key.pem");
|
||||||
assert!(result.is_err());
|
assert!(result.is_err());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user