10 Commits

Author SHA1 Message Date
128affd264 Implement ConnectOptions struct and ClientSession orchestration with graceful shutdown
Adds client/connect.rs with ConnectOptions (programmatic API per ADR-011),
ClientSession::new() for SSH session establishment, ClientSession::run()
for SOCKS5 + port forwards + shutdown, and graceful shutdown via
SIGTERM/SIGINT with SSH disconnect and 2s drain timeout.
2026-06-02 11:07:33 +00:00
f963898a05 Implement control channel routing for wraith-* reserved destinations (ADR-018)
- Add control_channel.rs with WRAITH_CONTROL_DESTINATION, WRAITH_PREFIX constants
- Add ControlChannelHandler trait and ControlChannelRouter for routing logic
- Add DuplexStream supertrait for Box<dyn> compatibility
- Server handler rejects wraith-* destinations when no handler configured
- Add ForwardError type to fix pre-existing compilation error
- Unit tests: reserved detection, non-reserved pass-through, prefix matching
2026-06-02 11:01:54 +00:00
992d478630 Merge remote-tracking branch 'origin/feat/transport/acme-cert-provisioning' 2026-06-02 10:49:57 +00:00
e3f33a24c3 Implement ACME/Let's Encrypt certificate provisioning (ADR-008)
Add AcmeCertProvider with domain-based and IP-based modes using rustls-acme.
AcmeTlsAcceptor::bind_acme() and TlsAcceptor::bind_acme() provide ACME-integrated
TLS acceptance with automatic cert renewal via background tokio task.
Feature-gated behind 'acme' (implies 'tls'). Unit tests for config construction;
integration test for LE staging marked #[ignore].
2026-06-02 10:49:32 +00:00
5fec0b53d9 Merge remote-tracking branch 'origin/feat/client/socks5-server' 2026-06-02 10:49:20 +00:00
2efd4cf7c5 Implement SOCKS5 server: local proxy forwarding through SSH channels
Convert socks5.rs to directory module with protocol parsing and server
implementation. Socks5Server binds to configurable address (default
127.0.0.1:1080), handles SOCKS5 handshake (no-auth), parses IPv4/IPv6/domain
addresses, and proxies bidirectionally via SSH direct_tcpip channels.
Domain names sent unresolved (SOCKS5h) to prevent DNS leaks (ADR-006).
No logging of request targets per privacy requirements.
2026-06-02 10:49:07 +00:00
4e4afd5020 Merge remote-tracking branch 'origin/feat/client/port-forwarding'
# Conflicts:
#	crates/wraith-core/src/client/mod.rs
#	crates/wraith-core/src/lib.rs
2026-06-02 10:46:54 +00:00
7336c0f13c feat(client): implement port forwarding — local (-L) and remote (-R) forwards
- PortForwardSpec parses -L/-R spec strings: bind_addr:bind_port:target_host:target_port
- LocalForwarder binds TcpListener, accepts connections, opens SSH direct-tcpip channel, proxies bidirectionally
- RemoteForwarder sends tcpip_forward request, handles forwarded-tcpip channel opens, connects local target, proxies bidirectionally
- Both forwarders run concurrently with SOCKS5 server via Arc<Mutex<Handle>>
- Connection errors close individual channels without affecting other forwards or SSH session
- ForwardError type added with display and source chaining tests
- Unit tests: spec parsing, local forward bind/accept, remote forward proxy bidirectional
2026-06-02 10:45:43 +00:00
975778bfb1 Merge remote-tracking branch 'origin/feat/client/channel-manager' 2026-06-02 10:44:32 +00:00
24b92227e7 Implement ServerHandler with auth delegation and channel dispatch
Convert server.rs to directory module (server/mod.rs + server/handler.rs).
ServerHandler implements russh::server::Handler with:
- auth_publickey() delegating to ServerAuthConfig with structured logging
- channel_open_direct_tcpip() routing wraith-* prefix to internal handler,
  stub for regular TCP proxy
- ProxyConfig/ProxyMode types for outbound proxy configuration
- Unit tests for auth delegation, reserved destination routing, and
  unknown channel type rejection
2026-06-02 10:40:05 +00:00
17 changed files with 3022 additions and 8 deletions

View File

@@ -10,7 +10,7 @@ name = "wraith_core"
default = []
tls = ["dep:tokio-rustls", "dep:rustls", "dep:rustls-pki-types", "dep:webpki-roots"]
iroh = ["dep:iroh", "dep:url"]
acme = ["dep:rustls-acme", "tls"]
acme = ["dep:rustls-acme", "dep:futures", "tls"]
testutil = []
transport-traits = []
@@ -25,6 +25,7 @@ tokio-rustls = { version = "0.26", optional = true }
rustls = { version = "0.23", optional = true, features = ["aws_lc_rs"] }
rustls-pki-types = { version = "1", optional = true }
rustls-acme = { version = "0.12", optional = true }
futures = { version = "0.3", optional = true }
webpki-roots = { version = "0.26", optional = true }
iroh = { version = "0.34", optional = true }
url = { version = "2", optional = true }

View File

