feat(core): implement StaticConfig/DynamicConfig split with ArcSwap hot-reload

Split alknet-core configuration into StaticConfig (immutable after startup)
and DynamicConfig (hot-reloadable at runtime via ArcSwap).

- Add StaticConfig struct in config/static_config.rs with all fields per ADR-030
- Add DynamicConfig struct with AuthPolicy, ForwardingPolicy, RateLimitConfig
- Add ForwardingPolicy with allow_all()/deny_all() defaults (ADR-031)
- Add ConfigReloadHandle with reload() method for runtime config updates
- Replace Arc<ServerAuthConfig> with Arc<ArcSwap<DynamicConfig>> in ServerHandler
- Add config_reload_handle() to Server for obtaining reload handles
- Add AuthPolicy with authenticate_publickey/authenticate_certificate methods
- All existing tests pass with the new config structure
- Default DynamicConfig produces identical behavior to current code
This commit is contained in:
2026-06-07 14:03:46 +00:00
parent a7f0dcdeb9
commit ee1b3f3819
36 changed files with 964 additions and 393 deletions

View File

@@ -46,7 +46,10 @@ async fn connect_direct(target: SocketAddr) -> Result<TcpStream, ChannelProxyErr
.map_err(|e| map_connection_error(e, target))
}
async fn connect_socks5(target: SocketAddr, proxy_addr: SocketAddr) -> Result<TcpStream, ChannelProxyError> {
async fn connect_socks5(
target: SocketAddr,
proxy_addr: SocketAddr,
) -> Result<TcpStream, ChannelProxyError> {
let mut stream = TcpStream::connect(proxy_addr)
.await
.map_err(ChannelProxyError::from)?;
@@ -134,10 +137,7 @@ async fn connect_http_connect(
}
let response_str = String::from_utf8_lossy(&response);
let status_line = response_str
.lines()
.next()
.unwrap_or("");
let status_line = response_str.lines().next().unwrap_or("");
if status_line.contains("200") {
Ok(stream)
@@ -279,11 +279,7 @@ mod tests {
.parse()
.unwrap();
let reply = vec![
0x05, 0x00, 0x00, 0x01,
0, 0, 0, 0,
0, 0,
];
let reply = vec![0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0];
proxy_sock.write_all(&reply).await.unwrap();
let mut target_stream = TcpStream::connect(target).await.unwrap();
@@ -323,11 +319,7 @@ mod tests {
let mut port_bytes = [0u8; 2];
proxy_sock.read_exact(&mut port_bytes).await.unwrap();
let reply = vec![
0x05, 0x05, 0x00, 0x01,
0, 0, 0, 0,
0, 0,
];
let reply = vec![0x05, 0x05, 0x00, 0x01, 0, 0, 0, 0, 0, 0];
proxy_sock.write_all(&reply).await.unwrap();
});
@@ -560,4 +552,4 @@ mod tests {
let proxy = direct_config();
proxy_channel(channel, target, &proxy).await;
}
}
}

View File

@@ -189,4 +189,4 @@ mod tests {
fn control_channel_destination_matches_prefix() {
assert!(is_reserved_destination(ALKNET_CONTROL_DESTINATION));
}
}
}

View File

