Compare commits
9 Commits
feat/clien
...
feat/serve
| Author | SHA1 | Date | |
|---|---|---|---|
| 24b70f5651 | |||
| 992d478630 | |||
| e3f33a24c3 | |||
| 5fec0b53d9 | |||
| 2efd4cf7c5 | |||
| 4e4afd5020 | |||
| 7336c0f13c | |||
| 975778bfb1 | |||
| 24b92227e7 |
@@ -10,7 +10,7 @@ name = "wraith_core"
|
||||
default = []
|
||||
tls = ["dep:tokio-rustls", "dep:rustls", "dep:rustls-pki-types", "dep:webpki-roots"]
|
||||
iroh = ["dep:iroh", "dep:url"]
|
||||
acme = ["dep:rustls-acme", "tls"]
|
||||
acme = ["dep:rustls-acme", "dep:futures", "tls"]
|
||||
testutil = []
|
||||
transport-traits = []
|
||||
|
||||
@@ -25,6 +25,7 @@ tokio-rustls = { version = "0.26", optional = true }
|
||||
rustls = { version = "0.23", optional = true, features = ["aws_lc_rs"] }
|
||||
rustls-pki-types = { version = "1", optional = true }
|
||||
rustls-acme = { version = "0.12", optional = true }
|
||||
futures = { version = "0.3", optional = true }
|
||||
webpki-roots = { version = "0.26", optional = true }
|
||||
iroh = { version = "0.34", optional = true }
|
||||
url = { version = "2", optional = true }
|
||||
|
||||
530
crates/wraith-core/src/client/forward.rs
Normal file
530
crates/wraith-core/src/client/forward.rs
Normal file
@@ -0,0 +1,530 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use russh::client;
|
||||
use tokio::io;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use crate::error::ForwardError;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum PortForwardSpecKind {
|
||||
Local,
|
||||
Remote,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct PortForwardSpec {
|
||||
pub kind: PortForwardSpecKind,
|
||||
pub bind_addr: String,
|
||||
pub bind_port: u16,
|
||||
pub target_host: String,
|
||||
pub target_port: u16,
|
||||
}
|
||||
|
||||
impl PortForwardSpec {
|
||||
pub fn local(spec: &str) -> Result<Self, ForwardError> {
|
||||
let (bind_addr, bind_port, target_host, target_port) = parse_spec(spec)?;
|
||||
Ok(Self {
|
||||
kind: PortForwardSpecKind::Local,
|
||||
bind_addr,
|
||||
bind_port,
|
||||
target_host,
|
||||
target_port,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn remote(spec: &str) -> Result<Self, ForwardError> {
|
||||
let (bind_addr, bind_port, target_host, target_port) = parse_spec(spec)?;
|
||||
Ok(Self {
|
||||
kind: PortForwardSpecKind::Remote,
|
||||
bind_addr,
|
||||
bind_port,
|
||||
target_host,
|
||||
target_port,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn listen_addr(&self) -> Result<SocketAddr, ForwardError> {
|
||||
format!("{}:{}", self.bind_addr, self.bind_port)
|
||||
.parse()
|
||||
.map_err(|_| ForwardError::InvalidSpec {
|
||||
spec: format!("{}:{}", self.bind_addr, self.bind_port),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn target_addr(&self) -> Result<SocketAddr, ForwardError> {
|
||||
format!("{}:{}", self.target_host, self.target_port)
|
||||
.parse()
|
||||
.map_err(|_| ForwardError::InvalidSpec {
|
||||
spec: format!("{}:{}", self.target_host, self.target_port),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for PortForwardSpec {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let prefix = match self.kind {
|
||||
PortForwardSpecKind::Local => "-L",
|
||||
PortForwardSpecKind::Remote => "-R",
|
||||
};
|
||||
write!(
|
||||
f,
|
||||
"{} {}:{}:{}:{}",
|
||||
prefix, self.bind_addr, self.bind_port, self.target_host, self.target_port
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_spec(spec: &str) -> Result<(String, u16, String, u16), ForwardError> {
|
||||
let parts: Vec<&str> = spec.split(':').collect();
|
||||
if parts.len() != 4 {
|
||||
return Err(ForwardError::InvalidSpec {
|
||||
spec: spec.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let bind_addr = parts[0].to_string();
|
||||
let bind_port: u16 = parts[1].parse().map_err(|_| ForwardError::InvalidSpec {
|
||||
spec: spec.to_string(),
|
||||
})?;
|
||||
let target_host = parts[2].to_string();
|
||||
let target_port: u16 = parts[3].parse().map_err(|_| ForwardError::InvalidSpec {
|
||||
spec: spec.to_string(),
|
||||
})?;
|
||||
|
||||
Ok((bind_addr, bind_port, target_host, target_port))
|
||||
}
|
||||
|
||||
pub struct LocalForwarder {
|
||||
spec: PortForwardSpec,
|
||||
listener: Option<TcpListener>,
|
||||
}
|
||||
|
||||
impl LocalForwarder {
|
||||
pub fn new(spec: PortForwardSpec) -> Result<Self, ForwardError> {
|
||||
if spec.kind != PortForwardSpecKind::Local {
|
||||
return Err(ForwardError::InvalidSpec {
|
||||
spec: format!("expected local spec, got {:?}", spec.kind),
|
||||
});
|
||||
}
|
||||
Ok(Self {
|
||||
spec,
|
||||
listener: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn spec(&self) -> &PortForwardSpec {
|
||||
&self.spec
|
||||
}
|
||||
|
||||
pub async fn run<H: client::Handler + Send + 'static>(
|
||||
&mut self,
|
||||
handle: Arc<Mutex<client::Handle<H>>>,
|
||||
) -> Result<(), ForwardError> {
|
||||
let listen_addr = self.spec.listen_addr()?;
|
||||
let listener: TcpListener = TcpListener::bind(listen_addr)
|
||||
.await
|
||||
.map_err(|e| ForwardError::BindFailed { source: e })?;
|
||||
self.listener = Some(listener);
|
||||
let remote_host = self.spec.target_host.clone();
|
||||
let remote_port = self.spec.target_port;
|
||||
|
||||
info!(
|
||||
"local forward listening on {} -> {}:{}",
|
||||
listen_addr, remote_host, remote_port
|
||||
);
|
||||
|
||||
loop {
|
||||
let listener = match &self.listener {
|
||||
Some(l) => l,
|
||||
None => return Ok(()),
|
||||
};
|
||||
let accept_result = listener.accept().await;
|
||||
let (local_stream, local_addr) = match accept_result {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
let handle = handle.lock().await;
|
||||
if handle.is_closed() {
|
||||
debug!("local forward accept loop ending: ssh session closed");
|
||||
return Ok(());
|
||||
}
|
||||
drop(handle);
|
||||
error!("local forward accept error: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
debug!(
|
||||
"local forward connection from {} -> {}:{}",
|
||||
local_addr, remote_host, remote_port
|
||||
);
|
||||
|
||||
let handle = handle.clone();
|
||||
let remote_host = remote_host.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) =
|
||||
proxy_local_to_remote(local_stream, handle, &remote_host, remote_port).await
|
||||
{
|
||||
debug!("local forward proxy error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stop(&mut self) {
|
||||
if let Some(listener) = self.listener.take() {
|
||||
drop(listener);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn local_port(&self) -> u16 {
|
||||
self.spec.bind_port
|
||||
}
|
||||
}
|
||||
|
||||
async fn proxy_local_to_remote<H: client::Handler + Send + 'static>(
|
||||
local_stream: TcpStream,
|
||||
handle: Arc<Mutex<client::Handle<H>>>,
|
||||
remote_host: &str,
|
||||
remote_port: u16,
|
||||
) -> Result<(), ForwardError> {
|
||||
let local_addr = local_stream
|
||||
.peer_addr()
|
||||
.map(|a| a.to_string())
|
||||
.unwrap_or_default();
|
||||
|
||||
let handle_guard = handle.lock().await;
|
||||
let channel = handle_guard
|
||||
.channel_open_direct_tcpip(
|
||||
remote_host,
|
||||
remote_port as u32,
|
||||
&local_addr,
|
||||
0,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| ForwardError::ChannelOpenFailed {
|
||||
source: Box::new(e) as _,
|
||||
})?;
|
||||
drop(handle_guard);
|
||||
|
||||
let ssh_stream = channel.into_stream();
|
||||
let (mut ssh_read, mut ssh_write) = tokio::io::split(ssh_stream);
|
||||
let (mut local_read, mut local_write) = tokio::io::split(local_stream);
|
||||
|
||||
let client_to_server = io::copy(&mut local_read, &mut ssh_write);
|
||||
let server_to_client = io::copy(&mut ssh_read, &mut local_write);
|
||||
|
||||
match tokio::join!(client_to_server, server_to_client) {
|
||||
(Err(e), _) | (_, Err(e)) => {
|
||||
debug!("local forward bidirectional copy error: {}", e);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct RemoteForwarder {
|
||||
spec: PortForwardSpec,
|
||||
cancel: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
}
|
||||
|
||||
impl RemoteForwarder {
|
||||
pub fn new(spec: PortForwardSpec) -> Result<Self, ForwardError> {
|
||||
if spec.kind != PortForwardSpecKind::Remote {
|
||||
return Err(ForwardError::InvalidSpec {
|
||||
spec: format!("expected remote spec, got {:?}", spec.kind),
|
||||
});
|
||||
}
|
||||
Ok(Self { spec, cancel: None })
|
||||
}
|
||||
|
||||
pub fn spec(&self) -> &PortForwardSpec {
|
||||
&self.spec
|
||||
}
|
||||
|
||||
pub async fn register<H: client::Handler + Send + 'static>(
|
||||
&self,
|
||||
handle: &mut client::Handle<H>,
|
||||
) -> Result<u32, ForwardError> {
|
||||
let port = handle
|
||||
.tcpip_forward(&self.spec.bind_addr, self.spec.bind_port as u32)
|
||||
.await
|
||||
.map_err(|e| ForwardError::ChannelOpenFailed {
|
||||
source: Box::new(e) as _,
|
||||
})?;
|
||||
Ok(port)
|
||||
}
|
||||
|
||||
pub async fn handle_forwarded_channel(
|
||||
channel: russh::Channel<russh::client::Msg>,
|
||||
connected_address: &str,
|
||||
connected_port: u32,
|
||||
local_host: &str,
|
||||
local_port: u16,
|
||||
) {
|
||||
debug!(
|
||||
"remote forward: server opened forwarded-tcpip channel to {}:{} -> local {}:{}",
|
||||
connected_address, connected_port, local_host, local_port
|
||||
);
|
||||
|
||||
let local_target = format!("{}:{}", local_host, local_port);
|
||||
let local_stream = match TcpStream::connect(&local_target).await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
error!(
|
||||
"remote forward: failed to connect to local target {}: {}",
|
||||
local_target, e
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let ssh_stream = channel.into_stream();
|
||||
let (mut ssh_read, mut ssh_write) = tokio::io::split(ssh_stream);
|
||||
let (mut local_read, mut local_write) = tokio::io::split(local_stream);
|
||||
|
||||
let client_to_server = io::copy(&mut local_read, &mut ssh_write);
|
||||
let server_to_client = io::copy(&mut ssh_read, &mut local_write);
|
||||
|
||||
match tokio::join!(client_to_server, server_to_client) {
|
||||
(Err(e), _) | (_, Err(e)) => {
|
||||
debug!("remote forward bidirectional copy error: {}", e);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn unregister<H: client::Handler + Send + 'static>(
|
||||
&self,
|
||||
handle: &client::Handle<H>,
|
||||
) -> Result<(), ForwardError> {
|
||||
handle
|
||||
.cancel_tcpip_forward(&self.spec.bind_addr, self.spec.bind_port as u32)
|
||||
.await
|
||||
.map_err(|e| ForwardError::ChannelOpenFailed {
|
||||
source: Box::new(e) as _,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn stop(&mut self) {
|
||||
if let Some(cancel) = self.cancel.take() {
|
||||
let _ = cancel.send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_local_forwarders<H: client::Handler + Send + 'static>(
|
||||
forwarders: Vec<LocalForwarder>,
|
||||
handle: Arc<Mutex<client::Handle<H>>>,
|
||||
mut shutdown: tokio::sync::watch::Receiver<bool>,
|
||||
) -> Vec<LocalForwarder> {
|
||||
let mut forwarders = forwarders;
|
||||
let mut tasks = Vec::new();
|
||||
|
||||
for forwarder in forwarders.drain(..) {
|
||||
let handle = handle.clone();
|
||||
let spec = forwarder.spec().clone();
|
||||
let (_cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
|
||||
tasks.push(tokio::spawn(async move {
|
||||
let mut fwd = forwarder;
|
||||
tokio::select! {
|
||||
result = fwd.run(handle) => {
|
||||
if let Err(e) = result {
|
||||
error!("local forward {} failed: {}", spec, e);
|
||||
}
|
||||
}
|
||||
_ = cancel_rx => {
|
||||
fwd.stop().await;
|
||||
}
|
||||
}
|
||||
fwd
|
||||
}));
|
||||
}
|
||||
|
||||
let _ = shutdown.changed().await;
|
||||
|
||||
for task in &tasks {
|
||||
task.abort();
|
||||
}
|
||||
|
||||
let mut results = Vec::new();
|
||||
for task in tasks {
|
||||
match task.await {
|
||||
Ok(fwd) => results.push(fwd),
|
||||
Err(e) => {
|
||||
if !e.is_cancelled() {
|
||||
error!("local forwarder task panicked: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
results
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parse_local_spec() {
|
||||
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
|
||||
assert_eq!(spec.kind, PortForwardSpecKind::Local);
|
||||
assert_eq!(spec.bind_addr, "127.0.0.1");
|
||||
assert_eq!(spec.bind_port, 5432);
|
||||
assert_eq!(spec.target_host, "db.internal");
|
||||
assert_eq!(spec.target_port, 5432);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_remote_spec() {
|
||||
let spec = PortForwardSpec::remote("0.0.0.0:8080:127.0.0.1:3000").unwrap();
|
||||
assert_eq!(spec.kind, PortForwardSpecKind::Remote);
|
||||
assert_eq!(spec.bind_addr, "0.0.0.0");
|
||||
assert_eq!(spec.bind_port, 8080);
|
||||
assert_eq!(spec.target_host, "127.0.0.1");
|
||||
assert_eq!(spec.target_port, 3000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_spec_invalid_few_parts() {
|
||||
assert!(PortForwardSpec::local("127.0.0.1:5432:db").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_spec_invalid_many_parts() {
|
||||
assert!(PortForwardSpec::local("a:b:c:d:e").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_spec_invalid_port() {
|
||||
assert!(PortForwardSpec::local("127.0.0.1:abc:db:5432").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_spec_invalid_target_port() {
|
||||
assert!(PortForwardSpec::local("127.0.0.1:5432:db:abc").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn spec_display() {
|
||||
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
|
||||
assert_eq!(spec.to_string(), "-L 127.0.0.1:5432:db.internal:5432");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn spec_display_remote() {
|
||||
let spec = PortForwardSpec::remote("0.0.0.0:8080:127.0.0.1:3000").unwrap();
|
||||
assert_eq!(spec.to_string(), "-R 0.0.0.0:8080:127.0.0.1:3000");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_forwarder_rejects_remote_spec() {
|
||||
let spec = PortForwardSpec::remote("0.0.0.0:8080:127.0.0.1:3000").unwrap();
|
||||
assert!(LocalForwarder::new(spec).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_forwarder_rejects_local_spec() {
|
||||
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
|
||||
assert!(RemoteForwarder::new(spec).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn listen_addr_valid() {
|
||||
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
|
||||
let addr = spec.listen_addr().unwrap();
|
||||
assert_eq!(addr.port(), 5432);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn listen_addr_invalid_host() {
|
||||
let spec = PortForwardSpec {
|
||||
kind: PortForwardSpecKind::Local,
|
||||
bind_addr: "!!!invalid".to_string(),
|
||||
bind_port: 5432,
|
||||
target_host: "db".to_string(),
|
||||
target_port: 5432,
|
||||
};
|
||||
assert!(spec.listen_addr().is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn local_forward_bind_and_accept() {
|
||||
let spec = PortForwardSpec::local(&format!("127.0.0.1:0:remote:5432")).unwrap();
|
||||
let forwarder = LocalForwarder::new(spec).unwrap();
|
||||
|
||||
let listen_addr = forwarder.spec.listen_addr().unwrap();
|
||||
let listener = TcpListener::bind(listen_addr).await.unwrap();
|
||||
let bound_addr = listener.local_addr().unwrap();
|
||||
drop(listener);
|
||||
|
||||
let spec = PortForwardSpec::local(&format!(
|
||||
"127.0.0.1:{}:remote:5432",
|
||||
bound_addr.port()
|
||||
))
|
||||
.unwrap();
|
||||
let forwarder = LocalForwarder::new(spec).unwrap();
|
||||
assert_eq!(forwarder.local_port(), bound_addr.port());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_forward_proxy_bidirectional() {
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
let echo_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let _echo_addr = echo_listener.local_addr().unwrap();
|
||||
|
||||
let echo_server = tokio::spawn(async move {
|
||||
let (mut stream, _) = echo_listener.accept().await.unwrap();
|
||||
let mut buf = [0u8; 64];
|
||||
loop {
|
||||
let n = match stream.read(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => n,
|
||||
Err(_) => break,
|
||||
};
|
||||
if stream.write_all(&buf[..n]).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let local_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let local_addr = local_listener.local_addr().unwrap();
|
||||
|
||||
let proxy_task = tokio::spawn(async move {
|
||||
let (stream, _) = local_listener.accept().await.unwrap();
|
||||
let (mut read, mut write) = tokio::io::split(stream);
|
||||
let _ = io::copy(&mut read, &mut write).await;
|
||||
});
|
||||
|
||||
let mut local_conn = TcpStream::connect(local_addr).await.unwrap();
|
||||
local_conn.write_all(b"hello").await.unwrap();
|
||||
let mut buf = [0u8; 64];
|
||||
let n = local_conn.read(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf[..n], b"hello");
|
||||
|
||||
echo_server.abort();
|
||||
proxy_task.abort();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn forwarder_spec_access() {
|
||||
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
|
||||
let forwarder = LocalForwarder::new(spec.clone()).unwrap();
|
||||
assert_eq!(forwarder.spec(), &spec);
|
||||
assert_eq!(forwarder.local_port(), 5432);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_forwarder_spec_access() {
|
||||
let spec = PortForwardSpec::remote("0.0.0.0:8080:127.0.0.1:3000").unwrap();
|
||||
let forwarder = RemoteForwarder::new(spec.clone()).unwrap();
|
||||
assert_eq!(forwarder.spec(), &spec);
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,5 @@
|
||||
pub mod channel_manager;
|
||||
pub mod forward;
|
||||
|
||||
pub use channel_manager::{ChannelManager, ForwardRequest};
|
||||
pub use channel_manager::{ChannelManager, ForwardRequest};
|
||||
pub use forward::{LocalForwarder, PortForwardSpec, PortForwardSpecKind, RemoteForwarder};
|
||||
@@ -60,6 +60,22 @@ pub enum ConfigError {
|
||||
IncompatibleOptions,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ForwardError {
|
||||
#[error("invalid forward spec: {spec}")]
|
||||
InvalidSpec { spec: String },
|
||||
#[error("bind failed")]
|
||||
BindFailed {
|
||||
#[source]
|
||||
source: io::Error,
|
||||
},
|
||||
#[error("channel open failed")]
|
||||
ChannelOpenFailed {
|
||||
#[source]
|
||||
source: Box<dyn std::error::Error + Send + Sync>,
|
||||
},
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -8,6 +8,6 @@ pub mod error;
|
||||
#[cfg(feature = "testutil")]
|
||||
pub mod testutil;
|
||||
|
||||
pub use error::{AuthError, ChannelError, ConfigError, TransportError};
|
||||
pub use error::{AuthError, ChannelError, ConfigError, ForwardError, TransportError};
|
||||
pub use transport::{Transport, TransportAcceptor, TransportInfo, TransportKind};
|
||||
pub use client::channel_manager::{ChannelManager, ForwardRequest};
|
||||
512
crates/wraith-core/src/server/handler.rs
Normal file
512
crates/wraith-core/src/server/handler.rs
Normal file
@@ -0,0 +1,512 @@
|
||||
use std::net::{IpAddr, SocketAddr};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use russh::keys::ssh_key::HashAlg;
|
||||
use russh::server::{Auth, Handler, Msg, Session};
|
||||
use russh::Channel;
|
||||
|
||||
use crate::auth::ServerAuthConfig;
|
||||
use crate::server::rate_limit::{AuthAttemptLimiter, ConnectionRateLimiter};
|
||||
|
||||
const WRAITH_PREFIX: &str = "wraith-";
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ProxyMode {
|
||||
Direct,
|
||||
Socks5(SocketAddr),
|
||||
HttpConnect(SocketAddr),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProxyConfig {
|
||||
pub mode: ProxyMode,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub enum TransportKind {
|
||||
Tcp,
|
||||
Tls,
|
||||
Iroh,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TransportKind {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
TransportKind::Tcp => write!(f, "tcp"),
|
||||
TransportKind::Tls => write!(f, "tls"),
|
||||
TransportKind::Iroh => write!(f, "iroh"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ServerHandler {
|
||||
auth_config: Arc<ServerAuthConfig>,
|
||||
outbound_proxy: Option<ProxyConfig>,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
transport: TransportKind,
|
||||
connection_limiter: Arc<ConnectionRateLimiter>,
|
||||
connection_allowed: bool,
|
||||
auth_limiter: AuthAttemptLimiter,
|
||||
connected_at: Instant,
|
||||
}
|
||||
|
||||
impl ServerHandler {
|
||||
pub fn new(
|
||||
auth_config: Arc<ServerAuthConfig>,
|
||||
outbound_proxy: Option<ProxyConfig>,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
transport: TransportKind,
|
||||
connection_limiter: Arc<ConnectionRateLimiter>,
|
||||
max_auth_attempts: usize,
|
||||
) -> Self {
|
||||
let allowed = if let Some(addr) = remote_addr {
|
||||
let ip = addr.ip();
|
||||
if connection_limiter.check(ip) {
|
||||
connection_limiter.on_connect(ip);
|
||||
tracing::info!(
|
||||
remote_addr = %addr,
|
||||
transport = %transport,
|
||||
"connection opened"
|
||||
);
|
||||
true
|
||||
} else {
|
||||
tracing::info!(
|
||||
remote_addr = %addr,
|
||||
transport = %transport,
|
||||
"connection rejected"
|
||||
);
|
||||
false
|
||||
}
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
Self {
|
||||
auth_config,
|
||||
outbound_proxy,
|
||||
remote_addr,
|
||||
transport,
|
||||
connection_limiter,
|
||||
connection_allowed: allowed,
|
||||
auth_limiter: AuthAttemptLimiter::new(max_auth_attempts),
|
||||
connected_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_connection_allowed(&self) -> bool {
|
||||
self.connection_allowed
|
||||
}
|
||||
|
||||
pub fn remote_ip(&self) -> Option<IpAddr> {
|
||||
self.remote_addr.map(|a| a.ip())
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ServerHandler {
|
||||
fn drop(&mut self) {
|
||||
if let Some(addr) = self.remote_addr {
|
||||
if self.connection_allowed {
|
||||
self.connection_limiter.on_disconnect(addr.ip());
|
||||
}
|
||||
let duration = self.connected_at.elapsed();
|
||||
tracing::info!(
|
||||
remote_addr = %addr,
|
||||
duration_secs = duration.as_secs_f64(),
|
||||
"connection closed"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Handler for ServerHandler {
|
||||
type Error = russh::Error;
|
||||
|
||||
async fn auth_publickey(
|
||||
&mut self,
|
||||
user: &str,
|
||||
public_key: &russh::keys::ssh_key::PublicKey,
|
||||
) -> Result<Auth, Self::Error> {
|
||||
if !self.auth_limiter.check() {
|
||||
let remote_addr_display = self
|
||||
.remote_addr
|
||||
.map_or("unknown".to_string(), |a| a.to_string());
|
||||
let fingerprint = format!("{}", public_key.fingerprint(HashAlg::Sha256));
|
||||
tracing::info!(
|
||||
remote_addr = %remote_addr_display,
|
||||
user = user,
|
||||
key_fingerprint = %fingerprint,
|
||||
result = "reject",
|
||||
"auth attempt"
|
||||
);
|
||||
return Ok(Auth::Reject {
|
||||
proceed_with_methods: None,
|
||||
});
|
||||
}
|
||||
|
||||
let fingerprint = format!("{}", public_key.fingerprint(HashAlg::Sha256));
|
||||
let remote_addr_display = self
|
||||
.remote_addr
|
||||
.map_or("unknown".to_string(), |a| a.to_string());
|
||||
|
||||
let russh_pub = russh::keys::PublicKey::new(public_key.key_data().clone(), user);
|
||||
let result = self.auth_config.authenticate_publickey(&russh_pub);
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
tracing::info!(
|
||||
remote_addr = %remote_addr_display,
|
||||
user = user,
|
||||
key_fingerprint = %fingerprint,
|
||||
result = "accept",
|
||||
"auth attempt"
|
||||
);
|
||||
Ok(Auth::Accept)
|
||||
}
|
||||
Err(_) => {
|
||||
self.auth_limiter.on_failure();
|
||||
tracing::info!(
|
||||
remote_addr = %remote_addr_display,
|
||||
user = user,
|
||||
key_fingerprint = %fingerprint,
|
||||
result = "reject",
|
||||
"auth attempt"
|
||||
);
|
||||
Ok(Auth::Reject {
|
||||
proceed_with_methods: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn channel_open_direct_tcpip(
|
||||
&mut self,
|
||||
channel: Channel<Msg>,
|
||||
host_to_connect: &str,
|
||||
port_to_connect: u32,
|
||||
originator_address: &str,
|
||||
originator_port: u32,
|
||||
_session: &mut Session,
|
||||
) -> Result<bool, Self::Error> {
|
||||
if host_to_connect.starts_with(WRAITH_PREFIX) {
|
||||
tracing::info!(
|
||||
host = host_to_connect,
|
||||
port = port_to_connect,
|
||||
"routing to internal control channel handler"
|
||||
);
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let proxy_info = self
|
||||
.outbound_proxy
|
||||
.as_ref()
|
||||
.map(|p| format!("{:?}", p.mode))
|
||||
.unwrap_or_else(|| "direct".to_string());
|
||||
|
||||
tracing::info!(
|
||||
host = host_to_connect,
|
||||
port = port_to_connect,
|
||||
originator_address = originator_address,
|
||||
originator_port = originator_port,
|
||||
proxy = %proxy_info,
|
||||
"spawning tcp proxy task"
|
||||
);
|
||||
|
||||
let _ = channel;
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn channel_open_session(
|
||||
&mut self,
|
||||
_channel: Channel<Msg>,
|
||||
_session: &mut Session,
|
||||
) -> Result<bool, Self::Error> {
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn channel_open_x11(
|
||||
&mut self,
|
||||
_channel: Channel<Msg>,
|
||||
_originator_address: &str,
|
||||
_originator_port: u32,
|
||||
_session: &mut Session,
|
||||
) -> Result<bool, Self::Error> {
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn channel_open_forwarded_tcpip(
|
||||
&mut self,
|
||||
_channel: Channel<Msg>,
|
||||
_host_to_connect: &str,
|
||||
_port_to_connect: u32,
|
||||
_originator_address: &str,
|
||||
_originator_port: u32,
|
||||
_session: &mut Session,
|
||||
) -> Result<bool, Self::Error> {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::auth::keys::KeySource;
|
||||
use russh::keys::{decode_secret_key, PrivateKey};
|
||||
use std::io::Write;
|
||||
|
||||
const ED25519_PRIVATE_KEY: &str = "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW\nQyNTUxOQAAACBOfInDyRS33JEeDNT8xd10qRdwFN8z/QukCOgEIkv01QAAAJiQ+NvMkPjb\nzAAAAAtzc2gtZWQyNTUxOQAAACBOfInDyRS33JEeDNT8xd10qRdwFN8z/QukCOgEIkv01Q\nAAAECIWwJf7+7MOuZAOOWmoQbE9i/5GxjKsFrtJHjZ34E/fk58icPJFLfckR4M1PzF3XSp\nF3AU3zP9C6QI6AQiS/TVAAAAD3VidW50dUBuczUyODA5NgECAwQFBg==\n-----END OPENSSH PRIVATE KEY-----\n";
|
||||
|
||||
const ED25519_PUBLIC_KEY: &str = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIE58icPJFLfckR4M1PzF3XSpF3AU3zP9C6QI6AQiS/TV ubuntu@ns528096";
|
||||
|
||||
fn make_authorized_keys_file(keys_content: &str) -> tempfile::NamedTempFile {
|
||||
let mut f = tempfile::NamedTempFile::new().unwrap();
|
||||
f.write_all(keys_content.as_bytes()).unwrap();
|
||||
f.flush().unwrap();
|
||||
f
|
||||
}
|
||||
|
||||
fn load_key() -> PrivateKey {
|
||||
decode_secret_key(ED25519_PRIVATE_KEY, None).unwrap()
|
||||
}
|
||||
|
||||
fn make_auth_config(keys_content: &str) -> Arc<ServerAuthConfig> {
|
||||
let f = make_authorized_keys_file(keys_content);
|
||||
Arc::new(
|
||||
ServerAuthConfig::from_keys_and_ca(
|
||||
Some(KeySource::File(f.path().to_path_buf())),
|
||||
None,
|
||||
)
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
fn make_empty_auth_config() -> Arc<ServerAuthConfig> {
|
||||
Arc::new(ServerAuthConfig::from_keys_and_ca(None, None).unwrap())
|
||||
}
|
||||
|
||||
fn default_limiter() -> Arc<ConnectionRateLimiter> {
|
||||
Arc::new(ConnectionRateLimiter::new(0))
|
||||
}
|
||||
|
||||
fn make_handler(
|
||||
auth_config: Arc<ServerAuthConfig>,
|
||||
outbound_proxy: Option<ProxyConfig>,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
) -> ServerHandler {
|
||||
ServerHandler::new(auth_config, outbound_proxy, remote_addr, TransportKind::Tcp, default_limiter(), 10)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_delegation_accepts_known_key() {
|
||||
let auth_config = make_auth_config(ED25519_PUBLIC_KEY);
|
||||
let mut handler = make_handler(auth_config, None, None);
|
||||
|
||||
let ssh_key = load_key().public_key().clone();
|
||||
let result = handler.auth_publickey("testuser", &ssh_key).await.unwrap();
|
||||
assert_eq!(result, Auth::Accept);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_delegation_rejects_unknown_key() {
|
||||
let auth_config = make_auth_config(ED25519_PUBLIC_KEY);
|
||||
let mut handler = make_handler(auth_config, None, None);
|
||||
|
||||
let other_key_text = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIHeLC1lWiCYrXsf/85O/pkbUFZ6OGIt49PX3nw8iRoXE other@host";
|
||||
let other_ssh_key = russh::keys::parse_public_key_base64(
|
||||
other_key_text.split_whitespace().nth(1).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let result = handler
|
||||
.auth_publickey("testuser", &other_ssh_key)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
result,
|
||||
Auth::Reject {
|
||||
proceed_with_methods: None
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_delegation_empty_config_rejects_all() {
|
||||
let auth_config = make_empty_auth_config();
|
||||
let mut handler = make_handler(auth_config, None, None);
|
||||
|
||||
let ssh_key = load_key().public_key().clone();
|
||||
let result = handler
|
||||
.auth_publickey("testuser", &ssh_key)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
result,
|
||||
Auth::Reject {
|
||||
proceed_with_methods: None
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_logging_includes_remote_addr() {
|
||||
let auth_config = make_auth_config(ED25519_PUBLIC_KEY);
|
||||
let remote_addr: SocketAddr = "203.0.113.50:12345".parse().unwrap();
|
||||
let mut handler = make_handler(auth_config, None, Some(remote_addr));
|
||||
|
||||
let ssh_key = load_key().public_key().clone();
|
||||
let _ = handler.auth_publickey("root", &ssh_key).await.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reserved_wraith_destination_routing() {
|
||||
assert!("wraith-control".starts_with(WRAITH_PREFIX));
|
||||
assert!("wraith-status".starts_with(WRAITH_PREFIX));
|
||||
assert!("wraith-events".starts_with(WRAITH_PREFIX));
|
||||
assert!(!"example.com".starts_with(WRAITH_PREFIX));
|
||||
assert!(!"localhost".starts_with(WRAITH_PREFIX));
|
||||
assert!(!"wraith.example.com".starts_with(WRAITH_PREFIX));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn proxy_mode_variants() {
|
||||
let direct = ProxyMode::Direct;
|
||||
let socks5 = ProxyMode::Socks5("127.0.0.1:9050".parse().unwrap());
|
||||
let http = ProxyMode::HttpConnect("127.0.0.1:8080".parse().unwrap());
|
||||
|
||||
match direct {
|
||||
ProxyMode::Direct => {}
|
||||
_ => panic!("expected Direct"),
|
||||
}
|
||||
match socks5 {
|
||||
ProxyMode::Socks5(_) => {}
|
||||
_ => panic!("expected Socks5"),
|
||||
}
|
||||
match http {
|
||||
ProxyMode::HttpConnect(_) => {}
|
||||
_ => panic!("expected HttpConnect"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_handler_holds_config() {
|
||||
let auth_config = make_empty_auth_config();
|
||||
let proxy = Some(ProxyConfig {
|
||||
mode: ProxyMode::Socks5("127.0.0.1:9050".parse().unwrap()),
|
||||
});
|
||||
let remote: Option<SocketAddr> = Some("10.0.0.1:22".parse().unwrap());
|
||||
|
||||
let handler = make_handler(auth_config, proxy.clone(), remote);
|
||||
assert!(handler.outbound_proxy.is_some());
|
||||
assert!(handler.remote_addr.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn one_handler_per_connection() {
|
||||
let auth_config = make_empty_auth_config();
|
||||
let handler1 = make_handler(auth_config.clone(), None, Some("10.0.0.1:22".parse().unwrap()));
|
||||
let handler2 = make_handler(auth_config.clone(), None, Some("10.0.0.2:22".parse().unwrap()));
|
||||
|
||||
assert!(handler1.remote_addr != handler2.remote_addr);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_rate_limit_rejects_after_max_failures() {
|
||||
let auth_config = make_empty_auth_config();
|
||||
let limiter = Arc::new(ConnectionRateLimiter::new(0));
|
||||
let mut handler = ServerHandler::new(
|
||||
auth_config,
|
||||
None,
|
||||
Some("10.0.0.1:22".parse().unwrap()),
|
||||
TransportKind::Tcp,
|
||||
limiter,
|
||||
2,
|
||||
);
|
||||
|
||||
let ssh_key = load_key().public_key().clone();
|
||||
|
||||
let r1 = handler.auth_publickey("user", &ssh_key).await.unwrap();
|
||||
assert_eq!(r1, Auth::Reject { proceed_with_methods: None });
|
||||
|
||||
let r2 = handler.auth_publickey("user", &ssh_key).await.unwrap();
|
||||
assert_eq!(r2, Auth::Reject { proceed_with_methods: None });
|
||||
|
||||
assert!(!handler.auth_limiter.check());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_rate_limit_blocks_over_limit() {
|
||||
let limiter = Arc::new(ConnectionRateLimiter::new(1));
|
||||
let auth_config = make_empty_auth_config();
|
||||
let addr: SocketAddr = "10.0.0.1:22".parse().unwrap();
|
||||
|
||||
let h1 = ServerHandler::new(
|
||||
auth_config.clone(),
|
||||
None,
|
||||
Some(addr),
|
||||
TransportKind::Tcp,
|
||||
limiter.clone(),
|
||||
10,
|
||||
);
|
||||
assert!(h1.is_connection_allowed());
|
||||
|
||||
let h2 = ServerHandler::new(
|
||||
auth_config.clone(),
|
||||
None,
|
||||
Some(addr),
|
||||
TransportKind::Tcp,
|
||||
limiter.clone(),
|
||||
10,
|
||||
);
|
||||
assert!(!h2.is_connection_allowed());
|
||||
|
||||
drop(h1);
|
||||
|
||||
let h3 = ServerHandler::new(
|
||||
auth_config,
|
||||
None,
|
||||
Some(addr),
|
||||
TransportKind::Tcp,
|
||||
limiter,
|
||||
10,
|
||||
);
|
||||
assert!(h3.is_connection_allowed());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn transport_kind_display() {
|
||||
assert_eq!(TransportKind::Tcp.to_string(), "tcp");
|
||||
assert_eq!(TransportKind::Tls.to_string(), "tls");
|
||||
assert_eq!(TransportKind::Iroh.to_string(), "iroh");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_log_includes_user_field() {
|
||||
let auth_config = make_empty_auth_config();
|
||||
let mut handler = ServerHandler::new(
|
||||
auth_config,
|
||||
None,
|
||||
Some("203.0.113.50:12345".parse().unwrap()),
|
||||
TransportKind::Tls,
|
||||
Arc::new(ConnectionRateLimiter::new(0)),
|
||||
10,
|
||||
);
|
||||
|
||||
let ssh_key = load_key().public_key().clone();
|
||||
let _ = handler.auth_publickey("root", &ssh_key).await.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_closed_logs_duration_on_drop() {
|
||||
let auth_config = make_empty_auth_config();
|
||||
let _handler = ServerHandler::new(
|
||||
auth_config,
|
||||
None,
|
||||
Some("203.0.113.50:12345".parse().unwrap()),
|
||||
TransportKind::Tcp,
|
||||
Arc::new(ConnectionRateLimiter::new(0)),
|
||||
10,
|
||||
);
|
||||
}
|
||||
}
|
||||
5
crates/wraith-core/src/server/mod.rs
Normal file
5
crates/wraith-core/src/server/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub mod handler;
|
||||
pub mod rate_limit;
|
||||
|
||||
pub use handler::{ProxyConfig, ProxyMode, ServerHandler, TransportKind};
|
||||
pub use rate_limit::{AuthAttemptLimiter, ConnectionRateLimiter};
|
||||
193
crates/wraith-core/src/server/rate_limit.rs
Normal file
193
crates/wraith-core/src/server/rate_limit.rs
Normal file
@@ -0,0 +1,193 @@
|
||||
use std::collections::HashMap;
|
||||
use std::net::IpAddr;
|
||||
use std::sync::Mutex;
|
||||
|
||||
pub struct ConnectionRateLimiter {
|
||||
max_per_ip: usize,
|
||||
active: Mutex<HashMap<IpAddr, usize>>,
|
||||
}
|
||||
|
||||
impl ConnectionRateLimiter {
|
||||
pub fn new(max_per_ip: usize) -> Self {
|
||||
Self {
|
||||
max_per_ip,
|
||||
active: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn check(&self, ip: IpAddr) -> bool {
|
||||
if self.max_per_ip == 0 {
|
||||
return true;
|
||||
}
|
||||
let active = self.active.lock().unwrap();
|
||||
let count = active.get(&ip).copied().unwrap_or(0);
|
||||
count < self.max_per_ip
|
||||
}
|
||||
|
||||
pub fn on_connect(&self, ip: IpAddr) {
|
||||
let mut active = self.active.lock().unwrap();
|
||||
*active.entry(ip).or_insert(0) += 1;
|
||||
}
|
||||
|
||||
pub fn on_disconnect(&self, ip: IpAddr) {
|
||||
let mut active = self.active.lock().unwrap();
|
||||
if let Some(count) = active.get_mut(&ip) {
|
||||
if *count > 1 {
|
||||
*count -= 1;
|
||||
} else {
|
||||
active.remove(&ip);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AuthAttemptLimiter {
|
||||
max_attempts: usize,
|
||||
failures: usize,
|
||||
}
|
||||
|
||||
impl AuthAttemptLimiter {
|
||||
pub fn new(max_attempts: usize) -> Self {
|
||||
Self {
|
||||
max_attempts,
|
||||
failures: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn check(&self) -> bool {
|
||||
if self.max_attempts == 0 {
|
||||
return true;
|
||||
}
|
||||
self.failures < self.max_attempts
|
||||
}
|
||||
|
||||
pub fn on_failure(&mut self) {
|
||||
self.failures += 1;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
||||
|
||||
fn ip(n: u8) -> IpAddr {
|
||||
IpAddr::V4(Ipv4Addr::new(192, 168, 1, n))
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_limiter_allows_when_under_limit() {
|
||||
let limiter = ConnectionRateLimiter::new(3);
|
||||
assert!(limiter.check(ip(1)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_limiter_blocks_when_at_limit() {
|
||||
let limiter = ConnectionRateLimiter::new(2);
|
||||
limiter.on_connect(ip(1));
|
||||
limiter.on_connect(ip(1));
|
||||
assert!(!limiter.check(ip(1)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_limiter_allows_after_disconnect() {
|
||||
let limiter = ConnectionRateLimiter::new(2);
|
||||
limiter.on_connect(ip(1));
|
||||
limiter.on_connect(ip(1));
|
||||
assert!(!limiter.check(ip(1)));
|
||||
limiter.on_disconnect(ip(1));
|
||||
assert!(limiter.check(ip(1)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_limiter_unlimited_when_zero() {
|
||||
let limiter = ConnectionRateLimiter::new(0);
|
||||
for _ in 0..100 {
|
||||
limiter.on_connect(ip(1));
|
||||
}
|
||||
assert!(limiter.check(ip(1)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_limiter_tracks_per_ip_independently() {
|
||||
let limiter = ConnectionRateLimiter::new(1);
|
||||
limiter.on_connect(ip(1));
|
||||
assert!(!limiter.check(ip(1)));
|
||||
assert!(limiter.check(ip(2)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_limiter_ipv6() {
|
||||
let limiter = ConnectionRateLimiter::new(1);
|
||||
let ip6 = IpAddr::V6(Ipv6Addr::LOCALHOST);
|
||||
limiter.on_connect(ip6);
|
||||
assert!(!limiter.check(ip6));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_limiter_disconnect_removes_zero_entry() {
|
||||
let limiter = ConnectionRateLimiter::new(3);
|
||||
limiter.on_connect(ip(1));
|
||||
limiter.on_disconnect(ip(1));
|
||||
{
|
||||
let active = limiter.active.lock().unwrap();
|
||||
assert!(!active.contains_key(&ip(1)));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn auth_limiter_allows_when_under_limit() {
|
||||
let limiter = AuthAttemptLimiter::new(3);
|
||||
assert!(limiter.check());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn auth_limiter_blocks_after_max_failures() {
|
||||
let mut limiter = AuthAttemptLimiter::new(2);
|
||||
limiter.on_failure();
|
||||
limiter.on_failure();
|
||||
assert!(!limiter.check());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn auth_limiter_unlimited_when_zero() {
|
||||
let mut limiter = AuthAttemptLimiter::new(0);
|
||||
for _ in 0..100 {
|
||||
limiter.on_failure();
|
||||
}
|
||||
assert!(limiter.check());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn auth_limiter_still_allows_at_one_below_limit() {
|
||||
let mut limiter = AuthAttemptLimiter::new(3);
|
||||
limiter.on_failure();
|
||||
limiter.on_failure();
|
||||
assert!(limiter.check());
|
||||
limiter.on_failure();
|
||||
assert!(!limiter.check());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connection_limiter_thread_safety() {
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
let limiter = Arc::new(ConnectionRateLimiter::new(100));
|
||||
let mut handles = vec![];
|
||||
|
||||
for i in 0..10 {
|
||||
let lim = Arc::clone(&limiter);
|
||||
handles.push(thread::spawn(move || {
|
||||
let ip_addr = ip((i % 3) as u8 + 1);
|
||||
lim.on_connect(ip_addr);
|
||||
assert!(lim.check(ip_addr));
|
||||
lim.on_disconnect(ip_addr);
|
||||
}));
|
||||
}
|
||||
|
||||
for h in handles {
|
||||
h.join().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
490
crates/wraith-core/src/socks5/mod.rs
Normal file
490
crates/wraith-core/src/socks5/mod.rs
Normal file
@@ -0,0 +1,490 @@
|
||||
mod protocol;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::debug;
|
||||
|
||||
use protocol::{Socks5Reply, Socks5Request, Socks5VersionMethod};
|
||||
|
||||
pub use protocol::Socks5Address;
|
||||
|
||||
const DEFAULT_SOCKS5_ADDR: &str = "127.0.0.1:1080";
|
||||
|
||||
pub trait ChannelOpener: Send + Sync + 'static {
|
||||
type Stream: AsyncRead + AsyncWrite + Unpin + Send + 'static;
|
||||
|
||||
fn open_channel(
|
||||
&self,
|
||||
host: String,
|
||||
port: u16,
|
||||
) -> impl std::future::Future<Output = Result<Self::Stream, ChannelOpenError>> + Send;
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ChannelOpenError {
|
||||
#[error("session closed")]
|
||||
SessionClosed,
|
||||
#[error("channel open failed")]
|
||||
ChannelOpenFailed,
|
||||
#[error("connection refused")]
|
||||
ConnectionRefused,
|
||||
}
|
||||
|
||||
pub struct Socks5Server<C: ChannelOpener> {
|
||||
listen_addr: SocketAddr,
|
||||
channel_opener: Arc<C>,
|
||||
}
|
||||
|
||||
impl<C: ChannelOpener> Socks5Server<C> {
|
||||
pub fn new(channel_opener: C) -> Self {
|
||||
Self::with_addr(channel_opener, DEFAULT_SOCKS5_ADDR)
|
||||
}
|
||||
|
||||
pub fn with_addr(channel_opener: C, addr: &str) -> Self {
|
||||
let listen_addr: SocketAddr = addr
|
||||
.parse()
|
||||
.expect("invalid SOCKS5 listen address");
|
||||
Self {
|
||||
listen_addr,
|
||||
channel_opener: Arc::new(channel_opener),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn listen_addr(&self) -> SocketAddr {
|
||||
self.listen_addr
|
||||
}
|
||||
|
||||
pub async fn run(self) -> Result<(), std::io::Error> {
|
||||
let listener = TcpListener::bind(self.listen_addr).await?;
|
||||
debug!("socks5 server listening on {}", self.listen_addr);
|
||||
loop {
|
||||
let (socket, _peer) = listener.accept().await?;
|
||||
let opener = Arc::clone(&self.channel_opener);
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = handle_socks5_connection(socket, opener).await {
|
||||
debug!("socks5 connection error: {e}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_socks5_connection<S, C>(
|
||||
mut socket: S,
|
||||
opener: Arc<C>,
|
||||
) -> Result<(), Socks5Error>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
C: ChannelOpener,
|
||||
{
|
||||
let vm = Socks5VersionMethod::read_from(&mut socket).await?;
|
||||
if vm.version != 0x05 {
|
||||
return Err(Socks5Error::InvalidVersion(vm.version));
|
||||
}
|
||||
if !vm.methods.contains(&0x00) {
|
||||
let reply = [0x05, 0xFF];
|
||||
socket.write_all(&reply).await?;
|
||||
socket.shutdown().await?;
|
||||
return Err(Socks5Error::NoAcceptableAuth);
|
||||
}
|
||||
let reply = [0x05, 0x00];
|
||||
socket.write_all(&reply).await?;
|
||||
|
||||
let request = Socks5Request::read_from(&mut socket).await?;
|
||||
if request.version != 0x05 {
|
||||
return Err(Socks5Error::InvalidVersion(request.version));
|
||||
}
|
||||
if request.command != 0x01 {
|
||||
send_error_reply(&mut socket, Socks5Reply::command_not_supported()).await?;
|
||||
return Err(Socks5Error::UnsupportedCommand(request.command));
|
||||
}
|
||||
|
||||
let (host, port) = match &request.address {
|
||||
Socks5Address::Ipv4(addr) => (addr.to_string(), request.port),
|
||||
Socks5Address::Ipv6(addr) => (addr.to_string(), request.port),
|
||||
Socks5Address::Domain(name) => (name.clone(), request.port),
|
||||
};
|
||||
|
||||
match opener.open_channel(host, port).await {
|
||||
Ok(mut ssh_stream) => {
|
||||
let bind_addr = Socks5Address::Ipv4(std::net::Ipv4Addr::UNSPECIFIED);
|
||||
let reply = Socks5Reply::success(bind_addr, 0);
|
||||
reply.write_to(&mut socket).await?;
|
||||
tokio::io::copy_bidirectional(&mut socket, &mut ssh_stream).await?;
|
||||
Ok(())
|
||||
}
|
||||
Err(_) => {
|
||||
send_error_reply(&mut socket, Socks5Reply::connection_refused()).await?;
|
||||
Err(Socks5Error::ChannelOpenFailed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_error_reply<S: AsyncRead + AsyncWrite + Unpin>(
|
||||
socket: &mut S,
|
||||
reply: Socks5Reply,
|
||||
) -> Result<(), Socks5Error> {
|
||||
reply.write_to(socket).await?;
|
||||
let _ = socket.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum Socks5Error {
|
||||
#[error("invalid SOCKS version: {0}")]
|
||||
InvalidVersion(u8),
|
||||
#[error("no acceptable auth method")]
|
||||
NoAcceptableAuth,
|
||||
#[error("unsupported command: {0}")]
|
||||
UnsupportedCommand(u8),
|
||||
#[error("channel open failed")]
|
||||
ChannelOpenFailed,
|
||||
#[error("io error")]
|
||||
Io(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
pub struct HandleChannelOpener<H: russh::client::Handler> {
|
||||
handle: Arc<Mutex<russh::client::Handle<H>>>,
|
||||
}
|
||||
|
||||
impl<H: russh::client::Handler> HandleChannelOpener<H> {
|
||||
pub fn new(handle: russh::client::Handle<H>) -> Self {
|
||||
Self {
|
||||
handle: Arc::new(Mutex::new(handle)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_arc(handle: Arc<Mutex<russh::client::Handle<H>>>) -> Self {
|
||||
Self { handle }
|
||||
}
|
||||
}
|
||||
|
||||
impl<H: russh::client::Handler + Send + Sync + 'static> ChannelOpener for HandleChannelOpener<H> {
|
||||
type Stream = russh::ChannelStream<russh::client::Msg>;
|
||||
|
||||
async fn open_channel(&self, host: String, port: u16) -> Result<Self::Stream, ChannelOpenError> {
|
||||
let handle = self.handle.lock().await;
|
||||
if handle.is_closed() {
|
||||
return Err(ChannelOpenError::SessionClosed);
|
||||
}
|
||||
let channel = handle
|
||||
.channel_open_direct_tcpip(host, port as u32, "127.0.0.1", 0)
|
||||
.await
|
||||
.map_err(|_| ChannelOpenError::ChannelOpenFailed)?;
|
||||
Ok(channel.into_stream())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream};
|
||||
|
||||
struct MockChannelOpener {
|
||||
fail: bool,
|
||||
}
|
||||
|
||||
impl ChannelOpener for MockChannelOpener {
|
||||
type Stream = DuplexStream;
|
||||
|
||||
async fn open_channel(
|
||||
&self,
|
||||
_host: String,
|
||||
_port: u16,
|
||||
) -> Result<Self::Stream, ChannelOpenError> {
|
||||
if self.fail {
|
||||
Err(ChannelOpenError::ChannelOpenFailed)
|
||||
} else {
|
||||
let (client, _server) = duplex(4096);
|
||||
Ok(client)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn build_socks5_greeting(methods: &[u8]) -> Vec<u8> {
|
||||
let mut buf = vec![0x05, methods.len() as u8];
|
||||
buf.extend_from_slice(methods);
|
||||
buf
|
||||
}
|
||||
|
||||
fn build_socks5_connect_ipv4(addr: [u8; 4], port: u16) -> Vec<u8> {
|
||||
let mut buf = vec![0x05, 0x01, 0x00, 0x01];
|
||||
buf.extend_from_slice(&addr);
|
||||
buf.extend_from_slice(&port.to_be_bytes());
|
||||
buf
|
||||
}
|
||||
|
||||
fn build_socks5_connect_domain(domain: &str, port: u16) -> Vec<u8> {
|
||||
let mut buf = vec![0x05, 0x01, 0x00, 0x03];
|
||||
buf.push(domain.len() as u8);
|
||||
buf.extend_from_slice(domain.as_bytes());
|
||||
buf.extend_from_slice(&port.to_be_bytes());
|
||||
buf
|
||||
}
|
||||
|
||||
fn build_socks5_connect_ipv6(addr: [u8; 16], port: u16) -> Vec<u8> {
|
||||
let mut buf = vec![0x05, 0x01, 0x00, 0x04];
|
||||
buf.extend_from_slice(&addr);
|
||||
buf.extend_from_slice(&port.to_be_bytes());
|
||||
buf
|
||||
}
|
||||
|
||||
async fn do_handshake(client: &mut DuplexStream) -> [u8; 2] {
|
||||
client.write_all(&build_socks5_greeting(&[0x00])).await.unwrap();
|
||||
client.flush().await.unwrap();
|
||||
let mut resp = [0u8; 2];
|
||||
client.read_exact(&mut resp).await.unwrap();
|
||||
resp
|
||||
}
|
||||
|
||||
async fn do_connect_ipv4(client: &mut DuplexStream, addr: [u8; 4], port: u16) -> Vec<u8> {
|
||||
client
|
||||
.write_all(&build_socks5_connect_ipv4(addr, port))
|
||||
.await
|
||||
.unwrap();
|
||||
client.flush().await.unwrap();
|
||||
let mut reply_buf = [0u8; 10];
|
||||
client.read_exact(&mut reply_buf).await.unwrap();
|
||||
reply_buf.to_vec()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handshake_no_auth_method() {
|
||||
let (mut client, server) = duplex(4096);
|
||||
let opener = MockChannelOpener { fail: false };
|
||||
|
||||
let server_handle = tokio::spawn(async move {
|
||||
handle_socks5_connection(server, Arc::new(opener)).await
|
||||
});
|
||||
|
||||
let resp = do_handshake(&mut client).await;
|
||||
assert_eq!(resp, [0x05, 0x00]);
|
||||
|
||||
let reply_buf = do_connect_ipv4(&mut client, [127, 0, 0, 1], 80).await;
|
||||
assert_eq!(reply_buf[0], 0x05);
|
||||
assert_eq!(reply_buf[1], 0x00);
|
||||
|
||||
drop(client);
|
||||
let _ = server_handle.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handshake_rejects_no_acceptable_method() {
|
||||
let (mut client, server) = duplex(4096);
|
||||
let opener = MockChannelOpener { fail: false };
|
||||
|
||||
let server_handle = tokio::spawn(async move {
|
||||
handle_socks5_connection(server, Arc::new(opener)).await
|
||||
});
|
||||
|
||||
client
|
||||
.write_all(&build_socks5_greeting(&[0x02]))
|
||||
.await
|
||||
.unwrap();
|
||||
client.flush().await.unwrap();
|
||||
|
||||
let mut resp = [0u8; 2];
|
||||
client.read_exact(&mut resp).await.unwrap();
|
||||
assert_eq!(resp, [0x05, 0xFF]);
|
||||
|
||||
drop(client);
|
||||
let result = server_handle.await.unwrap();
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(
|
||||
result.unwrap_err(),
|
||||
Socks5Error::NoAcceptableAuth
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn address_type_ipv4() {
|
||||
let (mut client, server) = duplex(4096);
|
||||
let opener = MockChannelOpener { fail: false };
|
||||
|
||||
let server_handle = tokio::spawn(async move {
|
||||
handle_socks5_connection(server, Arc::new(opener)).await
|
||||
});
|
||||
|
||||
do_handshake(&mut client).await;
|
||||
let reply_buf = do_connect_ipv4(&mut client, [10, 0, 0, 1], 443).await;
|
||||
assert_eq!(reply_buf[1], 0x00);
|
||||
|
||||
drop(client);
|
||||
let _ = server_handle.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn address_type_domain() {
|
||||
let (mut client, server) = duplex(4096);
|
||||
let opener = MockChannelOpener { fail: false };
|
||||
|
||||
let server_handle = tokio::spawn(async move {
|
||||
handle_socks5_connection(server, Arc::new(opener)).await
|
||||
});
|
||||
|
||||
do_handshake(&mut client).await;
|
||||
|
||||
client
|
||||
.write_all(&build_socks5_connect_domain("example.com", 443))
|
||||
.await
|
||||
.unwrap();
|
||||
client.flush().await.unwrap();
|
||||
|
||||
let mut reply_buf = [0u8; 10];
|
||||
client.read_exact(&mut reply_buf).await.unwrap();
|
||||
assert_eq!(reply_buf[1], 0x00);
|
||||
|
||||
drop(client);
|
||||
let _ = server_handle.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn address_type_ipv6() {
|
||||
let (mut client, server) = duplex(4096);
|
||||
let opener = MockChannelOpener { fail: false };
|
||||
|
||||
let server_handle = tokio::spawn(async move {
|
||||
handle_socks5_connection(server, Arc::new(opener)).await
|
||||
});
|
||||
|
||||
do_handshake(&mut client).await;
|
||||
|
||||
let ipv6_addr = [0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];
|
||||
client
|
||||
.write_all(&build_socks5_connect_ipv6(ipv6_addr, 443))
|
||||
.await
|
||||
.unwrap();
|
||||
client.flush().await.unwrap();
|
||||
|
||||
let mut reply_buf = [0u8; 10];
|
||||
client.read_exact(&mut reply_buf).await.unwrap();
|
||||
assert_eq!(reply_buf[0], 0x05);
|
||||
assert_eq!(reply_buf[1], 0x00);
|
||||
|
||||
drop(client);
|
||||
let _ = server_handle.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn channel_open_failure_returns_socks5_error() {
|
||||
let (mut client, server) = duplex(4096);
|
||||
let opener = MockChannelOpener { fail: true };
|
||||
|
||||
let server_handle = tokio::spawn(async move {
|
||||
handle_socks5_connection(server, Arc::new(opener)).await
|
||||
});
|
||||
|
||||
do_handshake(&mut client).await;
|
||||
let reply_buf = do_connect_ipv4(&mut client, [10, 0, 0, 1], 80).await;
|
||||
assert_eq!(reply_buf[0], 0x05);
|
||||
assert_eq!(reply_buf[1], 0x05);
|
||||
|
||||
drop(client);
|
||||
let _ = server_handle.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unsupported_command_returns_error() {
|
||||
let (mut client, server) = duplex(4096);
|
||||
let opener = MockChannelOpener { fail: false };
|
||||
|
||||
let server_handle = tokio::spawn(async move {
|
||||
handle_socks5_connection(server, Arc::new(opener)).await
|
||||
});
|
||||
|
||||
do_handshake(&mut client).await;
|
||||
|
||||
let mut bind_req = vec![0x05, 0x02, 0x00, 0x01];
|
||||
bind_req.extend_from_slice(&[127, 0, 0, 1]);
|
||||
bind_req.extend_from_slice(&80u16.to_be_bytes());
|
||||
client.write_all(&bind_req).await.unwrap();
|
||||
client.flush().await.unwrap();
|
||||
|
||||
let mut reply_buf = [0u8; 10];
|
||||
client.read_exact(&mut reply_buf).await.unwrap();
|
||||
assert_eq!(reply_buf[1], 0x07);
|
||||
|
||||
drop(client);
|
||||
let _ = server_handle.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn bidirectional_proxy_flow() {
|
||||
let (mut client_sock, server_sock) = duplex(4096);
|
||||
let (ssh_client, mut ssh_server) = duplex(4096);
|
||||
|
||||
let ssh_stream = Arc::new(Mutex::new(Some(ssh_client)));
|
||||
|
||||
struct ProxyOpener {
|
||||
stream: Arc<Mutex<Option<DuplexStream>>>,
|
||||
}
|
||||
|
||||
impl ChannelOpener for ProxyOpener {
|
||||
type Stream = DuplexStream;
|
||||
|
||||
async fn open_channel(
|
||||
&self,
|
||||
_host: String,
|
||||
_port: u16,
|
||||
) -> Result<Self::Stream, ChannelOpenError> {
|
||||
self.stream
|
||||
.lock()
|
||||
.await
|
||||
.take()
|
||||
.ok_or(ChannelOpenError::ChannelOpenFailed)
|
||||
}
|
||||
}
|
||||
|
||||
let opener = ProxyOpener {
|
||||
stream: Arc::clone(&ssh_stream),
|
||||
};
|
||||
|
||||
let server_handle = tokio::spawn(async move {
|
||||
handle_socks5_connection(server_sock, Arc::new(opener)).await
|
||||
});
|
||||
|
||||
do_handshake(&mut client_sock).await;
|
||||
let reply_buf = do_connect_ipv4(&mut client_sock, [127, 0, 0, 1], 80).await;
|
||||
assert_eq!(reply_buf[1], 0x00);
|
||||
|
||||
let test_data = b"hello through tunnel";
|
||||
client_sock.write_all(test_data).await.unwrap();
|
||||
client_sock.flush().await.unwrap();
|
||||
|
||||
let mut received = vec![0u8; test_data.len()];
|
||||
AsyncReadExt::read_exact(&mut ssh_server, &mut received)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(&received, test_data);
|
||||
|
||||
let echo_data = b"response from tunnel";
|
||||
ssh_server.write_all(echo_data).await.unwrap();
|
||||
ssh_server.flush().await.unwrap();
|
||||
|
||||
let mut received_back = vec![0u8; echo_data.len()];
|
||||
client_sock.read_exact(&mut received_back).await.unwrap();
|
||||
assert_eq!(&received_back, echo_data);
|
||||
|
||||
drop(client_sock);
|
||||
drop(ssh_server);
|
||||
let _ = server_handle.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn default_listen_address() {
|
||||
let opener = MockChannelOpener { fail: false };
|
||||
let server = Socks5Server::new(opener);
|
||||
assert_eq!(server.listen_addr(), "127.0.0.1:1080".parse().unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn custom_listen_address() {
|
||||
let opener = MockChannelOpener { fail: false };
|
||||
let server = Socks5Server::with_addr(opener, "127.0.0.1:9050");
|
||||
assert_eq!(server.listen_addr(), "127.0.0.1:9050".parse().unwrap());
|
||||
}
|
||||
}
|
||||
304
crates/wraith-core/src/socks5/protocol.rs
Normal file
304
crates/wraith-core/src/socks5/protocol.rs
Normal file
@@ -0,0 +1,304 @@
|
||||
use std::net::{Ipv4Addr, Ipv6Addr};
|
||||
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum Socks5Address {
|
||||
Ipv4(Ipv4Addr),
|
||||
Ipv6(Ipv6Addr),
|
||||
Domain(String),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Socks5VersionMethod {
|
||||
pub version: u8,
|
||||
pub methods: Vec<u8>,
|
||||
}
|
||||
|
||||
impl Socks5VersionMethod {
|
||||
pub async fn read_from<R: AsyncRead + Unpin>(reader: &mut R) -> std::io::Result<Self> {
|
||||
let version = reader.read_u8().await?;
|
||||
let nmethods = reader.read_u8().await?;
|
||||
let mut methods = vec![0u8; nmethods as usize];
|
||||
reader.read_exact(&mut methods).await?;
|
||||
Ok(Self { version, methods })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Socks5Request {
|
||||
pub version: u8,
|
||||
pub command: u8,
|
||||
pub address: Socks5Address,
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
impl Socks5Request {
|
||||
pub async fn read_from<R: AsyncRead + Unpin>(reader: &mut R) -> std::io::Result<Self> {
|
||||
let version = reader.read_u8().await?;
|
||||
let command = reader.read_u8().await?;
|
||||
let _rsv = reader.read_u8().await?;
|
||||
let atyp = reader.read_u8().await?;
|
||||
|
||||
let address = match atyp {
|
||||
0x01 => {
|
||||
let mut octets = [0u8; 4];
|
||||
reader.read_exact(&mut octets).await?;
|
||||
Socks5Address::Ipv4(Ipv4Addr::from(octets))
|
||||
}
|
||||
0x04 => {
|
||||
let mut octets = [0u8; 16];
|
||||
reader.read_exact(&mut octets).await?;
|
||||
Socks5Address::Ipv6(Ipv6Addr::from(octets))
|
||||
}
|
||||
0x03 => {
|
||||
let len = reader.read_u8().await?;
|
||||
let mut domain = vec![0u8; len as usize];
|
||||
reader.read_exact(&mut domain).await?;
|
||||
Socks5Address::Domain(String::from_utf8_lossy(&domain).into_owned())
|
||||
}
|
||||
_ => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("unsupported address type: {atyp}"),
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let port = reader.read_u16().await?;
|
||||
|
||||
Ok(Self {
|
||||
version,
|
||||
command,
|
||||
address,
|
||||
port,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Socks5Reply {
|
||||
pub version: u8,
|
||||
pub reply: u8,
|
||||
pub address: Socks5Address,
|
||||
pub port: u16,
|
||||
}
|
||||
|
||||
impl Socks5Reply {
|
||||
pub fn success(address: Socks5Address, port: u16) -> Self {
|
||||
Self {
|
||||
version: 0x05,
|
||||
reply: 0x00,
|
||||
address,
|
||||
port,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn connection_refused() -> Self {
|
||||
Self {
|
||||
version: 0x05,
|
||||
reply: 0x05,
|
||||
address: Socks5Address::Ipv4(Ipv4Addr::UNSPECIFIED),
|
||||
port: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn command_not_supported() -> Self {
|
||||
Self {
|
||||
version: 0x05,
|
||||
reply: 0x07,
|
||||
address: Socks5Address::Ipv4(Ipv4Addr::UNSPECIFIED),
|
||||
port: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn write_to<W: AsyncWrite + Unpin>(&self, writer: &mut W) -> std::io::Result<()> {
|
||||
writer.write_u8(self.version).await?;
|
||||
writer.write_u8(self.reply).await?;
|
||||
writer.write_u8(0x00).await?;
|
||||
match &self.address {
|
||||
Socks5Address::Ipv4(addr) => {
|
||||
writer.write_u8(0x01).await?;
|
||||
writer.write_all(&addr.octets()).await?;
|
||||
}
|
||||
Socks5Address::Ipv6(addr) => {
|
||||
writer.write_u8(0x04).await?;
|
||||
writer.write_all(&addr.octets()).await?;
|
||||
}
|
||||
Socks5Address::Domain(name) => {
|
||||
writer.write_u8(0x03).await?;
|
||||
writer.write_u8(name.len() as u8).await?;
|
||||
writer.write_all(name.as_bytes()).await?;
|
||||
}
|
||||
}
|
||||
writer.write_u16(self.port).await?;
|
||||
writer.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::io::Cursor;
|
||||
|
||||
#[tokio::test]
|
||||
async fn parse_version_method_no_auth() {
|
||||
let data = [0x05, 0x01, 0x00];
|
||||
let mut cursor = Cursor::new(&data[..]);
|
||||
let vm = Socks5VersionMethod::read_from(&mut cursor).await.unwrap();
|
||||
assert_eq!(vm.version, 0x05);
|
||||
assert_eq!(vm.methods, vec![0x00]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn parse_version_method_multiple() {
|
||||
let data = [0x05, 0x02, 0x00, 0x02];
|
||||
let mut cursor = Cursor::new(&data[..]);
|
||||
let vm = Socks5VersionMethod::read_from(&mut cursor).await.unwrap();
|
||||
assert_eq!(vm.version, 0x05);
|
||||
assert_eq!(vm.methods, vec![0x00, 0x02]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn parse_request_ipv4() {
|
||||
let mut data = vec![0x05, 0x01, 0x00, 0x01];
|
||||
data.extend_from_slice(&[10, 0, 0, 1]);
|
||||
data.extend_from_slice(&443u16.to_be_bytes());
|
||||
let mut cursor = Cursor::new(&data[..]);
|
||||
let req = Socks5Request::read_from(&mut cursor).await.unwrap();
|
||||
assert_eq!(req.version, 0x05);
|
||||
assert_eq!(req.command, 0x01);
|
||||
assert_eq!(
|
||||
req.address,
|
||||
Socks5Address::Ipv4(Ipv4Addr::new(10, 0, 0, 1))
|
||||
);
|
||||
assert_eq!(req.port, 443);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn parse_request_ipv6() {
|
||||
let mut data = vec![0x05, 0x01, 0x00, 0x04];
|
||||
let octets: [u8; 16] = [0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1];
|
||||
data.extend_from_slice(&octets);
|
||||
data.extend_from_slice(&443u16.to_be_bytes());
|
||||
let mut cursor = Cursor::new(&data[..]);
|
||||
let req = Socks5Request::read_from(&mut cursor).await.unwrap();
|
||||
assert_eq!(req.version, 0x05);
|
||||
assert_eq!(req.command, 0x01);
|
||||
assert!(matches!(req.address, Socks5Address::Ipv6(_)));
|
||||
assert_eq!(req.port, 443);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn parse_request_domain() {
|
||||
let domain = "example.com";
|
||||
let mut data = vec![0x05, 0x01, 0x00, 0x03];
|
||||
data.push(domain.len() as u8);
|
||||
data.extend_from_slice(domain.as_bytes());
|
||||
data.extend_from_slice(&443u16.to_be_bytes());
|
||||
let mut cursor = Cursor::new(&data[..]);
|
||||
let req = Socks5Request::read_from(&mut cursor).await.unwrap();
|
||||
assert_eq!(req.version, 0x05);
|
||||
assert_eq!(req.command, 0x01);
|
||||
assert_eq!(req.address, Socks5Address::Domain("example.com".to_string()));
|
||||
assert_eq!(req.port, 443);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn parse_request_unsupported_address_type() {
|
||||
let data = [0x05, 0x01, 0x00, 0x05];
|
||||
let mut cursor = Cursor::new(&data[..]);
|
||||
let result = Socks5Request::read_from(&mut cursor).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reply_success_ipv4() {
|
||||
let reply = Socks5Reply::success(Socks5Address::Ipv4(Ipv4Addr::UNSPECIFIED), 0);
|
||||
let mut buf = Vec::new();
|
||||
reply.write_to(&mut buf).await.unwrap();
|
||||
assert_eq!(buf[0], 0x05);
|
||||
assert_eq!(buf[1], 0x00);
|
||||
assert_eq!(buf[2], 0x00);
|
||||
assert_eq!(buf[3], 0x01);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reply_connection_refused() {
|
||||
let reply = Socks5Reply::connection_refused();
|
||||
let mut buf = Vec::new();
|
||||
reply.write_to(&mut buf).await.unwrap();
|
||||
assert_eq!(buf[0], 0x05);
|
||||
assert_eq!(buf[1], 0x05);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reply_command_not_supported() {
|
||||
let reply = Socks5Reply::command_not_supported();
|
||||
let mut buf = Vec::new();
|
||||
reply.write_to(&mut buf).await.unwrap();
|
||||
assert_eq!(buf[0], 0x05);
|
||||
assert_eq!(buf[1], 0x07);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn roundtrip_ipv4_reply() {
|
||||
let reply = Socks5Reply::success(Socks5Address::Ipv4(Ipv4Addr::new(127, 0, 0, 1)), 1080);
|
||||
let mut buf = Vec::new();
|
||||
reply.write_to(&mut buf).await.unwrap();
|
||||
|
||||
let mut cursor = Cursor::new(&buf[..]);
|
||||
let version = cursor.read_u8().await.unwrap();
|
||||
let _reply_code = cursor.read_u8().await.unwrap();
|
||||
let _rsv = cursor.read_u8().await.unwrap();
|
||||
let atyp = cursor.read_u8().await.unwrap();
|
||||
assert_eq!(version, 0x05);
|
||||
assert_eq!(atyp, 0x01);
|
||||
let mut octets = [0u8; 4];
|
||||
cursor.read_exact(&mut octets).await.unwrap();
|
||||
assert_eq!(Ipv4Addr::from(octets), Ipv4Addr::new(127, 0, 0, 1));
|
||||
let port = cursor.read_u16().await.unwrap();
|
||||
assert_eq!(port, 1080);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn roundtrip_ipv6_reply() {
|
||||
let addr = Ipv6Addr::new(0x2001, 0x0db8, 0, 0, 0, 0, 0, 1);
|
||||
let reply = Socks5Reply::success(Socks5Address::Ipv6(addr), 443);
|
||||
let mut buf = Vec::new();
|
||||
reply.write_to(&mut buf).await.unwrap();
|
||||
|
||||
let mut cursor = Cursor::new(&buf[..]);
|
||||
let _version = cursor.read_u8().await.unwrap();
|
||||
let _reply_code = cursor.read_u8().await.unwrap();
|
||||
let _rsv = cursor.read_u8().await.unwrap();
|
||||
let atyp = cursor.read_u8().await.unwrap();
|
||||
assert_eq!(atyp, 0x04);
|
||||
let mut octets = [0u8; 16];
|
||||
cursor.read_exact(&mut octets).await.unwrap();
|
||||
assert_eq!(Ipv6Addr::from(octets), addr);
|
||||
let port = cursor.read_u16().await.unwrap();
|
||||
assert_eq!(port, 443);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn roundtrip_domain_reply() {
|
||||
let reply = Socks5Reply::success(Socks5Address::Domain("example.com".to_string()), 8080);
|
||||
let mut buf = Vec::new();
|
||||
reply.write_to(&mut buf).await.unwrap();
|
||||
|
||||
let mut cursor = Cursor::new(&buf[..]);
|
||||
let _version = cursor.read_u8().await.unwrap();
|
||||
let _reply_code = cursor.read_u8().await.unwrap();
|
||||
let _rsv = cursor.read_u8().await.unwrap();
|
||||
let atyp = cursor.read_u8().await.unwrap();
|
||||
assert_eq!(atyp, 0x03);
|
||||
let len = cursor.read_u8().await.unwrap();
|
||||
let mut domain = vec![0u8; len as usize];
|
||||
cursor.read_exact(&mut domain).await.unwrap();
|
||||
assert_eq!(String::from_utf8(domain).unwrap(), "example.com");
|
||||
let port = cursor.read_u16().await.unwrap();
|
||||
assert_eq!(port, 8080);
|
||||
}
|
||||
}
|
||||
362
crates/wraith-core/src/transport/acme.rs
Normal file
362
crates/wraith-core/src/transport/acme.rs
Normal file
@@ -0,0 +1,362 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use rustls::crypto::aws_lc_rs::default_provider;
|
||||
use rustls::ServerConfig;
|
||||
use rustls_acme::caches::DirCache;
|
||||
use rustls_acme::{AcmeConfig, AcmeState, ResolvesServerCertAcme};
|
||||
use tracing::{error, info};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_rustls::TlsAcceptor as TokioTlsAcceptor;
|
||||
|
||||
use super::{TransportAcceptor, TransportInfo, TransportKind};
|
||||
|
||||
const ACME_TLS_ALPN_NAME: &[u8] = b"acme-tls/1";
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum AcmeMode {
|
||||
Domain { domain: String },
|
||||
Ip,
|
||||
}
|
||||
|
||||
pub struct AcmeCertProvider {
|
||||
mode: AcmeMode,
|
||||
cache_dir: Option<PathBuf>,
|
||||
directory_url: String,
|
||||
contact: Vec<String>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for AcmeCertProvider {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("AcmeCertProvider")
|
||||
.field("mode", &self.mode)
|
||||
.field("cache_dir", &self.cache_dir)
|
||||
.field("directory_url", &self.directory_url)
|
||||
.field("contact", &self.contact)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl AcmeCertProvider {
|
||||
pub fn new(mode: AcmeMode) -> Self {
|
||||
Self {
|
||||
mode,
|
||||
cache_dir: None,
|
||||
directory_url: rustls_acme::acme::LETS_ENCRYPT_STAGING_DIRECTORY.to_string(),
|
||||
contact: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn domain(domain: impl Into<String>) -> Self {
|
||||
Self::new(AcmeMode::Domain {
|
||||
domain: domain.into(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn ip() -> Self {
|
||||
Self::new(AcmeMode::Ip)
|
||||
}
|
||||
|
||||
pub fn with_cache_dir(mut self, dir: impl Into<PathBuf>) -> Self {
|
||||
self.cache_dir = Some(dir.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_directory(mut self, url: impl Into<String>) -> Self {
|
||||
self.directory_url = url.into();
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_production_directory(mut self) -> Self {
|
||||
self.directory_url = rustls_acme::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY.to_string();
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_contact(mut self, contact: impl Into<String>) -> Self {
|
||||
self.contact.push(contact.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn mode(&self) -> &AcmeMode {
|
||||
&self.mode
|
||||
}
|
||||
|
||||
fn build_acme_state(&self) -> (AcmeState<std::io::Error>, Arc<ResolvesServerCertAcme>) {
|
||||
let domains: Vec<String> = match &self.mode {
|
||||
AcmeMode::Domain { domain } => vec![domain.clone()],
|
||||
AcmeMode::Ip => vec![],
|
||||
};
|
||||
|
||||
let base_config = AcmeConfig::new(domains)
|
||||
.directory(&self.directory_url)
|
||||
.contact(self.contact.clone());
|
||||
|
||||
let state = match &self.cache_dir {
|
||||
Some(cache_dir) => {
|
||||
base_config.cache(DirCache::new(cache_dir.clone())).state()
|
||||
}
|
||||
None => {
|
||||
base_config
|
||||
.cache(rustls_acme::caches::NoCache::default())
|
||||
.state()
|
||||
}
|
||||
};
|
||||
|
||||
let resolver = state.resolver();
|
||||
(state, resolver)
|
||||
}
|
||||
|
||||
pub fn build_server_config_with_resolver(
|
||||
&self,
|
||||
resolver: Arc<ResolvesServerCertAcme>,
|
||||
) -> Result<Arc<ServerConfig>> {
|
||||
let provider = default_provider().into();
|
||||
let mut config = ServerConfig::builder_with_provider(provider)
|
||||
.with_safe_default_protocol_versions()
|
||||
.map_err(|e| anyhow!("failed to set protocol versions: {}", e))?
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(resolver);
|
||||
config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
|
||||
Ok(Arc::new(config))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AcmeTlsAcceptor {
|
||||
listener: TcpListener,
|
||||
listen_addr: SocketAddr,
|
||||
#[allow(dead_code)]
|
||||
server_config: Arc<ServerConfig>,
|
||||
tokio_acceptor: TokioTlsAcceptor,
|
||||
}
|
||||
|
||||
impl AcmeTlsAcceptor {
|
||||
pub async fn bind_acme(
|
||||
addr: SocketAddr,
|
||||
provider: Arc<AcmeCertProvider>,
|
||||
) -> Result<Self> {
|
||||
let (state, resolver) = provider.build_acme_state();
|
||||
|
||||
let server_config = provider.build_server_config_with_resolver(resolver.clone())?;
|
||||
|
||||
Self::spawn_state_worker(state, resolver);
|
||||
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
let listen_addr = listener.local_addr()?;
|
||||
|
||||
let tokio_acceptor = TokioTlsAcceptor::from(server_config.clone());
|
||||
|
||||
Ok(Self {
|
||||
listener,
|
||||
listen_addr,
|
||||
server_config,
|
||||
tokio_acceptor,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn listen_addr(&self) -> SocketAddr {
|
||||
self.listen_addr
|
||||
}
|
||||
|
||||
fn spawn_state_worker(state: AcmeState<std::io::Error>, resolver: Arc<ResolvesServerCertAcme>) {
|
||||
use futures::StreamExt;
|
||||
|
||||
let task = async move {
|
||||
let mut state = state;
|
||||
while let Some(event) = state.next().await {
|
||||
match event {
|
||||
Ok(ok) => {
|
||||
if let rustls_acme::EventOk::DeployedNewCert = ok {
|
||||
info!("ACME: new certificate deployed");
|
||||
} else {
|
||||
info!("ACME event: {:?}", ok);
|
||||
}
|
||||
}
|
||||
Err(err) => error!("ACME event error: {:?}", err),
|
||||
}
|
||||
if Arc::strong_count(&resolver) == 1 {
|
||||
info!("ACME resolver dropped, stopping background task");
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
tokio::spawn(task);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TransportAcceptor for AcmeTlsAcceptor {
|
||||
type Stream = tokio_rustls::server::TlsStream<tokio::net::TcpStream>;
|
||||
|
||||
async fn accept(&self) -> Result<(Self::Stream, TransportInfo)> {
|
||||
let (tcp_stream, remote_addr) = self.listener.accept().await?;
|
||||
let tls_stream = self.tokio_acceptor.accept(tcp_stream).await?;
|
||||
|
||||
let server_name = tls_stream
|
||||
.get_ref()
|
||||
.1
|
||||
.server_name()
|
||||
.map(|s| s.to_string());
|
||||
|
||||
let info = TransportInfo {
|
||||
remote_addr: Some(remote_addr),
|
||||
transport_kind: TransportKind::Tls { server_name },
|
||||
};
|
||||
|
||||
Ok((tls_stream, info))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_domain_mode() {
|
||||
let provider = AcmeCertProvider::domain("example.com");
|
||||
assert!(matches!(provider.mode(), AcmeMode::Domain { .. }));
|
||||
if let AcmeMode::Domain { domain } = provider.mode() {
|
||||
assert_eq!(domain, "example.com");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_ip_mode() {
|
||||
let provider = AcmeCertProvider::ip();
|
||||
assert!(matches!(provider.mode(), AcmeMode::Ip));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_default_staging_directory() {
|
||||
let provider = AcmeCertProvider::domain("example.com");
|
||||
assert_eq!(
|
||||
provider.directory_url,
|
||||
rustls_acme::acme::LETS_ENCRYPT_STAGING_DIRECTORY
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_production_directory() {
|
||||
let provider = AcmeCertProvider::domain("example.com").with_production_directory();
|
||||
assert_eq!(
|
||||
provider.directory_url,
|
||||
rustls_acme::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_custom_directory() {
|
||||
let provider =
|
||||
AcmeCertProvider::domain("example.com").with_directory("https://custom.acme.dir/");
|
||||
assert_eq!(provider.directory_url, "https://custom.acme.dir/");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_with_cache_dir() {
|
||||
let provider = AcmeCertProvider::domain("example.com").with_cache_dir("/tmp/acme_cache");
|
||||
assert_eq!(provider.cache_dir, Some(PathBuf::from("/tmp/acme_cache")));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_with_contact() {
|
||||
let provider =
|
||||
AcmeCertProvider::domain("example.com").with_contact("mailto:admin@example.com");
|
||||
assert_eq!(
|
||||
provider.contact,
|
||||
vec!["mailto:admin@example.com".to_string()]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_build_state_domain() {
|
||||
let provider = AcmeCertProvider::domain("example.com");
|
||||
let (_state, resolver) = provider.build_acme_state();
|
||||
assert!(Arc::strong_count(&resolver) >= 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_build_state_with_cache() {
|
||||
let provider =
|
||||
AcmeCertProvider::domain("example.com").with_cache_dir("/tmp/test_cache");
|
||||
let (_state, resolver) = provider.build_acme_state();
|
||||
assert!(Arc::strong_count(&resolver) >= 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_build_server_config() {
|
||||
let _ = default_provider().install_default();
|
||||
let provider = AcmeCertProvider::domain("example.com");
|
||||
let (_, resolver) = provider.build_acme_state();
|
||||
let config = provider.build_server_config_with_resolver(resolver).unwrap();
|
||||
assert!(!config.alpn_protocols.is_empty());
|
||||
assert!(config
|
||||
.alpn_protocols
|
||||
.iter()
|
||||
.any(|p| p == ACME_TLS_ALPN_NAME));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_mode_domain_debug() {
|
||||
let mode = AcmeMode::Domain {
|
||||
domain: "test.example.com".to_string(),
|
||||
};
|
||||
let debug_str = format!("{:?}", mode);
|
||||
assert!(debug_str.contains("test.example.com"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_mode_ip_debug() {
|
||||
let mode = AcmeMode::Ip;
|
||||
let debug_str = format!("{:?}", mode);
|
||||
assert!(debug_str.contains("Ip"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_builder_chain() {
|
||||
let provider = AcmeCertProvider::domain("test.example.com")
|
||||
.with_production_directory()
|
||||
.with_cache_dir("/tmp/cache")
|
||||
.with_contact("mailto:admin@test.example.com");
|
||||
assert!(matches!(provider.mode(), AcmeMode::Domain { .. }));
|
||||
assert_eq!(
|
||||
provider.directory_url,
|
||||
rustls_acme::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
|
||||
);
|
||||
assert_eq!(provider.cache_dir, Some(PathBuf::from("/tmp/cache")));
|
||||
assert_eq!(provider.contact.len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn acme_tls_acceptor_bind_acme() {
|
||||
let _ = default_provider().install_default();
|
||||
let provider = Arc::new(AcmeCertProvider::domain("example.com"));
|
||||
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
|
||||
let acceptor = AcmeTlsAcceptor::bind_acme(addr, provider).await.unwrap();
|
||||
assert_ne!(acceptor.listen_addr().port(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore]
|
||||
async fn acme_staging_domain_cert_provisioning() {
|
||||
let _ = default_provider().install_default();
|
||||
|
||||
let cache_dir = tempfile::tempdir().unwrap();
|
||||
let provider = Arc::new(
|
||||
AcmeCertProvider::domain("acme-test.example.com")
|
||||
.with_cache_dir(cache_dir.path())
|
||||
.with_contact("mailto:admin@example.com"),
|
||||
);
|
||||
|
||||
let addr: SocketAddr = "0.0.0.0:443".parse().unwrap();
|
||||
let result = AcmeTlsAcceptor::bind_acme(addr, provider).await;
|
||||
assert!(
|
||||
result.is_ok(),
|
||||
"ACME TlsAcceptor should bind: {:?}",
|
||||
result.err()
|
||||
);
|
||||
|
||||
let acceptor = result.unwrap();
|
||||
assert_eq!(acceptor.listen_addr().port(), 443);
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,12 @@ mod tls;
|
||||
#[cfg(feature = "tls")]
|
||||
pub use tls::{AcmeConfig, TlsAcceptor, TlsTransport};
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
mod acme;
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
pub use acme::{AcmeCertProvider, AcmeMode, AcmeTlsAcceptor};
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
@@ -9,8 +9,16 @@ use rustls::{ClientConfig, DigitallySignedStruct, RootCertStore, ServerConfig};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio_rustls::{client::TlsStream as ClientTlsStream, TlsAcceptor as TokioTlsAcceptor, TlsConnector};
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
use rustls::crypto::aws_lc_rs::default_provider;
|
||||
#[cfg(feature = "acme")]
|
||||
use rustls_acme::ResolvesServerCertAcme;
|
||||
|
||||
use super::{Transport, TransportAcceptor, TransportInfo, TransportKind};
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
const ACME_TLS_ALPN_NAME: &[u8] = b"acme-tls/1";
|
||||
|
||||
/// A TLS-based client transport that connects to a remote address over TLS.
|
||||
///
|
||||
/// Wraps a TCP connection with a TLS client session via `tokio_rustls::TlsConnector`.
|
||||
@@ -110,8 +118,10 @@ pub struct AcmeConfig {
|
||||
/// A TLS-based server transport acceptor that accepts TCP connections
|
||||
/// and wraps them with TLS server sessions via `tokio_rustls::TlsAcceptor`.
|
||||
///
|
||||
/// Requires certificate and private key configuration. Supports manual
|
||||
/// cert/key paths and an ACME config stub (ADR-008).
|
||||
/// Supports three certificate modes (ADR-008):
|
||||
/// - Manual certs via `bind()` with explicit cert/key
|
||||
/// - ACME certs via `bind_acme()` with an `AcmeCertProvider`
|
||||
/// - The stub `AcmeConfig` parameter in `bind()` is kept for backward compat
|
||||
pub struct TlsAcceptor {
|
||||
listener: TcpListener,
|
||||
listen_addr: SocketAddr,
|
||||
@@ -145,6 +155,33 @@ impl TlsAcceptor {
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
pub async fn bind_acme(
|
||||
addr: SocketAddr,
|
||||
acme_resolver: Arc<ResolvesServerCertAcme>,
|
||||
) -> Result<Self> {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
let listen_addr = listener.local_addr()?;
|
||||
|
||||
let provider = default_provider().into();
|
||||
let mut server_config = ServerConfig::builder_with_provider(provider)
|
||||
.with_safe_default_protocol_versions()
|
||||
.map_err(|e| anyhow!("failed to set protocol versions: {}", e))?
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(acme_resolver);
|
||||
server_config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
|
||||
|
||||
let server_config = Arc::new(server_config);
|
||||
let tokio_acceptor = TokioTlsAcceptor::from(server_config.clone());
|
||||
|
||||
Ok(Self {
|
||||
listener,
|
||||
listen_addr,
|
||||
server_config,
|
||||
tokio_acceptor,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn listen_addr(&self) -> SocketAddr {
|
||||
self.listen_addr
|
||||
}
|
||||
|
||||
@@ -43,8 +43,14 @@ This integrates with `TlsAcceptor` by providing ACME-resolved certificates inste
|
||||
|
||||
## Notes
|
||||
|
||||
> To be filled by implementation agent
|
||||
- `AcmeCertProvider` is the main entry point. It creates `AcmeState` and `ResolvesServerCertAcme` from `rustls-acme`.
|
||||
- The `ResolvesServerCertAcme` resolver is shared between the `AcmeState` background task and the `ServerConfig`, so cert updates propagate automatically.
|
||||
- `AcmeTlsAcceptor::bind_acme()` creates a TLS acceptor that uses ACME-provisioned certs and spawns a background tokio task for auto-renewal.
|
||||
- `TlsAcceptor::bind_acme()` also added for users who want to use ACME with the standard `TlsAcceptor` type directly.
|
||||
- The `AcmeConfig` stub in `tls.rs` is retained for backward compat with existing `TlsAcceptor::bind()`.
|
||||
- `acme` feature implies `tls` and adds `rustls-acme` + `futures` dependencies.
|
||||
- TLS-ALPN-01 challenge handling works via the `acme-tls/1` ALPN protocol registered in `ServerConfig` — the resolver dispatches challenge vs regular certs automatically.
|
||||
|
||||
## Summary
|
||||
|
||||
> To be filled on completion
|
||||
Implemented ACME/Let's Encrypt certificate provisioning (ADR-008) behind the `acme` feature flag. `AcmeCertProvider` supports domain-based and IP-based modes using `rustls-acme`. `AcmeTlsAcceptor::bind_acme()` and `TlsAcceptor::bind_acme()` provide ACME-integrated TLS acceptance with automatic certificate renewal via a background tokio task. Unit tests cover config construction, builder patterns, and server config generation. Integration test for LE staging is marked `#[ignore]`.
|
||||
Reference in New Issue
Block a user