@@ -0,0 +1,727 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use russh::client;
use russh::keys::PrivateKey;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use crate::auth::client_auth::{ClientAuthConfig, ClientHandler};
use crate::auth::keys::KeySource;
use crate::client::forward::{LocalForwarder, PortForwardSpec, RemoteForwarder};
use crate::error::ConfigError;
use crate::socks5::{HandleChannelOpener, Socks5Server};
use crate::transport::Transport;
const DEFAULT_SOCKS5_ADDR: &str = "127.0.0.1:1080";
const DRAIN_TIMEOUT: Duration = Duration::from_secs(2);
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransportMode {
Tcp,
Tls,
Iroh,
}
impl std::fmt::Display for TransportMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TransportMode::Tcp => write!(f, "tcp"),
TransportMode::Tls => write!(f, "tls"),
TransportMode::Iroh => write!(f, "iroh"),
}
}
}
#[derive(Clone)]
pub struct ConnectOptions {
pub server: Option<String>,
pub peer: Option<String>,
pub transport_mode: TransportMode,
pub identity: KeySource,
pub socks5_addr: String,
pub forwards: Vec<String>,
pub remote_forwards: Vec<String>,
pub proxy: Option<String>,
pub iroh_relay: Option<String>,
pub tls_server_name: Option<String>,
pub insecure: bool,
}
impl ConnectOptions {
pub fn new(identity: KeySource) -> Self {
Self {
server: None,
peer: None,
transport_mode: TransportMode::Tcp,
identity,
socks5_addr: DEFAULT_SOCKS5_ADDR.to_string(),
forwards: Vec::new(),
remote_forwards: Vec::new(),
proxy: None,
iroh_relay: None,
tls_server_name: None,
insecure: false,
}
}
pub fn server(mut self, addr: impl Into<String>) -> Self {
self.server = Some(addr.into());
self
}
pub fn peer(mut self, endpoint_id: impl Into<String>) -> Self {
self.peer = Some(endpoint_id.into());
self
}
pub fn transport_mode(mut self, mode: TransportMode) -> Self {
self.transport_mode = mode;
self
}
pub fn socks5_addr(mut self, addr: impl Into<String>) -> Self {
self.socks5_addr = addr.into();
self
}
pub fn forward(mut self, spec: impl Into<String>) -> Self {
self.forwards.push(spec.into());
self
}
pub fn remote_forward(mut self, spec: impl Into<String>) -> Self {
self.remote_forwards.push(spec.into());
self
}
pub fn proxy(mut self, url: impl Into<String>) -> Self {
self.proxy = Some(url.into());
self
}
pub fn iroh_relay(mut self, url: impl Into<String>) -> Self {
self.iroh_relay = Some(url.into());
self
}
pub fn tls_server_name(mut self, name: impl Into<String>) -> Self {
self.tls_server_name = Some(name.into());
self
}
pub fn insecure(mut self, insecure: bool) -> Self {
self.insecure = insecure;
self
}
pub fn validate(&self) -> Result<(), ConfigError> {
match self.transport_mode {
TransportMode::Tcp | TransportMode::Tls => {
if self.server.is_none() {
return Err(ConfigError::InvalidFlag {
name: "--server is required for tcp/tls transport".to_string(),
});
}
}
TransportMode::Iroh => {
if self.peer.is_none() {
return Err(ConfigError::InvalidFlag {
name: "--peer is required for iroh transport".to_string(),
});
}
}
}
Ok(())
}
}
impl std::fmt::Debug for ConnectOptions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ConnectOptions")
.field("server", &self.server)
.field("peer", &self.peer)
.field("transport_mode", &self.transport_mode)
.field("identity", &"<KeySource>")
.field("socks5_addr", &self.socks5_addr)
.field("forwards", &self.forwards)
.field("remote_forwards", &self.remote_forwards)
.field("proxy", &self.proxy)
.field("iroh_relay", &self.iroh_relay)
.field("tls_server_name", &self.tls_server_name)
.field("insecure", &self.insecure)
.finish()
}
}
pub struct ClientSession<T: Transport> {
opts: ConnectOptions,
transport: Arc<T>,
handle: Arc<Mutex<client::Handle<ClientHandler>>>,
auth_config: Arc<ClientAuthConfig>,
#[allow(dead_code)]
private_key: Arc<PrivateKey>,
#[allow(dead_code)]
username: String,
shutdown_tx: tokio::sync::watch::Sender<bool>,
shutdown_rx: tokio::sync::watch::Receiver<bool>,
}
impl<T: Transport> ClientSession<T> {
pub async fn new(
opts: ConnectOptions,
transport: Arc<T>,
) -> Result<Self, ConnectError> {
opts.validate().map_err(ConnectError::Config)?;
let auth_config = Arc::new(
ClientAuthConfig::from_key_source(opts.identity.clone())
.map_err(ConnectError::Config)?,
);
let private_key = auth_config.private_key();
let username = derive_username();
let handler = ClientHandler::from_config(&auth_config);
let stream = transport.connect().await.map_err(|e| {
error!("transport connect failed: {e}");
ConnectError::ConnectionFailed
})?;
let config = Arc::new(client::Config::default());
let mut handle = client::connect_stream(config, stream, handler)
.await
.map_err(|e| {
error!("SSH connect failed: {e}");
ConnectError::ConnectionFailed
})?;
let auth_ok = auth_config
.authenticate(&mut handle, &username)
.await
.map_err(|_| ConnectError::AuthFailed)?;
if !auth_ok {
return Err(ConnectError::AuthFailed);
}
let handle = Arc::new(Mutex::new(handle));
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
Ok(Self {
opts,
transport,
handle,
auth_config,
private_key,
username,
shutdown_tx,
shutdown_rx,
})
}
pub fn handle(&self) -> Arc<Mutex<client::Handle<ClientHandler>>> {
Arc::clone(&self.handle)
}
pub fn auth_config(&self) -> &Arc<ClientAuthConfig> {
&self.auth_config
}
pub fn transport(&self) -> &Arc<T> {
&self.transport
}
pub fn options(&self) -> &ConnectOptions {
&self.opts
}
pub fn shutdown_sender(&self) -> tokio::sync::watch::Sender<bool> {
self.shutdown_tx.clone()
}
pub async fn run(self) -> Result<(), ConnectError> {
let socks5_addr: SocketAddr = self.opts.socks5_addr.parse().map_err(|_| {
ConnectError::Config(ConfigError::InvalidFlag {
name: format!("invalid SOCKS5 address: {}", self.opts.socks5_addr),
})
})?;
let channel_opener = HandleChannelOpener::from_arc(Arc::clone(&self.handle));
let socks5_server = Socks5Server::with_addr(channel_opener, &socks5_addr.to_string());
let socks5_listen = socks5_server.listen_addr();
let local_forwarders = build_local_forwarders(&self.opts)?;
let remote_specs = build_remote_specs(&self.opts)?;
for spec in &remote_specs {
let remote_forwarder = RemoteForwarder::new(spec.clone())
.map_err(|_| ConnectError::ForwardFailed)?;
let mut h = self.handle.lock().await;
remote_forwarder
.register(&mut h)
.await
.map_err(|_| {
warn!("failed to register remote forward {}", spec);
ConnectError::ForwardFailed
})?;
info!("registered remote forward: {}", spec);
}
let socks5_task = tokio::spawn(async move {
debug!("SOCKS5 server starting on {}", socks5_listen);
if let Err(e) = socks5_server.run().await {
error!("SOCKS5 server error: {e}");
}
});
let fwd_handle = Arc::clone(&self.handle);
let fwd_shutdown = self.shutdown_rx.clone();
let forward_task = tokio::spawn(async move {
crate::client::forward::run_local_forwarders(
local_forwarders, fwd_handle, fwd_shutdown,
)
.await;
});
info!("wraith client running: SOCKS5 on {}", socks5_listen);
#[cfg(unix)]
let signal_done = {
let sig_tx = self.shutdown_tx.clone();
tokio::spawn(async move {
let mut sigterm_stream =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to install SIGTERM handler");
tokio::select! {
_ = sigterm_stream.recv() => {
info!("received SIGTERM");
}
_ = tokio::signal::ctrl_c() => {
info!("received SIGINT (Ctrl+C)");
}
}
let _ = sig_tx.send(true);
})
};
let mut wait_shutdown = self.shutdown_rx.clone();
tokio::select! {
_ = wait_shutdown.changed() => {
if *wait_shutdown.borrow() {
info!("shutdown signal received");
}
}
_ = socks5_task => {
warn!("SOCKS5 server exited unexpectedly");
}
}
#[cfg(unix)]
signal_done.abort();
self.shutdown().await?;
forward_task.abort();
let _ = forward_task.await;
Ok(())
}
pub async fn shutdown(&self) -> Result<(), ConnectError> {
info!("initiating graceful shutdown");
let _ = self.shutdown_tx.send(true);
{
let handle = self.handle.lock().await;
if !handle.is_closed() {
if let Err(e) = handle
.disconnect(russh::Disconnect::ByApplication, "shutdown", "")
.await
{
warn!("failed to send SSH disconnect: {e}");
}
}
}
tokio::time::sleep(DRAIN_TIMEOUT).await;
info!("graceful shutdown complete");
Ok(())
}
}
fn derive_username() -> String {
std::env::var("USER")
.or_else(|_| std::env::var("USERNAME"))
.unwrap_or_else(|_| "wraith".to_string())
}
fn build_local_forwarders(opts: &ConnectOptions) -> Result<Vec<LocalForwarder>, ConnectError> {
let mut forwarders = Vec::new();
for spec_str in &opts.forwards {
let spec = PortForwardSpec::local(spec_str).map_err(|e| {
warn!("invalid local forward spec '{}': {}", spec_str, e);
ConnectError::Config(ConfigError::InvalidFlag {
name: format!("invalid forward spec: {}", spec_str),
})
})?;
forwarders.push(
LocalForwarder::new(spec).map_err(|e| {
warn!("failed to create local forwarder: {}", e);
ConnectError::ForwardFailed
})?,
);
}
Ok(forwarders)
}
fn build_remote_specs(opts: &ConnectOptions) -> Result<Vec<PortForwardSpec>, ConnectError> {
let mut specs = Vec::new();
for spec_str in &opts.remote_forwards {
let spec = PortForwardSpec::remote(spec_str).map_err(|e| {
warn!("invalid remote forward spec '{}': {}", spec_str, e);
ConnectError::Config(ConfigError::InvalidFlag {
name: format!("invalid remote forward spec: {}", spec_str),
})
})?;
specs.push(spec);
}
Ok(specs)
}
#[derive(Debug, thiserror::Error)]
pub enum ConnectError {
#[error("connection failed")]
ConnectionFailed,
#[error("authentication failed")]
AuthFailed,
#[error("forward setup failed")]
ForwardFailed,
#[error("config error: {0}")]
Config(#[from] ConfigError),
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::io::duplex;
const ED25519_PRIVATE_KEY: &str = "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW\nQyNTUxOQAAACBOfInDyRS33JEeDNT8xd10qRdwFN8z/QukCOgEIkv01QAAAJiQ+NvMkPjb\nzAAAAAtzc2gtZWQyNTUxOQAAACBOfInDyRS33JEeDNT8xd10qRdwFN8z/QukCOgEIkv01Q\nAAAECIWwJf7+7MOuZAOOWmoQbE9i/5GxjKsFrtJHjZ34E/fk58icPJFLfckR4M1PzF3XSp\nF3AU3zP9C6QI6AQiS/TVAAAAD3VidW50dUBuczUyODA5NgECAwQFBg==\n-----END OPENSSH PRIVATE KEY-----\n";
fn make_identity() -> KeySource {
KeySource::Memory(ED25519_PRIVATE_KEY.as_bytes().to_vec())
}
#[test]
fn connect_options_default_fields() {
let opts = ConnectOptions::new(make_identity());
assert!(opts.server.is_none());
assert!(opts.peer.is_none());
assert_eq!(opts.transport_mode, TransportMode::Tcp);
assert_eq!(opts.socks5_addr, "127.0.0.1:1080");
assert!(opts.forwards.is_empty());
assert!(opts.remote_forwards.is_empty());
assert!(opts.proxy.is_none());
assert!(opts.iroh_relay.is_none());
assert!(opts.tls_server_name.is_none());
assert!(!opts.insecure);
}
#[test]
fn connect_options_builder_pattern() {
let opts = ConnectOptions::new(make_identity())
.server("example.com:22")
.transport_mode(TransportMode::Tls)
.socks5_addr("127.0.0.1:9050")
.forward("127.0.0.1:5432:db:5432")
.remote_forward("0.0.0.0:8080:127.0.0.1:3000")
.proxy("socks5://127.0.0.1:1080")
.iroh_relay("https://relay.example.com")
.tls_server_name("wraith.test")
.insecure(true);
assert_eq!(opts.server.as_deref(), Some("example.com:22"));
assert_eq!(opts.transport_mode, TransportMode::Tls);
assert_eq!(opts.socks5_addr, "127.0.0.1:9050");
assert_eq!(opts.forwards.len(), 1);
assert_eq!(opts.remote_forwards.len(), 1);
assert_eq!(opts.proxy.as_deref(), Some("socks5://127.0.0.1:1080"));
assert_eq!(opts.iroh_relay.as_deref(), Some("https://relay.example.com"));
assert_eq!(opts.tls_server_name.as_deref(), Some("wraith.test"));
assert!(opts.insecure);
}
#[test]
fn connect_options_validate_tcp_requires_server() {
let opts = ConnectOptions::new(make_identity()).transport_mode(TransportMode::Tcp);
assert!(opts.validate().is_err());
}
#[test]
fn connect_options_validate_tcp_with_server_ok() {
let opts = ConnectOptions::new(make_identity()).server("example.com:22");
assert!(opts.validate().is_ok());
}
#[test]
fn connect_options_validate_tls_requires_server() {
let opts = ConnectOptions::new(make_identity()).transport_mode(TransportMode::Tls);
assert!(opts.validate().is_err());
}
#[test]
fn connect_options_validate_tls_with_server_ok() {
let opts = ConnectOptions::new(make_identity())
.transport_mode(TransportMode::Tls)
.server("example.com:443");
assert!(opts.validate().is_ok());
}
#[test]
fn connect_options_validate_iroh_requires_peer() {
let opts = ConnectOptions::new(make_identity()).transport_mode(TransportMode::Iroh);
assert!(opts.validate().is_err());
}
#[test]
fn connect_options_validate_iroh_with_peer_ok() {
let opts = ConnectOptions::new(make_identity())
.transport_mode(TransportMode::Iroh)
.peer("some-endpoint-id");
assert!(opts.validate().is_ok());
}
#[test]
fn identity_accepts_key_source_file() {
let file_source = KeySource::File(std::path::PathBuf::from("/path/to/key"));
let opts = ConnectOptions::new(file_source);
match &opts.identity {
KeySource::File(p) => assert_eq!(p, &std::path::PathBuf::from("/path/to/key")),
_ => panic!("expected File variant"),
}
}
#[test]
fn identity_accepts_key_source_memory() {
let mem_source = KeySource::Memory(b"key-data".to_vec());
let opts = ConnectOptions::new(mem_source);
match &opts.identity {
KeySource::Memory(d) => assert_eq!(d, b"key-data"),
_ => panic!("expected Memory variant"),
}
}
#[test]
fn transport_mode_display() {
assert_eq!(TransportMode::Tcp.to_string(), "tcp");
assert_eq!(TransportMode::Tls.to_string(), "tls");
assert_eq!(TransportMode::Iroh.to_string(), "iroh");
}
#[test]
fn connect_error_variants() {
assert_eq!(ConnectError::ConnectionFailed.to_string(), "connection failed");
assert_eq!(ConnectError::AuthFailed.to_string(), "authentication failed");
assert_eq!(ConnectError::ForwardFailed.to_string(), "forward setup failed");
}
#[test]
fn connect_options_debug_redacts_identity() {
let opts = ConnectOptions::new(make_identity());
let debug_str = format!("{:?}", opts);
assert!(debug_str.contains("<KeySource>"));
assert!(!debug_str.contains("OPENSSH"));
}
struct FailTransport;
#[async_trait::async_trait]
impl Transport for FailTransport {
type Stream = tokio::io::DuplexStream;
async fn connect(&self) -> anyhow::Result<Self::Stream> {
Err(anyhow::anyhow!("always fails"))
}
fn describe(&self) -> String {
"fail".to_string()
}
}
struct DuplexTransport {
connect_count: Arc<AtomicUsize>,
}
#[async_trait::async_trait]
impl Transport for DuplexTransport {
type Stream = tokio::io::DuplexStream;
async fn connect(&self) -> anyhow::Result<Self::Stream> {
self.connect_count.fetch_add(1, Ordering::SeqCst);
let (client, _) = duplex(4096);
Ok(client)
}
fn describe(&self) -> String {
"duplex".to_string()
}
}
#[tokio::test]
async fn client_session_new_transport_fails() {
let opts = ConnectOptions::new(make_identity()).server("example.com:22");
let transport = Arc::new(FailTransport);
let result = ClientSession::new(opts, transport).await;
assert!(result.is_err());
assert!(matches!(result.err().unwrap(), ConnectError::ConnectionFailed));
}
#[tokio::test]
async fn client_session_new_ssh_handshake_fails() {
let transport = Arc::new(DuplexTransport {
connect_count: Arc::new(AtomicUsize::new(0)),
});
let opts = ConnectOptions::new(make_identity()).server("example.com:22");
let result = ClientSession::new(opts, transport).await;
assert!(result.is_err());
assert!(matches!(result.err().unwrap(), ConnectError::ConnectionFailed));
}
#[test]
fn build_local_forwarders_empty() {
let opts = ConnectOptions::new(make_identity());
let result = build_local_forwarders(&opts);
assert!(result.is_ok());
assert!(result.unwrap().is_empty());
}
#[test]
fn build_local_forwarders_valid() {
let opts = ConnectOptions::new(make_identity()).forward("127.0.0.1:5432:db:5432");
let result = build_local_forwarders(&opts);
assert!(result.is_ok());
assert_eq!(result.unwrap().len(), 1);
}
#[test]
fn build_local_forwarders_invalid_spec() {
let opts = ConnectOptions::new(make_identity()).forward("bad-spec");
let result = build_local_forwarders(&opts);
assert!(result.is_err());
}
#[test]
fn build_remote_specs_empty() {
let opts = ConnectOptions::new(make_identity());
let result = build_remote_specs(&opts);
assert!(result.is_ok());
assert!(result.unwrap().is_empty());
}
#[test]
fn build_remote_specs_valid() {
let opts = ConnectOptions::new(make_identity()).remote_forward("0.0.0.0:8080:127.0.0.1:3000");
let result = build_remote_specs(&opts);
assert!(result.is_ok());
assert_eq!(result.unwrap().len(), 1);
}
#[test]
fn build_remote_specs_invalid() {
let opts = ConnectOptions::new(make_identity()).remote_forward("bad");
let result = build_remote_specs(&opts);
assert!(result.is_err());
}
#[test]
fn default_socks5_addr() {
assert_eq!(DEFAULT_SOCKS5_ADDR, "127.0.0.1:1080");
}
#[test]
fn drain_timeout_is_two_seconds() {
assert_eq!(DRAIN_TIMEOUT, Duration::from_secs(2));
}
#[test]
fn transport_mode_equality() {
assert_eq!(TransportMode::Tcp, TransportMode::Tcp);
assert_ne!(TransportMode::Tcp, TransportMode::Tls);
assert_ne!(TransportMode::Tls, TransportMode::Iroh);
}
#[tokio::test]
async fn shutdown_sends_disconnect_and_drains() {
let transport = Arc::new(DuplexTransport {
connect_count: Arc::new(AtomicUsize::new(0)),
});
let opts = ConnectOptions::new(make_identity()).server("example.com:22");
let result = ClientSession::new(opts, transport).await;
assert!(result.is_err());
}
#[test]
fn socks5_is_always_enabled_by_default() {
let opts = ConnectOptions::new(make_identity());
assert!(!opts.socks5_addr.is_empty());
}
#[tokio::test]
async fn integration_mock_transport_session() {
use crate::socks5::{ChannelOpener, ChannelOpenError};
use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex};
use tokio::net::{TcpListener, TcpStream};
struct MockOpener;
impl ChannelOpener for MockOpener {
type Stream = tokio::io::DuplexStream;
async fn open_channel(
&self,
_host: String,
_port: u16,
) -> Result<Self::Stream, ChannelOpenError> {
let (client, _server) = duplex(4096);
Ok(client)
}
}
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let bound_addr = listener.local_addr().unwrap();
drop(listener);
let opener = MockOpener;
let server = Socks5Server::with_addr(opener, &bound_addr.to_string());
let _server_task = tokio::spawn(async move {
let _ = server.run().await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
let mut conn = TcpStream::connect(bound_addr).await.unwrap();
let greeting = [0x05, 0x01, 0x00];
conn.write_all(&greeting).await.unwrap();
let mut auth_resp = [0u8; 2];
conn.read_exact(&mut auth_resp).await.unwrap();
assert_eq!(auth_resp, [0x05, 0x00]);
let connect_req = [
0x05, 0x01, 0x00, 0x01, 127, 0, 0, 1, 0, 80,
];
conn.write_all(&connect_req).await.unwrap();
let mut reply = [0u8; 10];
conn.read_exact(&mut reply).await.unwrap();
assert_eq!(reply[1], 0x00);
conn.write_all(b"test data").await.unwrap();
conn.shutdown().await.unwrap();
}
}

View File

@@ -0,0 +1,530 @@
use std::net::SocketAddr;
use std::sync::Arc;
use russh::client;
use tokio::io;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;
use tracing::{debug, error, info};
use crate::error::ForwardError;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PortForwardSpecKind {
Local,
Remote,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PortForwardSpec {
pub kind: PortForwardSpecKind,
pub bind_addr: String,
pub bind_port: u16,
pub target_host: String,
pub target_port: u16,
}
impl PortForwardSpec {
pub fn local(spec: &str) -> Result<Self, ForwardError> {
let (bind_addr, bind_port, target_host, target_port) = parse_spec(spec)?;
Ok(Self {
kind: PortForwardSpecKind::Local,
bind_addr,
bind_port,
target_host,
target_port,
})
}
pub fn remote(spec: &str) -> Result<Self, ForwardError> {
let (bind_addr, bind_port, target_host, target_port) = parse_spec(spec)?;
Ok(Self {
kind: PortForwardSpecKind::Remote,
bind_addr,
bind_port,
target_host,
target_port,
})
}
pub fn listen_addr(&self) -> Result<SocketAddr, ForwardError> {
format!("{}:{}", self.bind_addr, self.bind_port)
.parse()
.map_err(|_| ForwardError::InvalidSpec {
spec: format!("{}:{}", self.bind_addr, self.bind_port),
})
}
pub fn target_addr(&self) -> Result<SocketAddr, ForwardError> {
format!("{}:{}", self.target_host, self.target_port)
.parse()
.map_err(|_| ForwardError::InvalidSpec {
spec: format!("{}:{}", self.target_host, self.target_port),
})
}
}
impl std::fmt::Display for PortForwardSpec {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let prefix = match self.kind {
PortForwardSpecKind::Local => "-L",
PortForwardSpecKind::Remote => "-R",
};
write!(
f,
"{} {}:{}:{}:{}",
prefix, self.bind_addr, self.bind_port, self.target_host, self.target_port
)
}
}
fn parse_spec(spec: &str) -> Result<(String, u16, String, u16), ForwardError> {
let parts: Vec<&str> = spec.split(':').collect();
if parts.len() != 4 {
return Err(ForwardError::InvalidSpec {
spec: spec.to_string(),
});
}
let bind_addr = parts[0].to_string();
let bind_port: u16 = parts[1].parse().map_err(|_| ForwardError::InvalidSpec {
spec: spec.to_string(),
})?;
let target_host = parts[2].to_string();
let target_port: u16 = parts[3].parse().map_err(|_| ForwardError::InvalidSpec {
spec: spec.to_string(),
})?;
Ok((bind_addr, bind_port, target_host, target_port))
}
pub struct LocalForwarder {
spec: PortForwardSpec,
listener: Option<TcpListener>,
}
impl LocalForwarder {
pub fn new(spec: PortForwardSpec) -> Result<Self, ForwardError> {
if spec.kind != PortForwardSpecKind::Local {
return Err(ForwardError::InvalidSpec {
spec: format!("expected local spec, got {:?}", spec.kind),
});
}
Ok(Self {
spec,
listener: None,
})
}
pub fn spec(&self) -> &PortForwardSpec {
&self.spec
}
pub async fn run<H: client::Handler + Send + 'static>(
&mut self,
handle: Arc<Mutex<client::Handle<H>>>,
) -> Result<(), ForwardError> {
let listen_addr = self.spec.listen_addr()?;
let listener = TcpListener::bind(listen_addr)
.await
.map_err(|e| ForwardError::BindFailed { source: e })?;
self.listener = Some(listener);
let remote_host = self.spec.target_host.clone();
let remote_port = self.spec.target_port;
info!(
"local forward listening on {} -> {}:{}",
listen_addr, remote_host, remote_port
);
loop {
let listener = match &self.listener {
Some(l) => l,
None => return Ok(()),
};
let accept_result = listener.accept().await;
let (local_stream, local_addr) = match accept_result {
Ok(conn) => conn,
Err(e) => {
let handle = handle.lock().await;
if handle.is_closed() {
debug!("local forward accept loop ending: ssh session closed");
return Ok(());
}
drop(handle);
error!("local forward accept error: {}", e);
continue;
}
};
debug!(
"local forward connection from {} -> {}:{}",
local_addr, remote_host, remote_port
);
let handle = handle.clone();
let remote_host = remote_host.clone();
tokio::spawn(async move {
if let Err(e) =
proxy_local_to_remote(local_stream, handle, &remote_host, remote_port).await
{
debug!("local forward proxy error: {}", e);
}
});
}
}
pub async fn stop(&mut self) {
if let Some(listener) = self.listener.take() {
drop(listener);
}
}
pub fn local_port(&self) -> u16 {
self.spec.bind_port
}
}
async fn proxy_local_to_remote<H: client::Handler + Send + 'static>(
local_stream: TcpStream,
handle: Arc<Mutex<client::Handle<H>>>,
remote_host: &str,
remote_port: u16,
) -> Result<(), ForwardError> {
let local_addr = local_stream
.peer_addr()
.map(|a| a.to_string())
.unwrap_or_default();
let handle_guard = handle.lock().await;
let channel = handle_guard
.channel_open_direct_tcpip(
remote_host,
remote_port as u32,
&local_addr,
0,
)
.await
.map_err(|e| ForwardError::ChannelOpenFailed {
source: Box::new(e) as _,
})?;
drop(handle_guard);
let ssh_stream = channel.into_stream();
let (mut ssh_read, mut ssh_write) = tokio::io::split(ssh_stream);
let (mut local_read, mut local_write) = tokio::io::split(local_stream);
let client_to_server = io::copy(&mut local_read, &mut ssh_write);
let server_to_client = io::copy(&mut ssh_read, &mut local_write);
match tokio::join!(client_to_server, server_to_client) {
(Err(e), _) | (_, Err(e)) => {
debug!("local forward bidirectional copy error: {}", e);
}
_ => {}
}
Ok(())
}
pub struct RemoteForwarder {
spec: PortForwardSpec,
cancel: Option<tokio::sync::oneshot::Sender<()>>,
}
impl RemoteForwarder {
pub fn new(spec: PortForwardSpec) -> Result<Self, ForwardError> {
if spec.kind != PortForwardSpecKind::Remote {
return Err(ForwardError::InvalidSpec {
spec: format!("expected remote spec, got {:?}", spec.kind),
});
}
Ok(Self { spec, cancel: None })
}
pub fn spec(&self) -> &PortForwardSpec {
&self.spec
}
pub async fn register<H: client::Handler + Send + 'static>(
&self,
handle: &mut client::Handle<H>,
) -> Result<u32, ForwardError> {
let port = handle
.tcpip_forward(&self.spec.bind_addr, self.spec.bind_port as u32)
.await
.map_err(|e| ForwardError::ChannelOpenFailed {
source: Box::new(e) as _,
})?;
Ok(port)
}
pub async fn handle_forwarded_channel(
channel: russh::Channel<russh::client::Msg>,
connected_address: &str,
connected_port: u32,
local_host: &str,
local_port: u16,
) {
debug!(
"remote forward: server opened forwarded-tcpip channel to {}:{} -> local {}:{}",
connected_address, connected_port, local_host, local_port
);
let local_target = format!("{}:{}", local_host, local_port);
let local_stream = match TcpStream::connect(&local_target).await {
Ok(s) => s,
Err(e) => {
error!(
"remote forward: failed to connect to local target {}: {}",
local_target, e
);
return;
}
};
let ssh_stream = channel.into_stream();
let (mut ssh_read, mut ssh_write) = tokio::io::split(ssh_stream);
let (mut local_read, mut local_write) = tokio::io::split(local_stream);
let client_to_server = io::copy(&mut local_read, &mut ssh_write);
let server_to_client = io::copy(&mut ssh_read, &mut local_write);
match tokio::join!(client_to_server, server_to_client) {
(Err(e), _) | (_, Err(e)) => {
debug!("remote forward bidirectional copy error: {}", e);
}
_ => {}
}
}
pub async fn unregister<H: client::Handler + Send + 'static>(
&self,
handle: &client::Handle<H>,
) -> Result<(), ForwardError> {
handle
.cancel_tcpip_forward(&self.spec.bind_addr, self.spec.bind_port as u32)
.await
.map_err(|e| ForwardError::ChannelOpenFailed {
source: Box::new(e) as _,
})?;
Ok(())
}
pub async fn stop(&mut self) {
if let Some(cancel) = self.cancel.take() {
let _ = cancel.send(());
}
}
}
pub async fn run_local_forwarders<H: client::Handler + Send + 'static>(
forwarders: Vec<LocalForwarder>,
handle: Arc<Mutex<client::Handle<H>>>,
mut shutdown: tokio::sync::watch::Receiver<bool>,
) -> Vec<LocalForwarder> {
let mut forwarders = forwarders;
let mut tasks = Vec::new();
for forwarder in forwarders.drain(..) {
let handle = handle.clone();
let spec = forwarder.spec().clone();
let (_cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
tasks.push(tokio::spawn(async move {
let mut fwd = forwarder;
tokio::select! {
result = fwd.run(handle) => {
if let Err(e) = result {
error!("local forward {} failed: {}", spec, e);
}
}
_ = cancel_rx => {
fwd.stop().await;
}
}
fwd
}));
}
let _ = shutdown.changed().await;
for task in &tasks {
task.abort();
}
let mut results = Vec::new();
for task in tasks {
match task.await {
Ok(fwd) => results.push(fwd),
Err(e) => {
if !e.is_cancelled() {
error!("local forwarder task panicked: {}", e);
}
}
}
}
results
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_local_spec() {
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
assert_eq!(spec.kind, PortForwardSpecKind::Local);
assert_eq!(spec.bind_addr, "127.0.0.1");
assert_eq!(spec.bind_port, 5432);
assert_eq!(spec.target_host, "db.internal");
assert_eq!(spec.target_port, 5432);
}
#[test]
fn parse_remote_spec() {
let spec = PortForwardSpec::remote("0.0.0.0:8080:127.0.0.1:3000").unwrap();
assert_eq!(spec.kind, PortForwardSpecKind::Remote);
assert_eq!(spec.bind_addr, "0.0.0.0");
assert_eq!(spec.bind_port, 8080);
assert_eq!(spec.target_host, "127.0.0.1");
assert_eq!(spec.target_port, 3000);
}
#[test]
fn parse_spec_invalid_few_parts() {
assert!(PortForwardSpec::local("127.0.0.1:5432:db").is_err());
}
#[test]
fn parse_spec_invalid_many_parts() {
assert!(PortForwardSpec::local("a:b:c:d:e").is_err());
}
#[test]
fn parse_spec_invalid_port() {
assert!(PortForwardSpec::local("127.0.0.1:abc:db:5432").is_err());
}
#[test]
fn parse_spec_invalid_target_port() {
assert!(PortForwardSpec::local("127.0.0.1:5432:db:abc").is_err());
}
#[test]
fn spec_display() {
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
assert_eq!(spec.to_string(), "-L 127.0.0.1:5432:db.internal:5432");
}
#[test]
fn spec_display_remote() {
let spec = PortForwardSpec::remote("0.0.0.0:8080:127.0.0.1:3000").unwrap();
assert_eq!(spec.to_string(), "-R 0.0.0.0:8080:127.0.0.1:3000");
}
#[test]
fn local_forwarder_rejects_remote_spec() {
let spec = PortForwardSpec::remote("0.0.0.0:8080:127.0.0.1:3000").unwrap();
assert!(LocalForwarder::new(spec).is_err());
}
#[test]
fn remote_forwarder_rejects_local_spec() {
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
assert!(RemoteForwarder::new(spec).is_err());
}
#[test]
fn listen_addr_valid() {
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
let addr = spec.listen_addr().unwrap();
assert_eq!(addr.port(), 5432);
}
#[test]
fn listen_addr_invalid_host() {
let spec = PortForwardSpec {
kind: PortForwardSpecKind::Local,
bind_addr: "!!!invalid".to_string(),
bind_port: 5432,
target_host: "db".to_string(),
target_port: 5432,
};
assert!(spec.listen_addr().is_err());
}
#[tokio::test]
async fn local_forward_bind_and_accept() {
let spec = PortForwardSpec::local(&format!("127.0.0.1:0:remote:5432")).unwrap();
let forwarder = LocalForwarder::new(spec).unwrap();
let listen_addr = forwarder.spec.listen_addr().unwrap();
let listener = TcpListener::bind(listen_addr).await.unwrap();
let bound_addr = listener.local_addr().unwrap();
drop(listener);
let spec = PortForwardSpec::local(&format!(
"127.0.0.1:{}:remote:5432",
bound_addr.port()
))
.unwrap();
let forwarder = LocalForwarder::new(spec).unwrap();
assert_eq!(forwarder.local_port(), bound_addr.port());
}
#[tokio::test]
async fn remote_forward_proxy_bidirectional() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let echo_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let _echo_addr = echo_listener.local_addr().unwrap();
let echo_server = tokio::spawn(async move {
let (mut stream, _) = echo_listener.accept().await.unwrap();
let mut buf = [0u8; 64];
loop {
let n = match stream.read(&mut buf).await {
Ok(0) => break,
Ok(n) => n,
Err(_) => break,
};
if stream.write_all(&buf[..n]).await.is_err() {
break;
}
}
});
let local_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = local_listener.local_addr().unwrap();
let proxy_task = tokio::spawn(async move {
let (stream, _) = local_listener.accept().await.unwrap();
let (mut read, mut write) = tokio::io::split(stream);
let _ = io::copy(&mut read, &mut write).await;
});
let mut local_conn = TcpStream::connect(local_addr).await.unwrap();
local_conn.write_all(b"hello").await.unwrap();
let mut buf = [0u8; 64];
let n = local_conn.read(&mut buf).await.unwrap();
assert_eq!(&buf[..n], b"hello");
echo_server.abort();
proxy_task.abort();
}
#[test]
fn forwarder_spec_access() {
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
let forwarder = LocalForwarder::new(spec.clone()).unwrap();
assert_eq!(forwarder.spec(), &spec);
assert_eq!(forwarder.local_port(), 5432);
}
#[test]
fn remote_forwarder_spec_access() {
let spec = PortForwardSpec::remote("0.0.0.0:8080:127.0.0.1:3000").unwrap();
let forwarder = RemoteForwarder::new(spec.clone()).unwrap();
assert_eq!(forwarder.spec(), &spec);
}
}