@@ -2,16 +2,15 @@ use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Instant;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use russh::keys::ssh_key::HashAlg;
use russh::server::{Auth, Handler, Msg, Session};
use russh::Channel;
use russh::ChannelId;
use crate::auth::ServerAuthConfig;
use crate::server::control_channel::{
ControlChannelHandler, ControlChannelRouter, ALKNET_PREFIX,
};
use crate::config::DynamicConfig;
use crate::server::control_channel::{ControlChannelHandler, ControlChannelRouter, ALKNET_PREFIX};
use crate::server::rate_limit::{AuthAttemptLimiter, ConnectionRateLimiter};
#[derive(Debug, Clone)]
@@ -44,7 +43,7 @@ impl std::fmt::Display for TransportKind {
}
pub struct ServerHandler {
auth_config: Arc<ServerAuthConfig>,
dynamic: Arc<ArcSwap<DynamicConfig>>,
#[allow(dead_code)]
outbound_proxy: Option<ProxyConfig>,
remote_addr: Option<SocketAddr>,
@@ -59,7 +58,7 @@ pub struct ServerHandler {
impl ServerHandler {
pub fn new(
auth_config: Arc<ServerAuthConfig>,
dynamic: Arc<ArcSwap<DynamicConfig>>,
outbound_proxy: Option<ProxyConfig>,
remote_addr: Option<SocketAddr>,
transport: TransportKind,
@@ -89,7 +88,7 @@ impl ServerHandler {
};
Self {
auth_config,
dynamic,
outbound_proxy,
remote_addr,
control_channel_router: ControlChannelRouter::without_handler(),
@@ -127,10 +126,7 @@ impl Drop for ServerHandler {
}
impl ServerHandler {
pub fn with_control_channel_handler(
mut self,
handler: Box<dyn ControlChannelHandler>,
) -> Self {
pub fn with_control_channel_handler(mut self, handler: Box<dyn ControlChannelHandler>) -> Self {
self.control_channel_router = ControlChannelRouter::with_handler(handler);
self
}
@@ -172,7 +168,8 @@ impl Handler for ServerHandler {
.map_or("unknown".to_string(), |a| a.to_string());
let russh_pub = russh::keys::PublicKey::new(public_key.key_data().clone(), user);
let result = self.auth_config.authenticate_publickey(&russh_pub);
let auth_config = self.dynamic.load();
let result = auth_config.auth.authenticate_publickey(&russh_pub);
match result {
Ok(()) => {
@@ -226,17 +223,25 @@ impl Handler for ServerHandler {
});
tokio::spawn(async move {
let target = match format!("{target_host}:{target_port}").parse::<std::net::SocketAddr>() {
Ok(addr) => addr,
Err(_) => match tokio::net::lookup_host((&target_host[..], target_port as u16)).await {
Ok(mut addrs) => match addrs.next() {
Some(addr) => addr,
None => return,
let target =
match format!("{target_host}:{target_port}").parse::<std::net::SocketAddr>() {
Ok(addr) => addr,
Err(_) => match tokio::net::lookup_host((&target_host[..], target_port as u16))
.await
{
Ok(mut addrs) => match addrs.next() {
Some(addr) => addr,
None => return,
},
Err(_) => return,
},
Err(_) => return,
},
};
crate::server::channel_proxy::proxy_channel(channel.into_stream(), target, &proxy_config).await;
};
crate::server::channel_proxy::proxy_channel(
channel.into_stream(),
target,
&proxy_config,
)
.await;
});
let _ = (originator_address, originator_port);
@@ -389,7 +394,12 @@ impl Handler for ServerHandler {
channel = %channel,
"rejected x11 request on channel"
);
let _ = (single_connection, x11_auth_protocol, x11_auth_cookie, x11_screen_number);
let _ = (
single_connection,
x11_auth_protocol,
x11_auth_cookie,
x11_screen_number,
);
let _ = session.channel_failure(channel);
Ok(())
}
@@ -469,6 +479,8 @@ impl Handler for ServerHandler {
mod tests {
use super::*;
use crate::auth::keys::KeySource;
use crate::auth::ServerAuthConfig;
use crate::config::AuthPolicy;
use russh::keys::{decode_secret_key, PrivateKey};
use std::io::Write;
@@ -487,19 +499,19 @@ mod tests {
decode_secret_key(ED25519_PRIVATE_KEY, None).unwrap()
}
fn make_auth_config(keys_content: &str) -> Arc<ServerAuthConfig> {
fn make_auth_config(keys_content: &str) -> Arc<ArcSwap<DynamicConfig>> {
let f = make_authorized_keys_file(keys_content);
Arc::new(
ServerAuthConfig::from_keys_and_ca(
Some(KeySource::File(f.path().to_path_buf())),
None,
)
.unwrap(),
)
let server_auth =
ServerAuthConfig::from_keys_and_ca(Some(KeySource::File(f.path().to_path_buf())), None)
.unwrap();
let auth_policy = AuthPolicy::from_server_auth_config(server_auth);
let dynamic = DynamicConfig::new(auth_policy);
Arc::new(ArcSwap::new(Arc::new(dynamic)))
}
fn make_empty_auth_config() -> Arc<ServerAuthConfig> {
Arc::new(ServerAuthConfig::from_keys_and_ca(None, None).unwrap())
fn make_empty_auth_config() -> Arc<ArcSwap<DynamicConfig>> {
let dynamic = DynamicConfig::default();
Arc::new(ArcSwap::new(Arc::new(dynamic)))
}
fn default_limiter() -> Arc<ConnectionRateLimiter> {
@@ -507,11 +519,18 @@ mod tests {
}
fn make_handler(
auth_config: Arc<ServerAuthConfig>,
dynamic: Arc<ArcSwap<DynamicConfig>>,
outbound_proxy: Option<ProxyConfig>,
remote_addr: Option<SocketAddr>,
) -> ServerHandler {
ServerHandler::new(auth_config, outbound_proxy, remote_addr, TransportKind::Tcp, default_limiter(), 10)
ServerHandler::new(
dynamic,
outbound_proxy,
remote_addr,
TransportKind::Tcp,
default_limiter(),
10,
)
}
#[tokio::test]
@@ -530,10 +549,9 @@ mod tests {
let mut handler = make_handler(auth_config, None, None);
let other_key_text = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIHeLC1lWiCYrXsf/85O/pkbUFZ6OGIt49PX3nw8iRoXE other@host";
let other_ssh_key = russh::keys::parse_public_key_base64(
other_key_text.split_whitespace().nth(1).unwrap(),
)
.unwrap();
let other_ssh_key =
russh::keys::parse_public_key_base64(other_key_text.split_whitespace().nth(1).unwrap())
.unwrap();
let result = handler
.auth_publickey("testuser", &other_ssh_key)
@@ -553,10 +571,7 @@ mod tests {
let mut handler = make_handler(auth_config, None, None);
let ssh_key = load_key().public_key().clone();
let result = handler
.auth_publickey("testuser", &ssh_key)
.await
.unwrap();
let result = handler.auth_publickey("testuser", &ssh_key).await.unwrap();
assert_eq!(
result,
Auth::Reject {
@@ -629,8 +644,16 @@ mod tests {
#[test]
fn one_handler_per_connection() {
let auth_config = make_empty_auth_config();
let handler1 = make_handler(auth_config.clone(), None, Some("10.0.0.1:22".parse().unwrap()));
let handler2 = make_handler(auth_config.clone(), None, Some("10.0.0.2:22".parse().unwrap()));
let handler1 = make_handler(
auth_config.clone(),
None,
Some("10.0.0.1:22".parse().unwrap()),
);
let handler2 = make_handler(
auth_config.clone(),
None,
Some("10.0.0.2:22".parse().unwrap()),
);
assert!(handler1.remote_addr != handler2.remote_addr);
}
@@ -651,10 +674,20 @@ mod tests {
let ssh_key = load_key().public_key().clone();
let r1 = handler.auth_publickey("user", &ssh_key).await.unwrap();
assert_eq!(r1, Auth::Reject { proceed_with_methods: None });
assert_eq!(
r1,
Auth::Reject {
proceed_with_methods: None
}
);
let r2 = handler.auth_publickey("user", &ssh_key).await.unwrap();
assert_eq!(r2, Auth::Reject { proceed_with_methods: None });
assert_eq!(
r2,
Auth::Reject {
proceed_with_methods: None
}
);
assert!(!handler.auth_limiter.check());
}
@@ -733,4 +766,4 @@ mod tests {
10,
);
}
}
}

View File

@@ -16,10 +16,12 @@ pub mod stealth;
pub use channel_proxy::{connect_outbound, proxy_channel};
pub use control_channel::{
ControlChannelHandler, ControlChannelRouter, DuplexStream, ALKNET_CONTROL_DESTINATION,
ALKNET_PREFIX, is_reserved_destination,
is_reserved_destination, ControlChannelHandler, ControlChannelRouter, DuplexStream,
ALKNET_CONTROL_DESTINATION, ALKNET_PREFIX,
};
pub use handler::{ProxyConfig, ProxyMode, ServerHandler, TransportKind};
pub use rate_limit::{AuthAttemptLimiter, ConnectionRateLimiter};
pub use serve::{Server, ServeError, ServeOptions, ServeTransportMode};
pub use stealth::{ProtocolDetection, detect_protocol, send_fake_nginx_404, validate_stealth_config};
pub use serve::{ServeError, ServeOptions, ServeTransportMode, Server};
pub use stealth::{
detect_protocol, send_fake_nginx_404, validate_stealth_config, ProtocolDetection,
};

View File

@@ -197,4 +197,4 @@ mod tests {
h.join().unwrap();
}
}
}
}

View File

@@ -8,12 +8,14 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwap;
use russh::server::{self, Config};
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{error, info, warn};
use crate::auth::keys::KeySource;
use crate::auth::server_auth::ServerAuthConfig;
use crate::config::{AuthPolicy, ConfigReloadHandle, DynamicConfig};
use crate::error::ConfigError;
use crate::server::handler::{ProxyConfig, ProxyMode, ServerHandler, TransportKind};
use crate::server::rate_limit::ConnectionRateLimiter;
@@ -228,7 +230,7 @@ struct ActiveSession {
/// Supports stealth mode (TLS only), outbound proxy routing, and connection rate limiting.
pub struct Server {
config: Arc<server::Config>,
auth_config: Arc<ServerAuthConfig>,
dynamic: Arc<ArcSwap<DynamicConfig>>,
connection_limiter: Arc<ConnectionRateLimiter>,
outbound_proxy: Option<ProxyConfig>,
stealth: bool,
@@ -244,17 +246,24 @@ impl Server {
pub fn new(opts: ServeOptions) -> Result<Self, ServeError> {
opts.validate().map_err(ServeError::Config)?;
let private_key =
crate::auth::keys::load_private_key(opts.key.clone()).map_err(ServeError::KeyLoadFailed)?;
let private_key = crate::auth::keys::load_private_key(opts.key.clone())
.map_err(ServeError::KeyLoadFailed)?;
let auth_config = Arc::new(
ServerAuthConfig::from_keys_and_ca(opts.authorized_keys.clone(), opts.cert_authority.clone())
.map_err(ServeError::KeyLoadFailed)?,
);
let auth_config = ServerAuthConfig::from_keys_and_ca(
opts.authorized_keys.clone(),
opts.cert_authority.clone(),
)
.map_err(ServeError::KeyLoadFailed)?;
let auth_policy = AuthPolicy::from_server_auth_config(auth_config);
let dynamic_config = DynamicConfig::new(auth_policy);
let max_auth_attempts = opts.max_auth_attempts;
let max_connections_per_ip = opts.max_connections_per_ip;
let config = Arc::new(Config {
keys: vec![private_key],
max_auth_attempts: opts.max_auth_attempts,
max_auth_attempts,
methods: russh::MethodSet::PUBLICKEY,
preferred: russh::Preferred::DEFAULT,
..Default::default()
@@ -262,19 +271,21 @@ impl Server {
let outbound_proxy = parse_proxy_config(opts.proxy.as_deref());
let connection_limiter = Arc::new(ConnectionRateLimiter::new(opts.max_connections_per_ip));
let connection_limiter = Arc::new(ConnectionRateLimiter::new(max_connections_per_ip));
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
let dynamic = Arc::new(ArcSwap::new(Arc::new(dynamic_config)));
Ok(Self {
config,
auth_config,
dynamic,
connection_limiter,
outbound_proxy,
stealth: opts.stealth,
transport_mode: opts.transport_mode,
listen_addr: opts.listen_addr,
max_auth_attempts: opts.max_auth_attempts,
max_auth_attempts,
shutdown_tx,
shutdown_rx,
sessions: Arc::new(tokio::sync::Mutex::new(Vec::new())),
@@ -285,6 +296,12 @@ impl Server {
self.shutdown_tx.clone()
}
pub fn config_reload_handle(&self) -> ConfigReloadHandle {
ConfigReloadHandle {
dynamic: Arc::clone(&self.dynamic),
}
}
pub async fn shutdown(&self) -> Result<(), ServeError> {
info!("initiating graceful shutdown");
let _ = self.shutdown_tx.send(true);
@@ -292,11 +309,15 @@ impl Server {
{
let sessions = self.sessions.lock().await;
for session in sessions.iter() {
if let Err(e) = session.handle.disconnect(
russh::Disconnect::ByApplication,
"shutdown".to_string(),
String::new(),
).await {
if let Err(e) = session
.handle
.disconnect(
russh::Disconnect::ByApplication,
"shutdown".to_string(),
String::new(),
)
.await
{
warn!("failed to send SSH disconnect: {e}");
}
}
@@ -392,7 +413,7 @@ impl Server {
let handler_transport_kind = transport_kind;
let handler = ServerHandler::new(
Arc::clone(&server.auth_config),
Arc::clone(&server.dynamic),
server.outbound_proxy.clone(),
remote_addr,
handler_transport_kind,
@@ -410,15 +431,9 @@ impl Server {
let transport_is_tls = server.transport_mode == ServeTransportMode::Tls;
tokio::spawn(async move {
let result = handle_connection(
stream,
config,
handler,
sessions,
stealth,
transport_is_tls,
)
.await;
let result =
handle_connection(stream, config, handler, sessions, stealth, transport_is_tls)
.await;
if let Err(e) = result {
warn!("connection error: {e}");
@@ -611,8 +626,7 @@ mod tests {
#[test]
fn serve_options_validate_tcp_with_acme_rejected() {
let opts =
ServeOptions::new(make_key_source()).acme_domain("example.com");
let opts = ServeOptions::new(make_key_source()).acme_domain("example.com");
assert!(opts.validate().is_err());
}
@@ -626,8 +640,8 @@ mod tests {
#[test]
fn server_new_creates_server() {
let opts = ServeOptions::new(make_key_source())
.authorized_keys(make_authorized_keys_source());
let opts =
ServeOptions::new(make_key_source()).authorized_keys(make_authorized_keys_source());
let server = Server::new(opts).unwrap();
assert_eq!(server.max_auth_attempts, 10);
}
@@ -662,8 +676,8 @@ mod tests {
#[test]
fn serve_options_debug_redacts_keys() {
let opts = ServeOptions::new(make_key_source())
.authorized_keys(make_authorized_keys_source());
let opts =
ServeOptions::new(make_key_source()).authorized_keys(make_authorized_keys_source());
let debug_str = format!("{:?}", opts);
assert!(debug_str.contains("<KeySource>"));
assert!(!debug_str.contains("OPENSSH"));
@@ -715,8 +729,8 @@ mod tests {
#[test]
fn server_shutdown_sender_clones() {
let opts = ServeOptions::new(make_key_source())
.authorized_keys(make_authorized_keys_source());
let opts =
ServeOptions::new(make_key_source()).authorized_keys(make_authorized_keys_source());
let server = Server::new(opts).unwrap();
let sender = server.shutdown_sender();
assert!(!server.is_shutdown());
@@ -726,8 +740,7 @@ mod tests {
#[test]
fn server_holds_listen_addr() {
let opts = ServeOptions::new(make_key_source())
.listen_addr("0.0.0.0:443");
let opts = ServeOptions::new(make_key_source()).listen_addr("0.0.0.0:443");
let server = Server::new(opts).unwrap();
assert_eq!(server.listen_addr, "0.0.0.0:443");
}
@@ -747,12 +760,10 @@ mod tests {
let server = Server::new(opts).unwrap();
let shutdown_tx = server.shutdown_sender();
let server_handle = tokio::spawn(async move {
server
.run(acceptor, None)
.await
.expect("server run failed")
});
let server_handle =
tokio::spawn(
async move { server.run(acceptor, None).await.expect("server run failed") },
);
tokio::time::sleep(Duration::from_millis(50)).await;
@@ -760,6 +771,9 @@ mod tests {
let result = tokio::time::timeout(Duration::from_secs(5), server_handle).await;
assert!(result.is_ok(), "server should have shut down within timeout");
assert!(
result.is_ok(),
"server should have shut down within timeout"
);
}
}
}

View File

@@ -134,7 +134,10 @@ mod tests {
let mut all_data = Vec::new();
reader.read_to_end(&mut all_data).await.unwrap();
assert!(all_data.starts_with(banner), "banner bytes must be preserved after detection");
assert!(
all_data.starts_with(banner),
"banner bytes must be preserved after detection"
);
}
#[tokio::test]
@@ -142,7 +145,10 @@ mod tests {
let (client, server) = duplex(1024);
let (mut client_read, mut client_write) = tokio::io::split(client);
client_write.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n").await.unwrap();
client_write
.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
.await
.unwrap();
drop(client_write);
let (detection, mut reader) = detect_protocol(server).await;
@@ -206,7 +212,10 @@ mod tests {
let (client, server) = duplex(1024);
let mut client = client;
client.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n").await.unwrap();
client
.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
.await
.unwrap();
let (detection, mut reader) = detect_protocol(server).await;
assert_eq!(detection, ProtocolDetection::Http);
@@ -223,4 +232,4 @@ mod tests {
let result = client.read(&mut extra).await;
assert!(result.is_err() || result.unwrap() == 0);
}
}
}