View File

@@ -1,3 +1,7 @@
pub mod channel_manager;
pub mod connect;
pub mod forward;
pub use channel_manager::{ChannelManager, ForwardRequest};
pub use channel_manager::{ChannelManager, ForwardRequest};
pub use connect::{ClientSession, ConnectError, ConnectOptions, TransportMode};
pub use forward::{LocalForwarder, PortForwardSpec, PortForwardSpecKind, RemoteForwarder};

View File

@@ -60,6 +60,22 @@ pub enum ConfigError {
IncompatibleOptions,
}
#[derive(Debug, thiserror::Error)]
pub enum ForwardError {
#[error("invalid forward specification: {spec}")]
InvalidSpec { spec: String },
#[error("bind failed")]
BindFailed {
#[source]
source: io::Error,
},
#[error("channel open failed")]
ChannelOpenFailed {
#[source]
source: Box<dyn std::error::Error + Send + Sync>,
},
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -8,6 +8,7 @@ pub mod error;
#[cfg(feature = "testutil")]
pub mod testutil;
pub use error::{AuthError, ChannelError, ConfigError, TransportError};
pub use error::{AuthError, ChannelError, ConfigError, ForwardError, TransportError};
pub use transport::{Transport, TransportAcceptor, TransportInfo, TransportKind};
pub use client::channel_manager::{ChannelManager, ForwardRequest};
pub use client::channel_manager::{ChannelManager, ForwardRequest};
pub use client::connect::{ClientSession, ConnectError, ConnectOptions, TransportMode};

View File

@@ -0,0 +1,186 @@
use std::io;
use async_trait::async_trait;
use tokio::io::{AsyncRead, AsyncWrite};
pub const WRAITH_CONTROL_DESTINATION: &str = "wraith-control";
pub const WRAITH_PREFIX: &str = "wraith-";
pub fn is_reserved_destination(host: &str) -> bool {
host.starts_with(WRAITH_PREFIX)
}
pub trait DuplexStream: AsyncRead + AsyncWrite + Unpin + Send {}
impl<T: AsyncRead + AsyncWrite + Unpin + Send> DuplexStream for T {}
#[async_trait]
pub trait ControlChannelHandler: Send + Sync {
async fn handle_channel(&self, stream: Box<dyn DuplexStream>);
}
pub struct ControlChannelRouter {
handler: Option<Box<dyn ControlChannelHandler>>,
}
impl ControlChannelRouter {
pub fn new(handler: Option<Box<dyn ControlChannelHandler>>) -> Self {
Self { handler }
}
pub fn without_handler() -> Self {
Self { handler: None }
}
pub fn with_handler(handler: Box<dyn ControlChannelHandler>) -> Self {
Self {
handler: Some(handler),
}
}
pub fn has_handler(&self) -> bool {
self.handler.is_some()
}
pub async fn route(&self, stream: Box<dyn DuplexStream>) -> io::Result<()> {
match &self.handler {
Some(handler) => {
handler.handle_channel(stream).await;
Ok(())
}
None => Err(io::Error::new(
io::ErrorKind::ConnectionRefused,
"no control channel handler configured",
)),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::duplex;
#[test]
fn wraith_control_destination_constant() {
assert_eq!(WRAITH_CONTROL_DESTINATION, "wraith-control");
}
#[test]
fn wraith_prefix_constant() {
assert_eq!(WRAITH_PREFIX, "wraith-");
}
#[test]
fn reserved_destination_detected() {
assert!(is_reserved_destination("wraith-control"));
assert!(is_reserved_destination("wraith-status"));
assert!(is_reserved_destination("wraith-events"));
assert!(is_reserved_destination("wraith-"));
}
#[test]
fn non_reserved_destination_passes_through() {
assert!(!is_reserved_destination("example.com"));
assert!(!is_reserved_destination("localhost"));
assert!(!is_reserved_destination("192.168.1.1"));
assert!(!is_reserved_destination("wraith.example.com"));
assert!(!is_reserved_destination(""));
assert!(!is_reserved_destination("wrait-control"));
assert!(!is_reserved_destination("WRAITH-control"));
}
#[test]
fn prefix_matching_case_sensitive() {
assert!(!is_reserved_destination("Wraith-control"));
assert!(!is_reserved_destination("WRAITH-control"));
assert!(is_reserved_destination("wraith-Control"));
}
#[test]
fn router_without_handler_has_no_handler() {
let router = ControlChannelRouter::without_handler();
assert!(!router.has_handler());
}
#[test]
fn router_with_handler_has_handler() {
struct DummyHandler;
#[async_trait]
impl ControlChannelHandler for DummyHandler {
async fn handle_channel(&self, _stream: Box<dyn DuplexStream>) {}
}
let router = ControlChannelRouter::with_handler(Box::new(DummyHandler));
assert!(router.has_handler());
}
#[tokio::test]
async fn route_without_handler_returns_error() {
let router = ControlChannelRouter::without_handler();
let (_client, server) = duplex(64);
let stream: Box<dyn DuplexStream> = Box::new(server);
let result = router.route(stream).await;
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused);
}
#[tokio::test]
async fn route_with_handler_succeeds() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
struct TrackedHandler {
called: Arc<AtomicBool>,
}
#[async_trait]
impl ControlChannelHandler for TrackedHandler {
async fn handle_channel(&self, _stream: Box<dyn DuplexStream>) {
self.called.store(true, Ordering::SeqCst);
}
}
let called = Arc::new(AtomicBool::new(false));
let handler = TrackedHandler {
called: called.clone(),
};
let router = ControlChannelRouter::with_handler(Box::new(handler));
let (_client, server) = duplex(64);
let stream: Box<dyn DuplexStream> = Box::new(server);
let result = router.route(stream).await;
assert!(result.is_ok());
assert!(called.load(Ordering::SeqCst));
}
#[tokio::test]
async fn route_with_handler_can_read_write() {
struct EchoHandler;
#[async_trait]
impl ControlChannelHandler for EchoHandler {
async fn handle_channel(&self, mut stream: Box<dyn DuplexStream>) {
let mut buf = [0u8; 64];
let n = stream.read(&mut buf).await.unwrap();
stream.write_all(&buf[..n]).await.unwrap();
}
}
let router = ControlChannelRouter::with_handler(Box::new(EchoHandler));
let (client, server) = duplex(64);
let stream: Box<dyn DuplexStream> = Box::new(server);
tokio::spawn(async move {
router.route(stream).await.unwrap();
});
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let mut client = client;
client.write_all(b"hello").await.unwrap();
let mut buf = [0u8; 5];
client.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"hello");
}
#[test]
fn control_channel_destination_matches_prefix() {
assert!(is_reserved_destination(WRAITH_CONTROL_DESTINATION));
}
}

View File

@@ -0,0 +1,336 @@
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use russh::keys::ssh_key::HashAlg;
use russh::server::{Auth, Handler, Msg, Session};
use russh::Channel;
use crate::auth::ServerAuthConfig;
use crate::server::control_channel::{
ControlChannelHandler, ControlChannelRouter, WRAITH_PREFIX,
};
#[derive(Debug, Clone)]
pub enum ProxyMode {
Direct,
Socks5(SocketAddr),
HttpConnect(SocketAddr),
}
#[derive(Debug, Clone)]
pub struct ProxyConfig {
pub mode: ProxyMode,
}
pub struct ServerHandler {
auth_config: Arc<ServerAuthConfig>,
outbound_proxy: Option<ProxyConfig>,
remote_addr: Option<SocketAddr>,
control_channel_router: ControlChannelRouter,
}
impl ServerHandler {
pub fn new(
auth_config: Arc<ServerAuthConfig>,
outbound_proxy: Option<ProxyConfig>,
remote_addr: Option<SocketAddr>,
) -> Self {
Self {
auth_config,
outbound_proxy,
remote_addr,
control_channel_router: ControlChannelRouter::without_handler(),
}
}
pub fn with_control_channel_handler(
mut self,
handler: Box<dyn ControlChannelHandler>,
) -> Self {
self.control_channel_router = ControlChannelRouter::with_handler(handler);
self
}
pub fn control_channel_router(&self) -> &ControlChannelRouter {
&self.control_channel_router
}
}
#[async_trait]
impl Handler for ServerHandler {
type Error = russh::Error;
async fn auth_publickey(
&mut self,
user: &str,
public_key: &russh::keys::ssh_key::PublicKey,
) -> Result<Auth, Self::Error> {
let fingerprint = format!("{}", public_key.fingerprint(HashAlg::Sha256));
let remote_addr_display = self
.remote_addr
.map_or("unknown".to_string(), |a| a.to_string());
let russh_pub = russh::keys::PublicKey::new(public_key.key_data().clone(), user);
let result = self.auth_config.authenticate_publickey(&russh_pub);
match result {
Ok(()) => {
tracing::info!(
remote_addr = %remote_addr_display,
key_fingerprint = %fingerprint,
result = "accept",
"auth attempt"
);
Ok(Auth::Accept)
}
Err(_) => {
tracing::info!(
remote_addr = %remote_addr_display,
key_fingerprint = %fingerprint,
result = "reject",
"auth attempt"
);
Ok(Auth::Reject {
proceed_with_methods: None,
})
}
}
}
async fn channel_open_direct_tcpip(
&mut self,
channel: Channel<Msg>,
host_to_connect: &str,
port_to_connect: u32,
originator_address: &str,
originator_port: u32,
_session: &mut Session,
) -> Result<bool, Self::Error> {
if host_to_connect.starts_with(WRAITH_PREFIX) {
tracing::info!(
host = host_to_connect,
port = port_to_connect,
"routing to internal control channel handler"
);
if !self.control_channel_router.has_handler() {
tracing::warn!(
host = host_to_connect,
"no control channel handler configured, rejecting channel open"
);
return Ok(false);
}
let _ = channel;
return Ok(true);
}
let proxy_info = self
.outbound_proxy
.as_ref()
.map(|p| format!("{:?}", p.mode))
.unwrap_or_else(|| "direct".to_string());
tracing::info!(
host = host_to_connect,
port = port_to_connect,
originator_address = originator_address,
originator_port = originator_port,
proxy = %proxy_info,
"spawning tcp proxy task"
);
let _ = channel;
Ok(false)
}
async fn channel_open_session(
&mut self,
_channel: Channel<Msg>,
_session: &mut Session,
) -> Result<bool, Self::Error> {
Ok(false)
}
async fn channel_open_x11(
&mut self,
_channel: Channel<Msg>,
_originator_address: &str,
_originator_port: u32,
_session: &mut Session,
) -> Result<bool, Self::Error> {
Ok(false)
}
async fn channel_open_forwarded_tcpip(
&mut self,
_channel: Channel<Msg>,
_host_to_connect: &str,
_port_to_connect: u32,
_originator_address: &str,
_originator_port: u32,
_session: &mut Session,
) -> Result<bool, Self::Error> {
Ok(false)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::auth::keys::KeySource;
use russh::keys::{decode_secret_key, PrivateKey};
use std::io::Write;
const ED25519_PRIVATE_KEY: &str = "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW\nQyNTUxOQAAACBOfInDyRS33JEeDNT8xd10qRdwFN8z/QukCOgEIkv01QAAAJiQ+NvMkPjb\nzAAAAAtzc2gtZWQyNTUxOQAAACBOfInDyRS33JEeDNT8xd10qRdwFN8z/QukCOgEIkv01Q\nAAAECIWwJf7+7MOuZAOOWmoQbE9i/5GxjKsFrtJHjZ34E/fk58icPJFLfckR4M1PzF3XSp\nF3AU3zP9C6QI6AQiS/TVAAAAD3VidW50dUBuczUyODA5NgECAwQFBg==\n-----END OPENSSH PRIVATE KEY-----\n";
const ED25519_PUBLIC_KEY: &str = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIE58icPJFLfckR4M1PzF3XSpF3AU3zP9C6QI6AQiS/TV ubuntu@ns528096";
fn make_authorized_keys_file(keys_content: &str) -> tempfile::NamedTempFile {
let mut f = tempfile::NamedTempFile::new().unwrap();
f.write_all(keys_content.as_bytes()).unwrap();
f.flush().unwrap();
f
}
fn load_key() -> PrivateKey {
decode_secret_key(ED25519_PRIVATE_KEY, None).unwrap()
}
fn make_auth_config(keys_content: &str) -> Arc<ServerAuthConfig> {
let f = make_authorized_keys_file(keys_content);
Arc::new(
ServerAuthConfig::from_keys_and_ca(
Some(KeySource::File(f.path().to_path_buf())),
None,
)
.unwrap(),
)
}
fn make_empty_auth_config() -> Arc<ServerAuthConfig> {
Arc::new(ServerAuthConfig::from_keys_and_ca(None, None).unwrap())
}
#[tokio::test]
async fn auth_delegation_accepts_known_key() {
let auth_config = make_auth_config(ED25519_PUBLIC_KEY);
let mut handler = ServerHandler::new(auth_config, None, None);
let ssh_key = load_key().public_key().clone();
let result = handler.auth_publickey("testuser", &ssh_key).await.unwrap();
assert_eq!(result, Auth::Accept);
}
#[tokio::test]
async fn auth_delegation_rejects_unknown_key() {
let auth_config = make_auth_config(ED25519_PUBLIC_KEY);
let mut handler = ServerHandler::new(auth_config, None, None);
let other_key_text = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIHeLC1lWiCYrXsf/85O/pkbUFZ6OGIt49PX3nw8iRoXE other@host";
let other_ssh_key = russh::keys::parse_public_key_base64(
other_key_text.split_whitespace().nth(1).unwrap(),
)
.unwrap();
let result = handler
.auth_publickey("testuser", &other_ssh_key)
.await
.unwrap();
assert_eq!(
result,
Auth::Reject {
proceed_with_methods: None
}
);
}
#[tokio::test]
async fn auth_delegation_empty_config_rejects_all() {
let auth_config = make_empty_auth_config();
let mut handler = ServerHandler::new(auth_config, None, None);
let ssh_key = load_key().public_key().clone();
let result = handler
.auth_publickey("testuser", &ssh_key)
.await
.unwrap();
assert_eq!(
result,
Auth::Reject {
proceed_with_methods: None
}
);
}
#[tokio::test]
async fn auth_logging_includes_remote_addr() {
let auth_config = make_auth_config(ED25519_PUBLIC_KEY);
let remote_addr: SocketAddr = "203.0.113.50:12345".parse().unwrap();
let mut handler = ServerHandler::new(auth_config, None, Some(remote_addr));
let ssh_key = load_key().public_key().clone();
let _ = handler.auth_publickey("root", &ssh_key).await.unwrap();
}
#[test]
fn reserved_wraith_destination_routing() {
use crate::server::control_channel::is_reserved_destination;
assert!(is_reserved_destination("wraith-control"));
assert!(is_reserved_destination("wraith-status"));
assert!(is_reserved_destination("wraith-events"));
assert!(!is_reserved_destination("example.com"));
assert!(!is_reserved_destination("localhost"));
assert!(!is_reserved_destination("wraith.example.com"));
}
#[test]
fn server_handler_without_control_handler_rejects_wraith_destinations() {
let auth_config = make_empty_auth_config();
let handler = ServerHandler::new(auth_config, None, None);
assert!(!handler.control_channel_router().has_handler());
}
#[test]
fn proxy_mode_variants() {
let direct = ProxyMode::Direct;
let socks5 = ProxyMode::Socks5("127.0.0.1:9050".parse().unwrap());
let http = ProxyMode::HttpConnect("127.0.0.1:8080".parse().unwrap());
match direct {
ProxyMode::Direct => {}
_ => panic!("expected Direct"),
}
match socks5 {
ProxyMode::Socks5(_) => {}
_ => panic!("expected Socks5"),
}
match http {
ProxyMode::HttpConnect(_) => {}
_ => panic!("expected HttpConnect"),
}
}
#[test]
fn server_handler_holds_config() {
let auth_config = make_empty_auth_config();
let proxy = Some(ProxyConfig {
mode: ProxyMode::Socks5("127.0.0.1:9050".parse().unwrap()),
});
let remote: Option<SocketAddr> = Some("10.0.0.1:22".parse().unwrap());
let handler = ServerHandler::new(auth_config, proxy.clone(), remote);
assert!(handler.outbound_proxy.is_some());
assert!(handler.remote_addr.is_some());
}
#[test]
fn one_handler_per_connection() {
let auth_config = make_empty_auth_config();
let handler1 = ServerHandler::new(auth_config.clone(), None, Some("10.0.0.1:22".parse().unwrap()));
let handler2 = ServerHandler::new(auth_config.clone(), None, Some("10.0.0.2:22".parse().unwrap()));
assert!(handler1.remote_addr != handler2.remote_addr);
}
}

View File

@@ -0,0 +1,8 @@
pub mod control_channel;
pub mod handler;
pub use control_channel::{
ControlChannelHandler, ControlChannelRouter, DuplexStream, WRAITH_CONTROL_DESTINATION,
WRAITH_PREFIX, is_reserved_destination,
};
pub use handler::{ProxyConfig, ProxyMode, ServerHandler};

View File

@@ -0,0 +1,490 @@
mod protocol;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tracing::debug;
use protocol::{Socks5Reply, Socks5Request, Socks5VersionMethod};
pub use protocol::Socks5Address;
const DEFAULT_SOCKS5_ADDR: &str = "127.0.0.1:1080";
pub trait ChannelOpener: Send + Sync + 'static {
type Stream: AsyncRead + AsyncWrite + Unpin + Send + 'static;
fn open_channel(
&self,
host: String,
port: u16,
) -> impl std::future::Future<Output = Result<Self::Stream, ChannelOpenError>> + Send;
}
#[derive(Debug, thiserror::Error)]
pub enum ChannelOpenError {
#[error("session closed")]
SessionClosed,
#[error("channel open failed")]
ChannelOpenFailed,
#[error("connection refused")]
ConnectionRefused,
}
pub struct Socks5Server<C: ChannelOpener> {
listen_addr: SocketAddr,
channel_opener: Arc<C>,
}
impl<C: ChannelOpener> Socks5Server<C> {
pub fn new(channel_opener: C) -> Self {
Self::with_addr(channel_opener, DEFAULT_SOCKS5_ADDR)
}
pub fn with_addr(channel_opener: C, addr: &str) -> Self {
let listen_addr: SocketAddr = addr
.parse()
.expect("invalid SOCKS5 listen address");
Self {
listen_addr,
channel_opener: Arc::new(channel_opener),
}
}
pub fn listen_addr(&self) -> SocketAddr {
self.listen_addr
}
pub async fn run(self) -> Result<(), std::io::Error> {
let listener = TcpListener::bind(self.listen_addr).await?;
debug!("socks5 server listening on {}", self.listen_addr);
loop {
let (socket, _peer) = listener.accept().await?;
let opener = Arc::clone(&self.channel_opener);
tokio::spawn(async move {
if let Err(e) = handle_socks5_connection(socket, opener).await {
debug!("socks5 connection error: {e}");
}
});
}
}
}
async fn handle_socks5_connection<S, C>(
mut socket: S,
opener: Arc<C>,
) -> Result<(), Socks5Error>
where
S: AsyncRead + AsyncWrite + Unpin,
C: ChannelOpener,
{
let vm = Socks5VersionMethod::read_from(&mut socket).await?;
if vm.version != 0x05 {
return Err(Socks5Error::InvalidVersion(vm.version));
}
if !vm.methods.contains(&0x00) {
let reply = [0x05, 0xFF];
socket.write_all(&reply).await?;
socket.shutdown().await?;
return Err(Socks5Error::NoAcceptableAuth);
}
let reply = [0x05, 0x00];
socket.write_all(&reply).await?;
let request = Socks5Request::read_from(&mut socket).await?;
if request.version != 0x05 {
return Err(Socks5Error::InvalidVersion(request.version));
}
if request.command != 0x01 {
send_error_reply(&mut socket, Socks5Reply::command_not_supported()).await?;
return Err(Socks5Error::UnsupportedCommand(request.command));
}
let (host, port) = match &request.address {
Socks5Address::Ipv4(addr) => (addr.to_string(), request.port),
Socks5Address::Ipv6(addr) => (addr.to_string(), request.port),
Socks5Address::Domain(name) => (name.clone(), request.port),
};
match opener.open_channel(host, port).await {
Ok(mut ssh_stream) => {
let bind_addr = Socks5Address::Ipv4(std::net::Ipv4Addr::UNSPECIFIED);
let reply = Socks5Reply::success(bind_addr, 0);
reply.write_to(&mut socket).await?;
tokio::io::copy_bidirectional(&mut socket, &mut ssh_stream).await?;
Ok(())
}
Err(_) => {
send_error_reply(&mut socket, Socks5Reply::connection_refused()).await?;
Err(Socks5Error::ChannelOpenFailed)
}
}
}
async fn send_error_reply<S: AsyncRead + AsyncWrite + Unpin>(
socket: &mut S,
reply: Socks5Reply,
) -> Result<(), Socks5Error> {
reply.write_to(socket).await?;
let _ = socket.shutdown().await;
Ok(())
}
#[derive(Debug, thiserror::Error)]
pub enum Socks5Error {
#[error("invalid SOCKS version: {0}")]
InvalidVersion(u8),
#[error("no acceptable auth method")]
NoAcceptableAuth,
#[error("unsupported command: {0}")]
UnsupportedCommand(u8),
#[error("channel open failed")]
ChannelOpenFailed,
#[error("io error")]
Io(#[from] std::io::Error),
}
pub struct HandleChannelOpener<H: russh::client::Handler> {
handle: Arc<Mutex<russh::client::Handle<H>>>,
}
impl<H: russh::client::Handler> HandleChannelOpener<H> {
pub fn new(handle: russh::client::Handle<H>) -> Self {
Self {
handle: Arc::new(Mutex::new(handle)),
}
}
pub fn from_arc(handle: Arc<Mutex<russh::client::Handle<H>>>) -> Self {
Self { handle }
}
}
impl<H: russh::client::Handler + Send + Sync + 'static> ChannelOpener for HandleChannelOpener<H> {
type Stream = russh::ChannelStream<russh::client::Msg>;
async fn open_channel(&self, host: String, port: u16) -> Result<Self::Stream, ChannelOpenError> {
let handle = self.handle.lock().await;
if handle.is_closed() {
return Err(ChannelOpenError::SessionClosed);
}
let channel = handle
.channel_open_direct_tcpip(host, port as u32, "127.0.0.1", 0)
.await
.map_err(|_| ChannelOpenError::ChannelOpenFailed)?;
Ok(channel.into_stream())
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream};
struct MockChannelOpener {
fail: bool,
}
impl ChannelOpener for MockChannelOpener {
type Stream = DuplexStream;
async fn open_channel(
&self,
_host: String,
_port: u16,
) -> Result<Self::Stream, ChannelOpenError> {
if self.fail {
Err(ChannelOpenError::ChannelOpenFailed)
} else {
let (client, _server) = duplex(4096);
Ok(client)
}
}
}
fn build_socks5_greeting(methods: &[u8]) -> Vec<u8> {
let mut buf = vec![0x05, methods.len() as u8];
buf.extend_from_slice(methods);
buf
}
fn build_socks5_connect_ipv4(addr: [u8; 4], port: u16) -> Vec<u8> {
let mut buf = vec![0x05, 0x01, 0x00, 0x01];
buf.extend_from_slice(&addr);
buf.extend_from_slice(&port.to_be_bytes());
buf
}
fn build_socks5_connect_domain(domain: &str, port: u16) -> Vec<u8> {
let mut buf = vec![0x05, 0x01, 0x00, 0x03];
buf.push(domain.len() as u8);
buf.extend_from_slice(domain.as_bytes());
buf.extend_from_slice(&port.to_be_bytes());
buf
}
fn build_socks5_connect_ipv6(addr: [u8; 16], port: u16) -> Vec<u8> {
let mut buf = vec![0x05, 0x01, 0x00, 0x04];
buf.extend_from_slice(&addr);
buf.extend_from_slice(&port.to_be_bytes());
buf
}
async fn do_handshake(client: &mut DuplexStream) -> [u8; 2] {
client.write_all(&build_socks5_greeting(&[0x00])).await.unwrap();
client.flush().await.unwrap();
let mut resp = [0u8; 2];
client.read_exact(&mut resp).await.unwrap();
resp
}
async fn do_connect_ipv4(client: &mut DuplexStream, addr: [u8; 4], port: u16) -> Vec<u8> {
client
.write_all(&build_socks5_connect_ipv4(addr, port))
.await
.unwrap();
client.flush().await.unwrap();
let mut reply_buf = [0u8; 10];
client.read_exact(&mut reply_buf).await.unwrap();
reply_buf.to_vec()
}
#[tokio::test]
async fn handshake_no_auth_method() {
let (mut client, server) = duplex(4096);
let opener = MockChannelOpener { fail: false };
let server_handle = tokio::spawn(async move {
handle_socks5_connection(server, Arc::new(opener)).await
});
let resp = do_handshake(&mut client).await;
assert_eq!(resp, [0x05, 0x00]);
let reply_buf = do_connect_ipv4(&mut client, [127, 0, 0, 1], 80).await;
assert_eq!(reply_buf[0], 0x05);
assert_eq!(reply_buf[1], 0x00);
drop(client);
let _ = server_handle.await;
}
#[tokio::test]
async fn handshake_rejects_no_acceptable_method() {
let (mut client, server) = duplex(4096);
let opener = MockChannelOpener { fail: false };
let server_handle = tokio::spawn(async move {
handle_socks5_connection(server, Arc::new(opener)).await
});
client
.write_all(&build_socks5_greeting(&[0x02]))
.await
.unwrap();
client.flush().await.unwrap();
let mut resp = [0u8; 2];
client.read_exact(&mut resp).await.unwrap();
assert_eq!(resp, [0x05, 0xFF]);
drop(client);
let result = server_handle.await.unwrap();
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
Socks5Error::NoAcceptableAuth
));
}
#[tokio::test]
async fn address_type_ipv4() {
let (mut client, server) = duplex(4096);
let opener = MockChannelOpener { fail: false };
let server_handle = tokio::spawn(async move {
handle_socks5_connection(server, Arc::new(opener)).await
});
do_handshake(&mut client).await;
let reply_buf = do_connect_ipv4(&mut client, [10, 0, 0, 1], 443).await;
assert_eq!(reply_buf[1], 0x00);
drop(client);
let _ = server_handle.await;
}
#[tokio::test]
async fn address_type_domain() {
let (mut client, server) = duplex(4096);
let opener = MockChannelOpener { fail: false };
let server_handle = tokio::spawn(async move {
handle_socks5_connection(server, Arc::new(opener)).await
});
do_handshake(&mut client).await;
client
.write_all(&build_socks5_connect_domain("example.com", 443))
.await
.unwrap();
client.flush().await.unwrap();
let mut reply_buf = [0u8; 10];
client.read_exact(&mut reply_buf).await.unwrap();
assert_eq!(reply_buf[1], 0x00);
drop(client);
let _ = server_handle.await;
}
#[tokio::test]
async fn address_type_ipv6() {
let (mut client, server) = duplex(4096);
let opener = MockChannelOpener { fail: false };
let server_handle = tokio::spawn(async move {
handle_socks5_connection(server, Arc::new(opener)).await
});
do_handshake(&mut client).await;
let ipv6_addr = [0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];
client
.write_all(&build_socks5_connect_ipv6(ipv6_addr, 443))
.await
.unwrap();
client.flush().await.unwrap();
let mut reply_buf = [0u8; 10];
client.read_exact(&mut reply_buf).await.unwrap();
assert_eq!(reply_buf[0], 0x05);
assert_eq!(reply_buf[1], 0x00);
drop(client);
let _ = server_handle.await;
}
#[tokio::test]
async fn channel_open_failure_returns_socks5_error() {
let (mut client, server) = duplex(4096);
let opener = MockChannelOpener { fail: true };
let server_handle = tokio::spawn(async move {
handle_socks5_connection(server, Arc::new(opener)).await
});
do_handshake(&mut client).await;
let reply_buf = do_connect_ipv4(&mut client, [10, 0, 0, 1], 80).await;
assert_eq!(reply_buf[0], 0x05);
assert_eq!(reply_buf[1], 0x05);
drop(client);
let _ = server_handle.await;
}
#[tokio::test]
async fn unsupported_command_returns_error() {
let (mut client, server) = duplex(4096);
let opener = MockChannelOpener { fail: false };
let server_handle = tokio::spawn(async move {
handle_socks5_connection(server, Arc::new(opener)).await
});
do_handshake(&mut client).await;
let mut bind_req = vec![0x05, 0x02, 0x00, 0x01];
bind_req.extend_from_slice(&[127, 0, 0, 1]);
bind_req.extend_from_slice(&80u16.to_be_bytes());
client.write_all(&bind_req).await.unwrap();
client.flush().await.unwrap();
let mut reply_buf = [0u8; 10];
client.read_exact(&mut reply_buf).await.unwrap();
assert_eq!(reply_buf[1], 0x07);
drop(client);
let _ = server_handle.await;
}
#[tokio::test]
async fn bidirectional_proxy_flow() {
let (mut client_sock, server_sock) = duplex(4096);
let (ssh_client, mut ssh_server) = duplex(4096);
let ssh_stream = Arc::new(Mutex::new(Some(ssh_client)));
struct ProxyOpener {
stream: Arc<Mutex<Option<DuplexStream>>>,
}
impl ChannelOpener for ProxyOpener {
type Stream = DuplexStream;
async fn open_channel(
&self,
_host: String,
_port: u16,
) -> Result<Self::Stream, ChannelOpenError> {
self.stream
.lock()
.await
.take()
.ok_or(ChannelOpenError::ChannelOpenFailed)
}
}
let opener = ProxyOpener {
stream: Arc::clone(&ssh_stream),
};
let server_handle = tokio::spawn(async move {
handle_socks5_connection(server_sock, Arc::new(opener)).await
});
do_handshake(&mut client_sock).await;
let reply_buf = do_connect_ipv4(&mut client_sock, [127, 0, 0, 1], 80).await;
assert_eq!(reply_buf[1], 0x00);
let test_data = b"hello through tunnel";
client_sock.write_all(test_data).await.unwrap();
client_sock.flush().await.unwrap();
let mut received = vec![0u8; test_data.len()];
AsyncReadExt::read_exact(&mut ssh_server, &mut received)
.await
.unwrap();
assert_eq!(&received, test_data);
let echo_data = b"response from tunnel";
ssh_server.write_all(echo_data).await.unwrap();
ssh_server.flush().await.unwrap();
let mut received_back = vec![0u8; echo_data.len()];
client_sock.read_exact(&mut received_back).await.unwrap();
assert_eq!(&received_back, echo_data);
drop(client_sock);
drop(ssh_server);
let _ = server_handle.await;
}
#[tokio::test]
async fn default_listen_address() {
let opener = MockChannelOpener { fail: false };
let server = Socks5Server::new(opener);
assert_eq!(server.listen_addr(), "127.0.0.1:1080".parse().unwrap());
}
#[tokio::test]
async fn custom_listen_address() {
let opener = MockChannelOpener { fail: false };
let server = Socks5Server::with_addr(opener, "127.0.0.1:9050");
assert_eq!(server.listen_addr(), "127.0.0.1:9050".parse().unwrap());
}
}

View File

@@ -0,0 +1,304 @@
use std::net::{Ipv4Addr, Ipv6Addr};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
#[derive(Debug, Clone, PartialEq)]
pub enum Socks5Address {
Ipv4(Ipv4Addr),
Ipv6(Ipv6Addr),
Domain(String),
}
#[derive(Debug)]
pub struct Socks5VersionMethod {
pub version: u8,
pub methods: Vec<u8>,
}
impl Socks5VersionMethod {
pub async fn read_from<R: AsyncRead + Unpin>(reader: &mut R) -> std::io::Result<Self> {
let version = reader.read_u8().await?;
let nmethods = reader.read_u8().await?;
let mut methods = vec![0u8; nmethods as usize];
reader.read_exact(&mut methods).await?;
Ok(Self { version, methods })
}
}
#[derive(Debug)]
pub struct Socks5Request {
pub version: u8,
pub command: u8,
pub address: Socks5Address,
pub port: u16,
}
impl Socks5Request {
pub async fn read_from<R: AsyncRead + Unpin>(reader: &mut R) -> std::io::Result<Self> {
let version = reader.read_u8().await?;
let command = reader.read_u8().await?;
let _rsv = reader.read_u8().await?;
let atyp = reader.read_u8().await?;
let address = match atyp {
0x01 => {
let mut octets = [0u8; 4];
reader.read_exact(&mut octets).await?;
Socks5Address::Ipv4(Ipv4Addr::from(octets))
}
0x04 => {
let mut octets = [0u8; 16];
reader.read_exact(&mut octets).await?;
Socks5Address::Ipv6(Ipv6Addr::from(octets))
}
0x03 => {
let len = reader.read_u8().await?;
let mut domain = vec![0u8; len as usize];
reader.read_exact(&mut domain).await?;
Socks5Address::Domain(String::from_utf8_lossy(&domain).into_owned())
}
_ => {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("unsupported address type: {atyp}"),
))
}
};
let port = reader.read_u16().await?;
Ok(Self {
version,
command,
address,
port,
})
}
}
#[derive(Debug)]
pub struct Socks5Reply {
pub version: u8,
pub reply: u8,
pub address: Socks5Address,
pub port: u16,
}
impl Socks5Reply {
pub fn success(address: Socks5Address, port: u16) -> Self {
Self {
version: 0x05,
reply: 0x00,
address,
port,
}
}
pub fn connection_refused() -> Self {
Self {
version: 0x05,
reply: 0x05,
address: Socks5Address::Ipv4(Ipv4Addr::UNSPECIFIED),
port: 0,
}
}
pub fn command_not_supported() -> Self {
Self {
version: 0x05,
reply: 0x07,
address: Socks5Address::Ipv4(Ipv4Addr::UNSPECIFIED),
port: 0,
}
}
pub async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> std::io::Result<()> {
writer.write_u8(self.version).await?;
writer.write_u8(self.reply).await?;
writer.write_u8(0x00).await?;
match &self.address {
Socks5Address::Ipv4(addr) => {
writer.write_u8(0x01).await?;
writer.write_all(&addr.octets()).await?;
}
Socks5Address::Ipv6(addr) => {
writer.write_u8(0x04).await?;
writer.write_all(&addr.octets()).await?;
}
Socks5Address::Domain(name) => {
writer.write_u8(0x03).await?;
writer.write_u8(name.len() as u8).await?;
writer.write_all(name.as_bytes()).await?;
}
}
writer.write_u16(self.port).await?;
writer.flush().await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[tokio::test]
async fn parse_version_method_no_auth() {
let data = [0x05, 0x01, 0x00];
let mut cursor = Cursor::new(&data[..]);
let vm = Socks5VersionMethod::read_from(&mut cursor).await.unwrap();
assert_eq!(vm.version, 0x05);
assert_eq!(vm.methods, vec![0x00]);
}
#[tokio::test]
async fn parse_version_method_multiple() {
let data = [0x05, 0x02, 0x00, 0x02];
let mut cursor = Cursor::new(&data[..]);
let vm = Socks5VersionMethod::read_from(&mut cursor).await.unwrap();
assert_eq!(vm.version, 0x05);
assert_eq!(vm.methods, vec![0x00, 0x02]);
}
#[tokio::test]
async fn parse_request_ipv4() {
let mut data = vec![0x05, 0x01, 0x00, 0x01];
data.extend_from_slice(&[10, 0, 0, 1]);
data.extend_from_slice(&443u16.to_be_bytes());
let mut cursor = Cursor::new(&data[..]);
let req = Socks5Request::read_from(&mut cursor).await.unwrap();
assert_eq!(req.version, 0x05);
assert_eq!(req.command, 0x01);
assert_eq!(
req.address,
Socks5Address::Ipv4(Ipv4Addr::new(10, 0, 0, 1))
);
assert_eq!(req.port, 443);
}
#[tokio::test]
async fn parse_request_ipv6() {
let mut data = vec![0x05, 0x01, 0x00, 0x04];
let octets: [u8; 16] = [0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];
data.extend_from_slice(&octets);
data.extend_from_slice(&443u16.to_be_bytes());
let mut cursor = Cursor::new(&data[..]);
let req = Socks5Request::read_from(&mut cursor).await.unwrap();
assert_eq!(req.version, 0x05);
assert_eq!(req.command, 0x01);
assert!(matches!(req.address, Socks5Address::Ipv6(_)));
assert_eq!(req.port, 443);
}
#[tokio::test]
async fn parse_request_domain() {
let domain = "example.com";
let mut data = vec![0x05, 0x01, 0x00, 0x03];
data.push(domain.len() as u8);
data.extend_from_slice(domain.as_bytes());
data.extend_from_slice(&443u16.to_be_bytes());
let mut cursor = Cursor::new(&data[..]);
let req = Socks5Request::read_from(&mut cursor).await.unwrap();
assert_eq!(req.version, 0x05);
assert_eq!(req.command, 0x01);
assert_eq!(req.address, Socks5Address::Domain("example.com".to_string()));
assert_eq!(req.port, 443);
}
#[tokio::test]
async fn parse_request_unsupported_address_type() {
let data = [0x05, 0x01, 0x00, 0x05];
let mut cursor = Cursor::new(&data[..]);
let result = Socks5Request::read_from(&mut cursor).await;
assert!(result.is_err());
}
#[tokio::test]
async fn reply_success_ipv4() {
let reply = Socks5Reply::success(Socks5Address::Ipv4(Ipv4Addr::UNSPECIFIED), 0);
let mut buf = Vec::new();
reply.write_to(&mut buf).await.unwrap();
assert_eq!(buf[0], 0x05);
assert_eq!(buf[1], 0x00);
assert_eq!(buf[2], 0x00);
assert_eq!(buf[3], 0x01);
}
#[tokio::test]
async fn reply_connection_refused() {
let reply = Socks5Reply::connection_refused();
let mut buf = Vec::new();
reply.write_to(&mut buf).await.unwrap();
assert_eq!(buf[0], 0x05);
assert_eq!(buf[1], 0x05);
}
#[tokio::test]
async fn reply_command_not_supported() {
let reply = Socks5Reply::command_not_supported();
let mut buf = Vec::new();
reply.write_to(&mut buf).await.unwrap();
assert_eq!(buf[0], 0x05);
assert_eq!(buf[1], 0x07);
}
#[tokio::test]
async fn roundtrip_ipv4_reply() {
let reply = Socks5Reply::success(Socks5Address::Ipv4(Ipv4Addr::new(127, 0, 0, 1)), 1080);
let mut buf = Vec::new();
reply.write_to(&mut buf).await.unwrap();
let mut cursor = Cursor::new(&buf[..]);
let version = cursor.read_u8().await.unwrap();
let _reply_code = cursor.read_u8().await.unwrap();
let _rsv = cursor.read_u8().await.unwrap();
let atyp = cursor.read_u8().await.unwrap();
assert_eq!(version, 0x05);
assert_eq!(atyp, 0x01);
let mut octets = [0u8; 4];
cursor.read_exact(&mut octets).await.unwrap();
assert_eq!(Ipv4Addr::from(octets), Ipv4Addr::new(127, 0, 0, 1));
let port = cursor.read_u16().await.unwrap();
assert_eq!(port, 1080);
}
#[tokio::test]
async fn roundtrip_ipv6_reply() {
let addr = Ipv6Addr::new(0x2001, 0x0db8, 0, 0, 0, 0, 0, 1);
let reply = Socks5Reply::success(Socks5Address::Ipv6(addr), 443);
let mut buf = Vec::new();
reply.write_to(&mut buf).await.unwrap();
let mut cursor = Cursor::new(&buf[..]);
let _version = cursor.read_u8().await.unwrap();
let _reply_code = cursor.read_u8().await.unwrap();
let _rsv = cursor.read_u8().await.unwrap();
let atyp = cursor.read_u8().await.unwrap();
assert_eq!(atyp, 0x04);
let mut octets = [0u8; 16];
cursor.read_exact(&mut octets).await.unwrap();
assert_eq!(Ipv6Addr::from(octets), addr);
let port = cursor.read_u16().await.unwrap();
assert_eq!(port, 443);
}
#[tokio::test]
async fn roundtrip_domain_reply() {
let reply = Socks5Reply::success(Socks5Address::Domain("example.com".to_string()), 8080);
let mut buf = Vec::new();
reply.write_to(&mut buf).await.unwrap();
let mut cursor = Cursor::new(&buf[..]);
let _version = cursor.read_u8().await.unwrap();
let _reply_code = cursor.read_u8().await.unwrap();
let _rsv = cursor.read_u8().await.unwrap();
let atyp = cursor.read_u8().await.unwrap();
assert_eq!(atyp, 0x03);
let len = cursor.read_u8().await.unwrap();
let mut domain = vec![0u8; len as usize];
cursor.read_exact(&mut domain).await.unwrap();
assert_eq!(String::from_utf8(domain).unwrap(), "example.com");
let port = cursor.read_u16().await.unwrap();
assert_eq!(port, 8080);
}
}

View File

@@ -0,0 +1,362 @@
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use rustls::crypto::aws_lc_rs::default_provider;
use rustls::ServerConfig;
use rustls_acme::caches::DirCache;
use rustls_acme::{AcmeConfig, AcmeState, ResolvesServerCertAcme};
use tracing::{error, info};
use tokio::net::TcpListener;
use tokio_rustls::TlsAcceptor as TokioTlsAcceptor;
use super::{TransportAcceptor, TransportInfo, TransportKind};
const ACME_TLS_ALPN_NAME: &[u8] = b"acme-tls/1";
#[derive(Debug, Clone)]
pub enum AcmeMode {
Domain { domain: String },
Ip,
}
pub struct AcmeCertProvider {
mode: AcmeMode,
cache_dir: Option<PathBuf>,
directory_url: String,
contact: Vec<String>,
}
impl std::fmt::Debug for AcmeCertProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AcmeCertProvider")
.field("mode", &self.mode)
.field("cache_dir", &self.cache_dir)
.field("directory_url", &self.directory_url)
.field("contact", &self.contact)
.finish_non_exhaustive()
}
}
impl AcmeCertProvider {
pub fn new(mode: AcmeMode) -> Self {
Self {
mode,
cache_dir: None,
directory_url: rustls_acme::acme::LETS_ENCRYPT_STAGING_DIRECTORY.to_string(),
contact: Vec::new(),
}
}
pub fn domain(domain: impl Into<String>) -> Self {
Self::new(AcmeMode::Domain {
domain: domain.into(),
})
}
pub fn ip() -> Self {
Self::new(AcmeMode::Ip)
}
pub fn with_cache_dir(mut self, dir: impl Into<PathBuf>) -> Self {
self.cache_dir = Some(dir.into());
self
}
pub fn with_directory(mut self, url: impl Into<String>) -> Self {
self.directory_url = url.into();
self
}
pub fn with_production_directory(mut self) -> Self {
self.directory_url = rustls_acme::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY.to_string();
self
}
pub fn with_contact(mut self, contact: impl Into<String>) -> Self {
self.contact.push(contact.into());
self
}
pub fn mode(&self) -> &AcmeMode {
&self.mode
}
fn build_acme_state(&self) -> (AcmeState<std::io::Error>, Arc<ResolvesServerCertAcme>) {
let domains: Vec<String> = match &self.mode {
AcmeMode::Domain { domain } => vec![domain.clone()],
AcmeMode::Ip => vec![],
};
let base_config = AcmeConfig::new(domains)
.directory(&self.directory_url)
.contact(self.contact.clone());
let state = match &self.cache_dir {
Some(cache_dir) => {
base_config.cache(DirCache::new(cache_dir.clone())).state()
}
None => {
base_config
.cache(rustls_acme::caches::NoCache::default())
.state()
}
};
let resolver = state.resolver();
(state, resolver)
}
pub fn build_server_config_with_resolver(
&self,
resolver: Arc<ResolvesServerCertAcme>,
) -> Result<Arc<ServerConfig>> {
let provider = default_provider().into();
let mut config = ServerConfig::builder_with_provider(provider)
.with_safe_default_protocol_versions()
.map_err(|e| anyhow!("failed to set protocol versions: {}", e))?
.with_no_client_auth()
.with_cert_resolver(resolver);
config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
Ok(Arc::new(config))
}
}
pub struct AcmeTlsAcceptor {
listener: TcpListener,
listen_addr: SocketAddr,
#[allow(dead_code)]
server_config: Arc<ServerConfig>,
tokio_acceptor: TokioTlsAcceptor,
}
impl AcmeTlsAcceptor {
pub async fn bind_acme(
addr: SocketAddr,
provider: Arc<AcmeCertProvider>,
) -> Result<Self> {
let (state, resolver) = provider.build_acme_state();
let server_config = provider.build_server_config_with_resolver(resolver.clone())?;
Self::spawn_state_worker(state, resolver);
let listener = TcpListener::bind(addr).await?;
let listen_addr = listener.local_addr()?;
let tokio_acceptor = TokioTlsAcceptor::from(server_config.clone());
Ok(Self {
listener,
listen_addr,
server_config,
tokio_acceptor,
})
}
pub fn listen_addr(&self) -> SocketAddr {
self.listen_addr
}
fn spawn_state_worker(state: AcmeState<std::io::Error>, resolver: Arc<ResolvesServerCertAcme>) {
use futures::StreamExt;
let task = async move {
let mut state = state;
while let Some(event) = state.next().await {
match event {
Ok(ok) => {
if let rustls_acme::EventOk::DeployedNewCert = ok {
info!("ACME: new certificate deployed");
} else {
info!("ACME event: {:?}", ok);
}
}
Err(err) => error!("ACME event error: {:?}", err),
}
if Arc::strong_count(&resolver) == 1 {
info!("ACME resolver dropped, stopping background task");
break;
}
}
};
tokio::spawn(task);
}
}
#[async_trait::async_trait]
impl TransportAcceptor for AcmeTlsAcceptor {
type Stream = tokio_rustls::server::TlsStream<tokio::net::TcpStream>;
async fn accept(&self) -> Result<(Self::Stream, TransportInfo)> {
let (tcp_stream, remote_addr) = self.listener.accept().await?;
let tls_stream = self.tokio_acceptor.accept(tcp_stream).await?;
let server_name = tls_stream
.get_ref()
.1
.server_name()
.map(|s| s.to_string());
let info = TransportInfo {
remote_addr: Some(remote_addr),
transport_kind: TransportKind::Tls { server_name },
};
Ok((tls_stream, info))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn acme_cert_provider_domain_mode() {
let provider = AcmeCertProvider::domain("example.com");
assert!(matches!(provider.mode(), AcmeMode::Domain { .. }));
if let AcmeMode::Domain { domain } = provider.mode() {
assert_eq!(domain, "example.com");
}
}
#[test]
fn acme_cert_provider_ip_mode() {
let provider = AcmeCertProvider::ip();
assert!(matches!(provider.mode(), AcmeMode::Ip));
}
#[test]
fn acme_cert_provider_default_staging_directory() {
let provider = AcmeCertProvider::domain("example.com");
assert_eq!(
provider.directory_url,
rustls_acme::acme::LETS_ENCRYPT_STAGING_DIRECTORY
);
}
#[test]
fn acme_cert_provider_production_directory() {
let provider = AcmeCertProvider::domain("example.com").with_production_directory();
assert_eq!(
provider.directory_url,
rustls_acme::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
);
}
#[test]
fn acme_cert_provider_custom_directory() {
let provider =
AcmeCertProvider::domain("example.com").with_directory("https://custom.acme.dir/");
assert_eq!(provider.directory_url, "https://custom.acme.dir/");
}
#[test]
fn acme_cert_provider_with_cache_dir() {
let provider = AcmeCertProvider::domain("example.com").with_cache_dir("/tmp/acme_cache");
assert_eq!(provider.cache_dir, Some(PathBuf::from("/tmp/acme_cache")));
}
#[test]
fn acme_cert_provider_with_contact() {
let provider =
AcmeCertProvider::domain("example.com").with_contact("mailto:admin@example.com");
assert_eq!(
provider.contact,
vec!["mailto:admin@example.com".to_string()]
);
}
#[test]
fn acme_cert_provider_build_state_domain() {
let provider = AcmeCertProvider::domain("example.com");
let (_state, resolver) = provider.build_acme_state();
assert!(Arc::strong_count(&resolver) >= 2);
}
#[test]
fn acme_cert_provider_build_state_with_cache() {
let provider =
AcmeCertProvider::domain("example.com").with_cache_dir("/tmp/test_cache");
let (_state, resolver) = provider.build_acme_state();
assert!(Arc::strong_count(&resolver) >= 2);
}
#[test]
fn acme_cert_provider_build_server_config() {
let _ = default_provider().install_default();
let provider = AcmeCertProvider::domain("example.com");
let (_, resolver) = provider.build_acme_state();
let config = provider.build_server_config_with_resolver(resolver).unwrap();
assert!(!config.alpn_protocols.is_empty());
assert!(config
.alpn_protocols
.iter()
.any(|p| p == ACME_TLS_ALPN_NAME));
}
#[test]
fn acme_mode_domain_debug() {
let mode = AcmeMode::Domain {
domain: "test.example.com".to_string(),
};
let debug_str = format!("{:?}", mode);
assert!(debug_str.contains("test.example.com"));
}
#[test]
fn acme_mode_ip_debug() {
let mode = AcmeMode::Ip;
let debug_str = format!("{:?}", mode);
assert!(debug_str.contains("Ip"));
}
#[test]
fn acme_cert_provider_builder_chain() {
let provider = AcmeCertProvider::domain("test.example.com")
.with_production_directory()
.with_cache_dir("/tmp/cache")
.with_contact("mailto:admin@test.example.com");
assert!(matches!(provider.mode(), AcmeMode::Domain { .. }));
assert_eq!(
provider.directory_url,
rustls_acme::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
);
assert_eq!(provider.cache_dir, Some(PathBuf::from("/tmp/cache")));
assert_eq!(provider.contact.len(), 1);
}
#[tokio::test]
async fn acme_tls_acceptor_bind_acme() {
let _ = default_provider().install_default();
let provider = Arc::new(AcmeCertProvider::domain("example.com"));
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let acceptor = AcmeTlsAcceptor::bind_acme(addr, provider).await.unwrap();
assert_ne!(acceptor.listen_addr().port(), 0);
}
#[tokio::test]
#[ignore]
async fn acme_staging_domain_cert_provisioning() {
let _ = default_provider().install_default();
let cache_dir = tempfile::tempdir().unwrap();
let provider = Arc::new(
AcmeCertProvider::domain("acme-test.example.com")
.with_cache_dir(cache_dir.path())
.with_contact("mailto:admin@example.com"),
);
let addr: SocketAddr = "0.0.0.0:443".parse().unwrap();
let result = AcmeTlsAcceptor::bind_acme(addr, provider).await;
assert!(
result.is_ok(),
"ACME TlsAcceptor should bind: {:?}",
result.err()
);
let acceptor = result.unwrap();
assert_eq!(acceptor.listen_addr().port(), 443);
}
}

View File

@@ -12,6 +12,12 @@ mod tls;
#[cfg(feature = "tls")]
pub use tls::{AcmeConfig, TlsAcceptor, TlsTransport};
#[cfg(feature = "acme")]
mod acme;
#[cfg(feature = "acme")]
pub use acme::{AcmeCertProvider, AcmeMode, AcmeTlsAcceptor};
use std::net::SocketAddr;
use anyhow::Result;

View File

@@ -9,8 +9,16 @@ use rustls::{ClientConfig, DigitallySignedStruct, RootCertStore, ServerConfig};
use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::{client::TlsStream as ClientTlsStream, TlsAcceptor as TokioTlsAcceptor, TlsConnector};
#[cfg(feature = "acme")]
use rustls::crypto::aws_lc_rs::default_provider;
#[cfg(feature = "acme")]
use rustls_acme::ResolvesServerCertAcme;
use super::{Transport, TransportAcceptor, TransportInfo, TransportKind};
#[cfg(feature = "acme")]
const ACME_TLS_ALPN_NAME: &[u8] = b"acme-tls/1";
/// A TLS-based client transport that connects to a remote address over TLS.
///
/// Wraps a TCP connection with a TLS client session via `tokio_rustls::TlsConnector`.
@@ -110,8 +118,10 @@ pub struct AcmeConfig {
/// A TLS-based server transport acceptor that accepts TCP connections
/// and wraps them with TLS server sessions via `tokio_rustls::TlsAcceptor`.
///
/// Requires certificate and private key configuration. Supports manual
/// cert/key paths and an ACME config stub (ADR-008).
/// Supports three certificate modes (ADR-008):
/// - Manual certs via `bind()` with explicit cert/key
/// - ACME certs via `bind_acme()` with an `AcmeCertProvider`
/// - The stub `AcmeConfig` parameter in `bind()` is kept for backward compat
pub struct TlsAcceptor {
listener: TcpListener,
listen_addr: SocketAddr,
@@ -145,6 +155,33 @@ impl TlsAcceptor {
})
}
#[cfg(feature = "acme")]
pub async fn bind_acme(
addr: SocketAddr,
acme_resolver: Arc<ResolvesServerCertAcme>,
) -> Result<Self> {
let listener = TcpListener::bind(addr).await?;
let listen_addr = listener.local_addr()?;
let provider = default_provider().into();
let mut server_config = ServerConfig::builder_with_provider(provider)
.with_safe_default_protocol_versions()
.map_err(|e| anyhow!("failed to set protocol versions: {}", e))?
.with_no_client_auth()
.with_cert_resolver(acme_resolver);
server_config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
let server_config = Arc::new(server_config);
let tokio_acceptor = TokioTlsAcceptor::from(server_config.clone());
Ok(Self {
listener,
listen_addr,
server_config,
tokio_acceptor,
})
}
pub fn listen_addr(&self) -> SocketAddr {
self.listen_addr
}

View File

@@ -43,8 +43,14 @@ This integrates with `TlsAcceptor` by providing ACME-resolved certificates inste
## Notes
> To be filled by implementation agent
- `AcmeCertProvider` is the main entry point. It creates `AcmeState` and `ResolvesServerCertAcme` from `rustls-acme`.
- The `ResolvesServerCertAcme` resolver is shared between the `AcmeState` background task and the `ServerConfig`, so cert updates propagate automatically.
- `AcmeTlsAcceptor::bind_acme()` creates a TLS acceptor that uses ACME-provisioned certs and spawns a background tokio task for auto-renewal.
- `TlsAcceptor::bind_acme()` also added for users who want to use ACME with the standard `TlsAcceptor` type directly.
- The `AcmeConfig` stub in `tls.rs` is retained for backward compat with existing `TlsAcceptor::bind()`.
- `acme` feature implies `tls` and adds `rustls-acme` + `futures` dependencies.
- TLS-ALPN-01 challenge handling works via the `acme-tls/1` ALPN protocol registered in `ServerConfig` — the resolver dispatches challenge vs regular certs automatically.
## Summary
> To be filled on completion
Implemented ACME/Let's Encrypt certificate provisioning (ADR-008) behind the `acme` feature flag. `AcmeCertProvider` supports domain-based and IP-based modes using `rustls-acme`. `AcmeTlsAcceptor::bind_acme()` and `TlsAcceptor::bind_acme()` provide ACME-integrated TLS acceptance with automatic certificate renewal via a background tokio task. Unit tests cover config construction, builder patterns, and server config generation. Integration test for LE staging is marked `#[ignore]`.