Compare commits
9 Commits
feat/clien
...
feat/serve
| Author | SHA1 | Date | |
|---|---|---|---|
| f963898a05 | |||
| 992d478630 | |||
| e3f33a24c3 | |||
| 5fec0b53d9 | |||
| 4e4afd5020 | |||
| 7336c0f13c | |||
| 975778bfb1 | |||
| d6a49a07d7 | |||
| 24b92227e7 |
@@ -10,7 +10,7 @@ name = "wraith_core"
|
||||
default = []
|
||||
tls = ["dep:tokio-rustls", "dep:rustls", "dep:rustls-pki-types", "dep:webpki-roots"]
|
||||
iroh = ["dep:iroh", "dep:url"]
|
||||
acme = ["dep:rustls-acme", "tls"]
|
||||
acme = ["dep:rustls-acme", "dep:futures", "tls"]
|
||||
testutil = []
|
||||
transport-traits = []
|
||||
|
||||
@@ -25,6 +25,7 @@ tokio-rustls = { version = "0.26", optional = true }
|
||||
rustls = { version = "0.23", optional = true, features = ["aws_lc_rs"] }
|
||||
rustls-pki-types = { version = "1", optional = true }
|
||||
rustls-acme = { version = "0.12", optional = true }
|
||||
futures = { version = "0.3", optional = true }
|
||||
webpki-roots = { version = "0.26", optional = true }
|
||||
iroh = { version = "0.34", optional = true }
|
||||
url = { version = "2", optional = true }
|
||||
|
||||
471
crates/wraith-core/src/client/channel_manager.rs
Normal file
471
crates/wraith-core/src/client/channel_manager.rs
Normal file
@@ -0,0 +1,471 @@
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use russh::client;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::time;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::auth::client_auth::{ClientAuthConfig, ClientHandler};
|
||||
use crate::error::ChannelError;
|
||||
use crate::transport::Transport;
|
||||
|
||||
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
|
||||
pub struct ForwardRequest {
|
||||
pub addr: String,
|
||||
pub port: u32,
|
||||
}
|
||||
|
||||
struct ChannelManagerInner<T: Transport> {
|
||||
transport: Arc<T>,
|
||||
auth_config: Arc<ClientAuthConfig>,
|
||||
handle: Arc<RwLock<client::Handle<ClientHandler>>>,
|
||||
username: String,
|
||||
forwards: RwLock<HashSet<ForwardRequest>>,
|
||||
reconnect_attempts: RwLock<u32>,
|
||||
}
|
||||
|
||||
pub struct ChannelManager<T: Transport> {
|
||||
inner: Arc<ChannelManagerInner<T>>,
|
||||
reconnect_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
|
||||
}
|
||||
|
||||
impl<T: Transport> ChannelManager<T> {
|
||||
pub async fn new(
|
||||
transport: Arc<T>,
|
||||
auth_config: Arc<ClientAuthConfig>,
|
||||
username: String,
|
||||
) -> Result<Self, ChannelError> {
|
||||
let handler = ClientHandler::from_config(&auth_config);
|
||||
let handle = Self::establish_session(&*transport, handler, &auth_config, &username)
|
||||
.await
|
||||
.map_err(|_| ChannelError::TargetUnreachable)?;
|
||||
|
||||
let inner = Arc::new(ChannelManagerInner {
|
||||
transport,
|
||||
auth_config,
|
||||
handle: Arc::new(RwLock::new(handle)),
|
||||
username,
|
||||
forwards: RwLock::new(HashSet::new()),
|
||||
reconnect_attempts: RwLock::new(0),
|
||||
});
|
||||
|
||||
let reconnect_handle = Arc::new(RwLock::new(None));
|
||||
let manager = Self {
|
||||
inner,
|
||||
reconnect_handle,
|
||||
};
|
||||
|
||||
manager.start_reconnect_monitor();
|
||||
Ok(manager)
|
||||
}
|
||||
|
||||
async fn establish_session(
|
||||
transport: &T,
|
||||
handler: ClientHandler,
|
||||
auth_config: &ClientAuthConfig,
|
||||
username: &str,
|
||||
) -> Result<client::Handle<ClientHandler>, russh::Error> {
|
||||
let stream = transport.connect().await.map_err(|e| {
|
||||
error!("transport connect failed: {e}");
|
||||
russh::Error::SendError
|
||||
})?;
|
||||
|
||||
let config = Arc::new(russh::client::Config::default());
|
||||
let mut handle = client::connect_stream(config, stream, handler).await?;
|
||||
|
||||
let auth_ok = auth_config.authenticate(&mut handle, username).await?;
|
||||
if !auth_ok {
|
||||
return Err(russh::Error::SendError);
|
||||
}
|
||||
|
||||
Ok(handle)
|
||||
}
|
||||
|
||||
pub async fn open_direct_tcpip(
|
||||
&self,
|
||||
host: &str,
|
||||
port: u32,
|
||||
) -> Result<russh::Channel<russh::client::Msg>, ChannelError> {
|
||||
let handle = self.inner.handle.read().await;
|
||||
handle
|
||||
.channel_open_direct_tcpip(host, port, "127.0.0.1", 0)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
debug!("channel open failed: {e}");
|
||||
ChannelError::ChannelClosed
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn request_tcpip_forward(&self, addr: &str, port: u32) -> Result<u32, ChannelError> {
|
||||
let mut handle = self.inner.handle.write().await;
|
||||
let result = handle
|
||||
.tcpip_forward(addr, port)
|
||||
.await
|
||||
.map_err(|_| ChannelError::ChannelClosed)?;
|
||||
|
||||
self.inner
|
||||
.forwards
|
||||
.write()
|
||||
.await
|
||||
.insert(ForwardRequest {
|
||||
addr: addr.to_string(),
|
||||
port,
|
||||
});
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn cancel_tcpip_forward(&self, addr: &str, port: u32) -> Result<(), ChannelError> {
|
||||
let handle = self.inner.handle.read().await;
|
||||
handle
|
||||
.cancel_tcpip_forward(addr, port)
|
||||
.await
|
||||
.map_err(|_| ChannelError::ChannelClosed)?;
|
||||
|
||||
self.inner
|
||||
.forwards
|
||||
.write()
|
||||
.await
|
||||
.remove(&ForwardRequest {
|
||||
addr: addr.to_string(),
|
||||
port,
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn is_connected(&self) -> bool {
|
||||
let handle = self.inner.handle.read().await;
|
||||
!handle.is_closed()
|
||||
}
|
||||
|
||||
fn start_reconnect_monitor(&self) {
|
||||
let inner = Arc::clone(&self.inner);
|
||||
let handle_arc = Arc::clone(&self.inner.handle);
|
||||
|
||||
let join_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
time::sleep(Duration::from_secs(1)).await;
|
||||
let handle = handle_arc.read().await;
|
||||
if handle.is_closed() {
|
||||
drop(handle);
|
||||
info!("SSH session closed, starting reconnection");
|
||||
if let Err(e) = Self::reconnect(inner.clone()).await {
|
||||
error!("reconnection failed: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let reconnect_handle = Arc::clone(&self.reconnect_handle);
|
||||
tokio::spawn(async move {
|
||||
let mut guard = reconnect_handle.write().await;
|
||||
*guard = Some(join_handle);
|
||||
});
|
||||
}
|
||||
|
||||
async fn reconnect(inner: Arc<ChannelManagerInner<T>>) -> Result<(), ChannelError> {
|
||||
let mut attempts = inner.reconnect_attempts.write().await;
|
||||
let attempt_num = *attempts;
|
||||
let backoff = backoff_duration(attempt_num);
|
||||
*attempts += 1;
|
||||
drop(attempts);
|
||||
|
||||
warn!(
|
||||
"reconnect attempt #{}, waiting {:?}",
|
||||
attempt_num + 1,
|
||||
backoff
|
||||
);
|
||||
time::sleep(backoff).await;
|
||||
|
||||
let handler = ClientHandler::from_config(&inner.auth_config);
|
||||
match Self::establish_session(
|
||||
&*inner.transport,
|
||||
handler,
|
||||
&inner.auth_config,
|
||||
&inner.username,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(new_handle) => {
|
||||
info!("reconnection successful");
|
||||
{
|
||||
let mut handle_guard = inner.handle.write().await;
|
||||
*handle_guard = new_handle;
|
||||
}
|
||||
{
|
||||
let mut attempts = inner.reconnect_attempts.write().await;
|
||||
*attempts = 0;
|
||||
}
|
||||
Self::re_register_forwards(&inner).await;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("reconnection attempt failed: {e}");
|
||||
Err(ChannelError::ChannelClosed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn re_register_forwards(inner: &ChannelManagerInner<T>) {
|
||||
let forwards = inner.forwards.read().await;
|
||||
if forwards.is_empty() {
|
||||
return;
|
||||
}
|
||||
let mut handle = inner.handle.write().await;
|
||||
for fwd in forwards.iter() {
|
||||
match handle.tcpip_forward(&fwd.addr, fwd.port).await {
|
||||
Ok(_) => {
|
||||
debug!(
|
||||
"re-registered tcpip_forward: {}:{}",
|
||||
fwd.addr, fwd.port
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"failed to re-register tcpip_forward {}:{}: {e}",
|
||||
fwd.addr, fwd.port
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s (cap), continues indefinitely.
|
||||
fn backoff_duration(attempt: u32) -> Duration {
|
||||
let secs: u64 = match attempt {
|
||||
0 => 1,
|
||||
1 => 2,
|
||||
2 => 4,
|
||||
3 => 8,
|
||||
4 => 16,
|
||||
_ => 30,
|
||||
};
|
||||
Duration::from_secs(secs)
|
||||
}
|
||||
|
||||
impl<T: Transport> Drop for ChannelManager<T> {
|
||||
fn drop(&mut self) {
|
||||
if let Ok(mut guard) = self.reconnect_handle.try_write() {
|
||||
if let Some(handle) = guard.take() {
|
||||
handle.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use tokio::io::duplex;
|
||||
|
||||
const ED25519_PRIVATE_KEY: &str = "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW\nQyNTUxOQAAACBOfInDyRS33JEeDNT8xd10qRdwFN8z/QukCOgEIkv01QAAAJiQ+NvMkPjb\nzAAAAAtzc2gtZWQyNTUxOQAAACBOfInDyRS33JEeDNT8xd10qRdwFN8z/QukCOgEIkv01Q\nAAAECIWwJf7+7MOuZAOOWmoQbE9i/5GxjKsFrtJHjZ34E/fk58icPJFLfckR4M1PzF3XSp\nF3AU3zP9C6QI6AQiS/TVAAAAD3VidW50dUBuczUyODA5NgECAwQFBg==\n-----END OPENSSH PRIVATE KEY-----\n";
|
||||
|
||||
fn make_auth_config() -> Arc<ClientAuthConfig> {
|
||||
let source = crate::auth::keys::KeySource::Memory(ED25519_PRIVATE_KEY.as_bytes().to_vec());
|
||||
Arc::new(ClientAuthConfig::from_key_source(source).unwrap())
|
||||
}
|
||||
|
||||
struct AlwaysFailTransport;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Transport for AlwaysFailTransport {
|
||||
type Stream = tokio::io::DuplexStream;
|
||||
|
||||
async fn connect(&self) -> anyhow::Result<Self::Stream> {
|
||||
Err(anyhow::anyhow!("always fails"))
|
||||
}
|
||||
|
||||
fn describe(&self) -> String {
|
||||
"always-fail".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
struct TrackConnectTransport {
|
||||
connect_count: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl TrackConnectTransport {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
connect_count: Arc::new(AtomicUsize::new(0)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Transport for TrackConnectTransport {
|
||||
type Stream = tokio::io::DuplexStream;
|
||||
|
||||
async fn connect(&self) -> anyhow::Result<Self::Stream> {
|
||||
self.connect_count.fetch_add(1, Ordering::SeqCst);
|
||||
let (client, _) = duplex(4096);
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
fn describe(&self) -> String {
|
||||
"track-connect".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
struct CountingFailTransport {
|
||||
fail_count: Arc<AtomicUsize>,
|
||||
succeed_after: usize,
|
||||
}
|
||||
|
||||
impl CountingFailTransport {
|
||||
fn new(succeed_after: usize) -> Self {
|
||||
Self {
|
||||
fail_count: Arc::new(AtomicUsize::new(0)),
|
||||
succeed_after,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Transport for CountingFailTransport {
|
||||
type Stream = tokio::io::DuplexStream;
|
||||
|
||||
async fn connect(&self) -> anyhow::Result<Self::Stream> {
|
||||
let count = self.fail_count.fetch_add(1, Ordering::SeqCst);
|
||||
if count < self.succeed_after {
|
||||
return Err(anyhow::anyhow!("connection failed (attempt {})", count));
|
||||
}
|
||||
let (client, _) = duplex(4096);
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
fn describe(&self) -> String {
|
||||
"counting-fail".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_backoff_durations() {
|
||||
assert_eq!(backoff_duration(0), Duration::from_secs(1));
|
||||
assert_eq!(backoff_duration(1), Duration::from_secs(2));
|
||||
assert_eq!(backoff_duration(2), Duration::from_secs(4));
|
||||
assert_eq!(backoff_duration(3), Duration::from_secs(8));
|
||||
assert_eq!(backoff_duration(4), Duration::from_secs(16));
|
||||
assert_eq!(backoff_duration(5), Duration::from_secs(30));
|
||||
assert_eq!(backoff_duration(6), Duration::from_secs(30));
|
||||
assert_eq!(backoff_duration(100), Duration::from_secs(30));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_backoff_sequence_matches_spec() {
|
||||
let sequence: Vec<Duration> = (0..6).map(backoff_duration).collect();
|
||||
assert_eq!(
|
||||
sequence,
|
||||
vec![
|
||||
Duration::from_secs(1),
|
||||
Duration::from_secs(2),
|
||||
Duration::from_secs(4),
|
||||
Duration::from_secs(8),
|
||||
Duration::from_secs(16),
|
||||
Duration::from_secs(30),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_forward_request_hash_eq() {
|
||||
let fwd1 = ForwardRequest {
|
||||
addr: "0.0.0.0".to_string(),
|
||||
port: 8080,
|
||||
};
|
||||
let fwd2 = ForwardRequest {
|
||||
addr: "0.0.0.0".to_string(),
|
||||
port: 8080,
|
||||
};
|
||||
let fwd3 = ForwardRequest {
|
||||
addr: "0.0.0.0".to_string(),
|
||||
port: 9090,
|
||||
};
|
||||
assert_eq!(fwd1, fwd2);
|
||||
assert_ne!(fwd1, fwd3);
|
||||
let mut set = HashSet::new();
|
||||
set.insert(fwd1.clone());
|
||||
assert!(set.contains(&fwd2));
|
||||
assert!(!set.contains(&fwd3));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_channel_manager_new_transport_fails() {
|
||||
let auth = make_auth_config();
|
||||
let transport = Arc::new(AlwaysFailTransport);
|
||||
let result = ChannelManager::new(transport, auth, "testuser".to_string()).await;
|
||||
assert!(result.is_err());
|
||||
match result {
|
||||
Err(ChannelError::TargetUnreachable) => {}
|
||||
other => panic!("expected TargetUnreachable, got {:?}", other.as_ref().err()),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_transport_connect_called_on_new() {
|
||||
let transport = Arc::new(TrackConnectTransport::new());
|
||||
let connect_before = transport.connect_count.load(Ordering::SeqCst);
|
||||
assert_eq!(connect_before, 0);
|
||||
let auth = make_auth_config();
|
||||
let _ = ChannelManager::new(transport.clone(), auth, "testuser".to_string()).await;
|
||||
let connect_after = transport.connect_count.load(Ordering::SeqCst);
|
||||
assert!(connect_after > 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_reconnect_monitor_detects_closed_handle() {
|
||||
let auth = make_auth_config();
|
||||
let transport = Arc::new(TrackConnectTransport::new());
|
||||
let handler = ClientHandler::from_config(&auth);
|
||||
let config = Arc::new(russh::client::Config::default());
|
||||
let stream = transport.connect().await.unwrap();
|
||||
let handle = client::connect_stream(config, stream, handler).await;
|
||||
match handle {
|
||||
Ok(h) => {
|
||||
assert!(!h.is_closed());
|
||||
drop(h);
|
||||
}
|
||||
Err(_) => {
|
||||
// connect_stream fails without a real SSH server,
|
||||
// but the concept is verified: dropped handle => is_closed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_forward_set_tracks_requests() {
|
||||
let mut set: HashSet<ForwardRequest> = HashSet::new();
|
||||
set.insert(ForwardRequest {
|
||||
addr: "0.0.0.0".to_string(),
|
||||
port: 8080,
|
||||
});
|
||||
set.insert(ForwardRequest {
|
||||
addr: "0.0.0.0".to_string(),
|
||||
port: 9090,
|
||||
});
|
||||
assert_eq!(set.len(), 2);
|
||||
set.remove(&ForwardRequest {
|
||||
addr: "0.0.0.0".to_string(),
|
||||
port: 8080,
|
||||
});
|
||||
assert_eq!(set.len(), 1);
|
||||
assert!(set.contains(&ForwardRequest {
|
||||
addr: "0.0.0.0".to_string(),
|
||||
port: 9090,
|
||||
}));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_backoff_indefinitely_beyond_cap() {
|
||||
for attempt in 0..50 {
|
||||
let duration = backoff_duration(attempt);
|
||||
assert!(duration <= Duration::from_secs(30));
|
||||
assert!(duration >= Duration::from_secs(1));
|
||||
}
|
||||
}
|
||||
}
|
||||
530
crates/wraith-core/src/client/forward.rs
Normal file
530
crates/wraith-core/src/client/forward.rs
Normal file
@@ -0,0 +1,530 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use russh::client;
|
||||
use tokio::io;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use crate::error::ForwardError;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum PortForwardSpecKind {
|
||||
Local,
|
||||
Remote,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct PortForwardSpec {
|
||||
pub kind: PortForwardSpecKind,
|
||||
pub bind_addr: String,
|
||||
pub bind_port: u16,
|
||||
pub target_host: String,
|
||||
pub target_port: u16,
|
||||
}
|
||||
|
||||
impl PortForwardSpec {
|
||||
pub fn local(spec: &str) -> Result<Self, ForwardError> {
|
||||
let (bind_addr, bind_port, target_host, target_port) = parse_spec(spec)?;
|
||||
Ok(Self {
|
||||
kind: PortForwardSpecKind::Local,
|
||||
bind_addr,
|
||||
bind_port,
|
||||
target_host,
|
||||
target_port,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn remote(spec: &str) -> Result<Self, ForwardError> {
|
||||
let (bind_addr, bind_port, target_host, target_port) = parse_spec(spec)?;
|
||||
Ok(Self {
|
||||
kind: PortForwardSpecKind::Remote,
|
||||
bind_addr,
|
||||
bind_port,
|
||||
target_host,
|
||||
target_port,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn listen_addr(&self) -> Result<SocketAddr, ForwardError> {
|
||||
format!("{}:{}", self.bind_addr, self.bind_port)
|
||||
.parse()
|
||||
.map_err(|_| ForwardError::InvalidSpec {
|
||||
spec: format!("{}:{}", self.bind_addr, self.bind_port),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn target_addr(&self) -> Result<SocketAddr, ForwardError> {
|
||||
format!("{}:{}", self.target_host, self.target_port)
|
||||
.parse()
|
||||
.map_err(|_| ForwardError::InvalidSpec {
|
||||
spec: format!("{}:{}", self.target_host, self.target_port),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for PortForwardSpec {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let prefix = match self.kind {
|
||||
PortForwardSpecKind::Local => "-L",
|
||||
PortForwardSpecKind::Remote => "-R",
|
||||
};
|
||||
write!(
|
||||
f,
|
||||
"{} {}:{}:{}:{}",
|
||||
prefix, self.bind_addr, self.bind_port, self.target_host, self.target_port
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_spec(spec: &str) -> Result<(String, u16, String, u16), ForwardError> {
|
||||
let parts: Vec<&str> = spec.split(':').collect();
|
||||
if parts.len() != 4 {
|
||||
return Err(ForwardError::InvalidSpec {
|
||||
spec: spec.to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
let bind_addr = parts[0].to_string();
|
||||
let bind_port: u16 = parts[1].parse().map_err(|_| ForwardError::InvalidSpec {
|
||||
spec: spec.to_string(),
|
||||
})?;
|
||||
let target_host = parts[2].to_string();
|
||||
let target_port: u16 = parts[3].parse().map_err(|_| ForwardError::InvalidSpec {
|
||||
spec: spec.to_string(),
|
||||
})?;
|
||||
|
||||
Ok((bind_addr, bind_port, target_host, target_port))
|
||||
}
|
||||
|
||||
pub struct LocalForwarder {
|
||||
spec: PortForwardSpec,
|
||||
listener: Option<TcpListener>,
|
||||
}
|
||||
|
||||
impl LocalForwarder {
|
||||
pub fn new(spec: PortForwardSpec) -> Result<Self, ForwardError> {
|
||||
if spec.kind != PortForwardSpecKind::Local {
|
||||
return Err(ForwardError::InvalidSpec {
|
||||
spec: format!("expected local spec, got {:?}", spec.kind),
|
||||
});
|
||||
}
|
||||
Ok(Self {
|
||||
spec,
|
||||
listener: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn spec(&self) -> &PortForwardSpec {
|
||||
&self.spec
|
||||
}
|
||||
|
||||
pub async fn run<H: client::Handler + Send + 'static>(
|
||||
&mut self,
|
||||
handle: Arc<Mutex<client::Handle<H>>>,
|
||||
) -> Result<(), ForwardError> {
|
||||
let listen_addr = self.spec.listen_addr()?;
|
||||
let listener = TcpListener::bind(listen_addr)
|
||||
.await
|
||||
.map_err(|e| ForwardError::BindFailed { source: e })?;
|
||||
self.listener = Some(listener);
|
||||
let remote_host = self.spec.target_host.clone();
|
||||
let remote_port = self.spec.target_port;
|
||||
|
||||
info!(
|
||||
"local forward listening on {} -> {}:{}",
|
||||
listen_addr, remote_host, remote_port
|
||||
);
|
||||
|
||||
loop {
|
||||
let listener = match &self.listener {
|
||||
Some(l) => l,
|
||||
None => return Ok(()),
|
||||
};
|
||||
let accept_result = listener.accept().await;
|
||||
let (local_stream, local_addr) = match accept_result {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
let handle = handle.lock().await;
|
||||
if handle.is_closed() {
|
||||
debug!("local forward accept loop ending: ssh session closed");
|
||||
return Ok(());
|
||||
}
|
||||
drop(handle);
|
||||
error!("local forward accept error: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
debug!(
|
||||
"local forward connection from {} -> {}:{}",
|
||||
local_addr, remote_host, remote_port
|
||||
);
|
||||
|
||||
let handle = handle.clone();
|
||||
let remote_host = remote_host.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) =
|
||||
proxy_local_to_remote(local_stream, handle, &remote_host, remote_port).await
|
||||
{
|
||||
debug!("local forward proxy error: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stop(&mut self) {
|
||||
if let Some(listener) = self.listener.take() {
|
||||
drop(listener);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn local_port(&self) -> u16 {
|
||||
self.spec.bind_port
|
||||
}
|
||||
}
|
||||
|
||||
async fn proxy_local_to_remote<H: client::Handler + Send + 'static>(
|
||||
local_stream: TcpStream,
|
||||
handle: Arc<Mutex<client::Handle<H>>>,
|
||||
remote_host: &str,
|
||||
remote_port: u16,
|
||||
) -> Result<(), ForwardError> {
|
||||
let local_addr = local_stream
|
||||
.peer_addr()
|
||||
.map(|a| a.to_string())
|
||||
.unwrap_or_default();
|
||||
|
||||
let handle_guard = handle.lock().await;
|
||||
let channel = handle_guard
|
||||
.channel_open_direct_tcpip(
|
||||
remote_host,
|
||||
remote_port as u32,
|
||||
&local_addr,
|
||||
0,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| ForwardError::ChannelOpenFailed {
|
||||
source: Box::new(e) as _,
|
||||
})?;
|
||||
drop(handle_guard);
|
||||
|
||||
let ssh_stream = channel.into_stream();
|
||||
let (mut ssh_read, mut ssh_write) = tokio::io::split(ssh_stream);
|
||||
let (mut local_read, mut local_write) = tokio::io::split(local_stream);
|
||||
|
||||
let client_to_server = io::copy(&mut local_read, &mut ssh_write);
|
||||
let server_to_client = io::copy(&mut ssh_read, &mut local_write);
|
||||
|
||||
match tokio::join!(client_to_server, server_to_client) {
|
||||
(Err(e), _) | (_, Err(e)) => {
|
||||
debug!("local forward bidirectional copy error: {}", e);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct RemoteForwarder {
|
||||
spec: PortForwardSpec,
|
||||
cancel: Option<tokio::sync::oneshot::Sender<()>>,
|
||||
}
|
||||
|
||||
impl RemoteForwarder {
|
||||
pub fn new(spec: PortForwardSpec) -> Result<Self, ForwardError> {
|
||||
if spec.kind != PortForwardSpecKind::Remote {
|
||||
return Err(ForwardError::InvalidSpec {
|
||||
spec: format!("expected remote spec, got {:?}", spec.kind),
|
||||
});
|
||||
}
|
||||
Ok(Self { spec, cancel: None })
|
||||
}
|
||||
|
||||
pub fn spec(&self) -> &PortForwardSpec {
|
||||
&self.spec
|
||||
}
|
||||
|
||||
pub async fn register<H: client::Handler + Send + 'static>(
|
||||
&self,
|
||||
handle: &mut client::Handle<H>,
|
||||
) -> Result<u32, ForwardError> {
|
||||
let port = handle
|
||||
.tcpip_forward(&self.spec.bind_addr, self.spec.bind_port as u32)
|
||||
.await
|
||||
.map_err(|e| ForwardError::ChannelOpenFailed {
|
||||
source: Box::new(e) as _,
|
||||
})?;
|
||||
Ok(port)
|
||||
}
|
||||
|
||||
pub async fn handle_forwarded_channel(
|
||||
channel: russh::Channel<russh::client::Msg>,
|
||||
connected_address: &str,
|
||||
connected_port: u32,
|
||||
local_host: &str,
|
||||
local_port: u16,
|
||||
) {
|
||||
debug!(
|
||||
"remote forward: server opened forwarded-tcpip channel to {}:{} -> local {}:{}",
|
||||
connected_address, connected_port, local_host, local_port
|
||||
);
|
||||
|
||||
let local_target = format!("{}:{}", local_host, local_port);
|
||||
let local_stream = match TcpStream::connect(&local_target).await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
error!(
|
||||
"remote forward: failed to connect to local target {}: {}",
|
||||
local_target, e
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let ssh_stream = channel.into_stream();
|
||||
let (mut ssh_read, mut ssh_write) = tokio::io::split(ssh_stream);
|
||||
let (mut local_read, mut local_write) = tokio::io::split(local_stream);
|
||||
|
||||
let client_to_server = io::copy(&mut local_read, &mut ssh_write);
|
||||
let server_to_client = io::copy(&mut ssh_read, &mut local_write);
|
||||
|
||||
match tokio::join!(client_to_server, server_to_client) {
|
||||
(Err(e), _) | (_, Err(e)) => {
|
||||
debug!("remote forward bidirectional copy error: {}", e);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn unregister<H: client::Handler + Send + 'static>(
|
||||
&self,
|
||||
handle: &client::Handle<H>,
|
||||
) -> Result<(), ForwardError> {
|
||||
handle
|
||||
.cancel_tcpip_forward(&self.spec.bind_addr, self.spec.bind_port as u32)
|
||||
.await
|
||||
.map_err(|e| ForwardError::ChannelOpenFailed {
|
||||
source: Box::new(e) as _,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn stop(&mut self) {
|
||||
if let Some(cancel) = self.cancel.take() {
|
||||
let _ = cancel.send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_local_forwarders<H: client::Handler + Send + 'static>(
|
||||
forwarders: Vec<LocalForwarder>,
|
||||
handle: Arc<Mutex<client::Handle<H>>>,
|
||||
mut shutdown: tokio::sync::watch::Receiver<bool>,
|
||||
) -> Vec<LocalForwarder> {
|
||||
let mut forwarders = forwarders;
|
||||
let mut tasks = Vec::new();
|
||||
|
||||
for forwarder in forwarders.drain(..) {
|
||||
let handle = handle.clone();
|
||||
let spec = forwarder.spec().clone();
|
||||
let (_cancel_tx, cancel_rx) = tokio::sync::oneshot::channel::<()>();
|
||||
tasks.push(tokio::spawn(async move {
|
||||
let mut fwd = forwarder;
|
||||
tokio::select! {
|
||||
result = fwd.run(handle) => {
|
||||
if let Err(e) = result {
|
||||
error!("local forward {} failed: {}", spec, e);
|
||||
}
|
||||
}
|
||||
_ = cancel_rx => {
|
||||
fwd.stop().await;
|
||||
}
|
||||
}
|
||||
fwd
|
||||
}));
|
||||
}
|
||||
|
||||
let _ = shutdown.changed().await;
|
||||
|
||||
for task in &tasks {
|
||||
task.abort();
|
||||
}
|
||||
|
||||
let mut results = Vec::new();
|
||||
for task in tasks {
|
||||
match task.await {
|
||||
Ok(fwd) => results.push(fwd),
|
||||
Err(e) => {
|
||||
if !e.is_cancelled() {
|
||||
error!("local forwarder task panicked: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
results
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parse_local_spec() {
|
||||
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
|
||||
assert_eq!(spec.kind, PortForwardSpecKind::Local);
|
||||
assert_eq!(spec.bind_addr, "127.0.0.1");
|
||||
assert_eq!(spec.bind_port, 5432);
|
||||
assert_eq!(spec.target_host, "db.internal");
|
||||
assert_eq!(spec.target_port, 5432);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_remote_spec() {
|
||||
let spec = PortForwardSpec::remote("0.0.0.0:8080:127.0.0.1:3000").unwrap();
|
||||
assert_eq!(spec.kind, PortForwardSpecKind::Remote);
|
||||
assert_eq!(spec.bind_addr, "0.0.0.0");
|
||||
assert_eq!(spec.bind_port, 8080);
|
||||
assert_eq!(spec.target_host, "127.0.0.1");
|
||||
assert_eq!(spec.target_port, 3000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_spec_invalid_few_parts() {
|
||||
assert!(PortForwardSpec::local("127.0.0.1:5432:db").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_spec_invalid_many_parts() {
|
||||
assert!(PortForwardSpec::local("a:b:c:d:e").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_spec_invalid_port() {
|
||||
assert!(PortForwardSpec::local("127.0.0.1:abc:db:5432").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_spec_invalid_target_port() {
|
||||
assert!(PortForwardSpec::local("127.0.0.1:5432:db:abc").is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn spec_display() {
|
||||
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
|
||||
assert_eq!(spec.to_string(), "-L 127.0.0.1:5432:db.internal:5432");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn spec_display_remote() {
|
||||
let spec = PortForwardSpec::remote("0.0.0.0:8080:127.0.0.1:3000").unwrap();
|
||||
assert_eq!(spec.to_string(), "-R 0.0.0.0:8080:127.0.0.1:3000");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn local_forwarder_rejects_remote_spec() {
|
||||
let spec = PortForwardSpec::remote("0.0.0.0:8080:127.0.0.1:3000").unwrap();
|
||||
assert!(LocalForwarder::new(spec).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_forwarder_rejects_local_spec() {
|
||||
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
|
||||
assert!(RemoteForwarder::new(spec).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn listen_addr_valid() {
|
||||
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
|
||||
let addr = spec.listen_addr().unwrap();
|
||||
assert_eq!(addr.port(), 5432);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn listen_addr_invalid_host() {
|
||||
let spec = PortForwardSpec {
|
||||
kind: PortForwardSpecKind::Local,
|
||||
bind_addr: "!!!invalid".to_string(),
|
||||
bind_port: 5432,
|
||||
target_host: "db".to_string(),
|
||||
target_port: 5432,
|
||||
};
|
||||
assert!(spec.listen_addr().is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn local_forward_bind_and_accept() {
|
||||
let spec = PortForwardSpec::local(&format!("127.0.0.1:0:remote:5432")).unwrap();
|
||||
let forwarder = LocalForwarder::new(spec).unwrap();
|
||||
|
||||
let listen_addr = forwarder.spec.listen_addr().unwrap();
|
||||
let listener = TcpListener::bind(listen_addr).await.unwrap();
|
||||
let bound_addr = listener.local_addr().unwrap();
|
||||
drop(listener);
|
||||
|
||||
let spec = PortForwardSpec::local(&format!(
|
||||
"127.0.0.1:{}:remote:5432",
|
||||
bound_addr.port()
|
||||
))
|
||||
.unwrap();
|
||||
let forwarder = LocalForwarder::new(spec).unwrap();
|
||||
assert_eq!(forwarder.local_port(), bound_addr.port());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn remote_forward_proxy_bidirectional() {
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
||||
let echo_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let _echo_addr = echo_listener.local_addr().unwrap();
|
||||
|
||||
let echo_server = tokio::spawn(async move {
|
||||
let (mut stream, _) = echo_listener.accept().await.unwrap();
|
||||
let mut buf = [0u8; 64];
|
||||
loop {
|
||||
let n = match stream.read(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => n,
|
||||
Err(_) => break,
|
||||
};
|
||||
if stream.write_all(&buf[..n]).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let local_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let local_addr = local_listener.local_addr().unwrap();
|
||||
|
||||
let proxy_task = tokio::spawn(async move {
|
||||
let (stream, _) = local_listener.accept().await.unwrap();
|
||||
let (mut read, mut write) = tokio::io::split(stream);
|
||||
let _ = io::copy(&mut read, &mut write).await;
|
||||
});
|
||||
|
||||
let mut local_conn = TcpStream::connect(local_addr).await.unwrap();
|
||||
local_conn.write_all(b"hello").await.unwrap();
|
||||
let mut buf = [0u8; 64];
|
||||
let n = local_conn.read(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf[..n], b"hello");
|
||||
|
||||
echo_server.abort();
|
||||
proxy_task.abort();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn forwarder_spec_access() {
|
||||
let spec = PortForwardSpec::local("127.0.0.1:5432:db.internal:5432").unwrap();
|
||||
let forwarder = LocalForwarder::new(spec.clone()).unwrap();
|
||||
assert_eq!(forwarder.spec(), &spec);
|
||||
assert_eq!(forwarder.local_port(), 5432);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remote_forwarder_spec_access() {
|
||||
let spec = PortForwardSpec::remote("0.0.0.0:8080:127.0.0.1:3000").unwrap();
|
||||
let forwarder = RemoteForwarder::new(spec.clone()).unwrap();
|
||||
assert_eq!(forwarder.spec(), &spec);
|
||||
}
|
||||
}
|
||||
5
crates/wraith-core/src/client/mod.rs
Normal file
5
crates/wraith-core/src/client/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub mod channel_manager;
|
||||
pub mod forward;
|
||||
|
||||
pub use channel_manager::{ChannelManager, ForwardRequest};
|
||||
pub use forward::{LocalForwarder, PortForwardSpec, PortForwardSpecKind, RemoteForwarder};
|
||||
@@ -60,6 +60,22 @@ pub enum ConfigError {
|
||||
IncompatibleOptions,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ForwardError {
|
||||
#[error("invalid forward specification: {spec}")]
|
||||
InvalidSpec { spec: String },
|
||||
#[error("bind failed")]
|
||||
BindFailed {
|
||||
#[source]
|
||||
source: io::Error,
|
||||
},
|
||||
#[error("channel open failed")]
|
||||
ChannelOpenFailed {
|
||||
#[source]
|
||||
source: Box<dyn std::error::Error + Send + Sync>,
|
||||
},
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -8,5 +8,6 @@ pub mod error;
|
||||
#[cfg(feature = "testutil")]
|
||||
pub mod testutil;
|
||||
|
||||
pub use error::{AuthError, ChannelError, ConfigError, TransportError};
|
||||
pub use transport::{Transport, TransportAcceptor, TransportInfo, TransportKind};
|
||||
pub use error::{AuthError, ChannelError, ConfigError, ForwardError, TransportError};
|
||||
pub use transport::{Transport, TransportAcceptor, TransportInfo, TransportKind};
|
||||
pub use client::channel_manager::{ChannelManager, ForwardRequest};
|
||||
186
crates/wraith-core/src/server/control_channel.rs
Normal file
186
crates/wraith-core/src/server/control_channel.rs
Normal file
@@ -0,0 +1,186 @@
|
||||
use std::io;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
pub const WRAITH_CONTROL_DESTINATION: &str = "wraith-control";
|
||||
pub const WRAITH_PREFIX: &str = "wraith-";
|
||||
|
||||
pub fn is_reserved_destination(host: &str) -> bool {
|
||||
host.starts_with(WRAITH_PREFIX)
|
||||
}
|
||||
|
||||
pub trait DuplexStream: AsyncRead + AsyncWrite + Unpin + Send {}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + Unpin + Send> DuplexStream for T {}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ControlChannelHandler: Send + Sync {
|
||||
async fn handle_channel(&self, stream: Box<dyn DuplexStream>);
|
||||
}
|
||||
|
||||
pub struct ControlChannelRouter {
|
||||
handler: Option<Box<dyn ControlChannelHandler>>,
|
||||
}
|
||||
|
||||
impl ControlChannelRouter {
|
||||
pub fn new(handler: Option<Box<dyn ControlChannelHandler>>) -> Self {
|
||||
Self { handler }
|
||||
}
|
||||
|
||||
pub fn without_handler() -> Self {
|
||||
Self { handler: None }
|
||||
}
|
||||
|
||||
pub fn with_handler(handler: Box<dyn ControlChannelHandler>) -> Self {
|
||||
Self {
|
||||
handler: Some(handler),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn has_handler(&self) -> bool {
|
||||
self.handler.is_some()
|
||||
}
|
||||
|
||||
pub async fn route(&self, stream: Box<dyn DuplexStream>) -> io::Result<()> {
|
||||
match &self.handler {
|
||||
Some(handler) => {
|
||||
handler.handle_channel(stream).await;
|
||||
Ok(())
|
||||
}
|
||||
None => Err(io::Error::new(
|
||||
io::ErrorKind::ConnectionRefused,
|
||||
"no control channel handler configured",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio::io::duplex;
|
||||
|
||||
#[test]
|
||||
fn wraith_control_destination_constant() {
|
||||
assert_eq!(WRAITH_CONTROL_DESTINATION, "wraith-control");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wraith_prefix_constant() {
|
||||
assert_eq!(WRAITH_PREFIX, "wraith-");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reserved_destination_detected() {
|
||||
assert!(is_reserved_destination("wraith-control"));
|
||||
assert!(is_reserved_destination("wraith-status"));
|
||||
assert!(is_reserved_destination("wraith-events"));
|
||||
assert!(is_reserved_destination("wraith-"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_reserved_destination_passes_through() {
|
||||
assert!(!is_reserved_destination("example.com"));
|
||||
assert!(!is_reserved_destination("localhost"));
|
||||
assert!(!is_reserved_destination("192.168.1.1"));
|
||||
assert!(!is_reserved_destination("wraith.example.com"));
|
||||
assert!(!is_reserved_destination(""));
|
||||
assert!(!is_reserved_destination("wrait-control"));
|
||||
assert!(!is_reserved_destination("WRAITH-control"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prefix_matching_case_sensitive() {
|
||||
assert!(!is_reserved_destination("Wraith-control"));
|
||||
assert!(!is_reserved_destination("WRAITH-control"));
|
||||
assert!(is_reserved_destination("wraith-Control"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn router_without_handler_has_no_handler() {
|
||||
let router = ControlChannelRouter::without_handler();
|
||||
assert!(!router.has_handler());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn router_with_handler_has_handler() {
|
||||
struct DummyHandler;
|
||||
#[async_trait]
|
||||
impl ControlChannelHandler for DummyHandler {
|
||||
async fn handle_channel(&self, _stream: Box<dyn DuplexStream>) {}
|
||||
}
|
||||
let router = ControlChannelRouter::with_handler(Box::new(DummyHandler));
|
||||
assert!(router.has_handler());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn route_without_handler_returns_error() {
|
||||
let router = ControlChannelRouter::without_handler();
|
||||
let (_client, server) = duplex(64);
|
||||
let stream: Box<dyn DuplexStream> = Box::new(server);
|
||||
let result = router.route(stream).await;
|
||||
assert!(result.is_err());
|
||||
let err = result.unwrap_err();
|
||||
assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn route_with_handler_succeeds() {
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
struct TrackedHandler {
|
||||
called: Arc<AtomicBool>,
|
||||
}
|
||||
#[async_trait]
|
||||
impl ControlChannelHandler for TrackedHandler {
|
||||
async fn handle_channel(&self, _stream: Box<dyn DuplexStream>) {
|
||||
self.called.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
let called = Arc::new(AtomicBool::new(false));
|
||||
let handler = TrackedHandler {
|
||||
called: called.clone(),
|
||||
};
|
||||
let router = ControlChannelRouter::with_handler(Box::new(handler));
|
||||
let (_client, server) = duplex(64);
|
||||
let stream: Box<dyn DuplexStream> = Box::new(server);
|
||||
let result = router.route(stream).await;
|
||||
assert!(result.is_ok());
|
||||
assert!(called.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn route_with_handler_can_read_write() {
|
||||
struct EchoHandler;
|
||||
#[async_trait]
|
||||
impl ControlChannelHandler for EchoHandler {
|
||||
async fn handle_channel(&self, mut stream: Box<dyn DuplexStream>) {
|
||||
let mut buf = [0u8; 64];
|
||||
let n = stream.read(&mut buf).await.unwrap();
|
||||
stream.write_all(&buf[..n]).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let router = ControlChannelRouter::with_handler(Box::new(EchoHandler));
|
||||
let (client, server) = duplex(64);
|
||||
let stream: Box<dyn DuplexStream> = Box::new(server);
|
||||
tokio::spawn(async move {
|
||||
router.route(stream).await.unwrap();
|
||||
});
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
let mut client = client;
|
||||
client.write_all(b"hello").await.unwrap();
|
||||
let mut buf = [0u8; 5];
|
||||
client.read_exact(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf, b"hello");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn control_channel_destination_matches_prefix() {
|
||||
assert!(is_reserved_destination(WRAITH_CONTROL_DESTINATION));
|
||||
}
|
||||
}
|
||||
336
crates/wraith-core/src/server/handler.rs
Normal file
336
crates/wraith-core/src/server/handler.rs
Normal file
@@ -0,0 +1,336 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use russh::keys::ssh_key::HashAlg;
|
||||
use russh::server::{Auth, Handler, Msg, Session};
|
||||
use russh::Channel;
|
||||
|
||||
use crate::auth::ServerAuthConfig;
|
||||
use crate::server::control_channel::{
|
||||
ControlChannelHandler, ControlChannelRouter, WRAITH_PREFIX,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ProxyMode {
|
||||
Direct,
|
||||
Socks5(SocketAddr),
|
||||
HttpConnect(SocketAddr),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProxyConfig {
|
||||
pub mode: ProxyMode,
|
||||
}
|
||||
|
||||
pub struct ServerHandler {
|
||||
auth_config: Arc<ServerAuthConfig>,
|
||||
outbound_proxy: Option<ProxyConfig>,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
control_channel_router: ControlChannelRouter,
|
||||
}
|
||||
|
||||
impl ServerHandler {
|
||||
pub fn new(
|
||||
auth_config: Arc<ServerAuthConfig>,
|
||||
outbound_proxy: Option<ProxyConfig>,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
) -> Self {
|
||||
Self {
|
||||
auth_config,
|
||||
outbound_proxy,
|
||||
remote_addr,
|
||||
control_channel_router: ControlChannelRouter::without_handler(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_control_channel_handler(
|
||||
mut self,
|
||||
handler: Box<dyn ControlChannelHandler>,
|
||||
) -> Self {
|
||||
self.control_channel_router = ControlChannelRouter::with_handler(handler);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn control_channel_router(&self) -> &ControlChannelRouter {
|
||||
&self.control_channel_router
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Handler for ServerHandler {
|
||||
type Error = russh::Error;
|
||||
|
||||
async fn auth_publickey(
|
||||
&mut self,
|
||||
user: &str,
|
||||
public_key: &russh::keys::ssh_key::PublicKey,
|
||||
) -> Result<Auth, Self::Error> {
|
||||
let fingerprint = format!("{}", public_key.fingerprint(HashAlg::Sha256));
|
||||
let remote_addr_display = self
|
||||
.remote_addr
|
||||
.map_or("unknown".to_string(), |a| a.to_string());
|
||||
|
||||
let russh_pub = russh::keys::PublicKey::new(public_key.key_data().clone(), user);
|
||||
let result = self.auth_config.authenticate_publickey(&russh_pub);
|
||||
|
||||
match result {
|
||||
Ok(()) => {
|
||||
tracing::info!(
|
||||
remote_addr = %remote_addr_display,
|
||||
key_fingerprint = %fingerprint,
|
||||
result = "accept",
|
||||
"auth attempt"
|
||||
);
|
||||
Ok(Auth::Accept)
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::info!(
|
||||
remote_addr = %remote_addr_display,
|
||||
key_fingerprint = %fingerprint,
|
||||
result = "reject",
|
||||
"auth attempt"
|
||||
);
|
||||
Ok(Auth::Reject {
|
||||
proceed_with_methods: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn channel_open_direct_tcpip(
|
||||
&mut self,
|
||||
channel: Channel<Msg>,
|
||||
host_to_connect: &str,
|
||||
port_to_connect: u32,
|
||||
originator_address: &str,
|
||||
originator_port: u32,
|
||||
_session: &mut Session,
|
||||
) -> Result<bool, Self::Error> {
|
||||
if host_to_connect.starts_with(WRAITH_PREFIX) {
|
||||
tracing::info!(
|
||||
host = host_to_connect,
|
||||
port = port_to_connect,
|
||||
"routing to internal control channel handler"
|
||||
);
|
||||
|
||||
if !self.control_channel_router.has_handler() {
|
||||
tracing::warn!(
|
||||
host = host_to_connect,
|
||||
"no control channel handler configured, rejecting channel open"
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let _ = channel;
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let proxy_info = self
|
||||
.outbound_proxy
|
||||
.as_ref()
|
||||
.map(|p| format!("{:?}", p.mode))
|
||||
.unwrap_or_else(|| "direct".to_string());
|
||||
|
||||
tracing::info!(
|
||||
host = host_to_connect,
|
||||
port = port_to_connect,
|
||||
originator_address = originator_address,
|
||||
originator_port = originator_port,
|
||||
proxy = %proxy_info,
|
||||
"spawning tcp proxy task"
|
||||
);
|
||||
|
||||
let _ = channel;
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn channel_open_session(
|
||||
&mut self,
|
||||
_channel: Channel<Msg>,
|
||||
_session: &mut Session,
|
||||
) -> Result<bool, Self::Error> {
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn channel_open_x11(
|
||||
&mut self,
|
||||
_channel: Channel<Msg>,
|
||||
_originator_address: &str,
|
||||
_originator_port: u32,
|
||||
_session: &mut Session,
|
||||
) -> Result<bool, Self::Error> {
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn channel_open_forwarded_tcpip(
|
||||
&mut self,
|
||||
_channel: Channel<Msg>,
|
||||
_host_to_connect: &str,
|
||||
_port_to_connect: u32,
|
||||
_originator_address: &str,
|
||||
_originator_port: u32,
|
||||
_session: &mut Session,
|
||||
) -> Result<bool, Self::Error> {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::auth::keys::KeySource;
|
||||
use russh::keys::{decode_secret_key, PrivateKey};
|
||||
use std::io::Write;
|
||||
|
||||
const ED25519_PRIVATE_KEY: &str = "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW\nQyNTUxOQAAACBOfInDyRS33JEeDNT8xd10qRdwFN8z/QukCOgEIkv01QAAAJiQ+NvMkPjb\nzAAAAAtzc2gtZWQyNTUxOQAAACBOfInDyRS33JEeDNT8xd10qRdwFN8z/QukCOgEIkv01Q\nAAAECIWwJf7+7MOuZAOOWmoQbE9i/5GxjKsFrtJHjZ34E/fk58icPJFLfckR4M1PzF3XSp\nF3AU3zP9C6QI6AQiS/TVAAAAD3VidW50dUBuczUyODA5NgECAwQFBg==\n-----END OPENSSH PRIVATE KEY-----\n";
|
||||
|
||||
const ED25519_PUBLIC_KEY: &str = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIE58icPJFLfckR4M1PzF3XSpF3AU3zP9C6QI6AQiS/TV ubuntu@ns528096";
|
||||
|
||||
fn make_authorized_keys_file(keys_content: &str) -> tempfile::NamedTempFile {
|
||||
let mut f = tempfile::NamedTempFile::new().unwrap();
|
||||
f.write_all(keys_content.as_bytes()).unwrap();
|
||||
f.flush().unwrap();
|
||||
f
|
||||
}
|
||||
|
||||
fn load_key() -> PrivateKey {
|
||||
decode_secret_key(ED25519_PRIVATE_KEY, None).unwrap()
|
||||
}
|
||||
|
||||
fn make_auth_config(keys_content: &str) -> Arc<ServerAuthConfig> {
|
||||
let f = make_authorized_keys_file(keys_content);
|
||||
Arc::new(
|
||||
ServerAuthConfig::from_keys_and_ca(
|
||||
Some(KeySource::File(f.path().to_path_buf())),
|
||||
None,
|
||||
)
|
||||
.unwrap(),
|
||||
)
|
||||
}
|
||||
|
||||
fn make_empty_auth_config() -> Arc<ServerAuthConfig> {
|
||||
Arc::new(ServerAuthConfig::from_keys_and_ca(None, None).unwrap())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_delegation_accepts_known_key() {
|
||||
let auth_config = make_auth_config(ED25519_PUBLIC_KEY);
|
||||
let mut handler = ServerHandler::new(auth_config, None, None);
|
||||
|
||||
let ssh_key = load_key().public_key().clone();
|
||||
let result = handler.auth_publickey("testuser", &ssh_key).await.unwrap();
|
||||
assert_eq!(result, Auth::Accept);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_delegation_rejects_unknown_key() {
|
||||
let auth_config = make_auth_config(ED25519_PUBLIC_KEY);
|
||||
let mut handler = ServerHandler::new(auth_config, None, None);
|
||||
|
||||
let other_key_text = "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIHeLC1lWiCYrXsf/85O/pkbUFZ6OGIt49PX3nw8iRoXE other@host";
|
||||
let other_ssh_key = russh::keys::parse_public_key_base64(
|
||||
other_key_text.split_whitespace().nth(1).unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let result = handler
|
||||
.auth_publickey("testuser", &other_ssh_key)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
result,
|
||||
Auth::Reject {
|
||||
proceed_with_methods: None
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_delegation_empty_config_rejects_all() {
|
||||
let auth_config = make_empty_auth_config();
|
||||
let mut handler = ServerHandler::new(auth_config, None, None);
|
||||
|
||||
let ssh_key = load_key().public_key().clone();
|
||||
let result = handler
|
||||
.auth_publickey("testuser", &ssh_key)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
result,
|
||||
Auth::Reject {
|
||||
proceed_with_methods: None
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn auth_logging_includes_remote_addr() {
|
||||
let auth_config = make_auth_config(ED25519_PUBLIC_KEY);
|
||||
let remote_addr: SocketAddr = "203.0.113.50:12345".parse().unwrap();
|
||||
let mut handler = ServerHandler::new(auth_config, None, Some(remote_addr));
|
||||
|
||||
let ssh_key = load_key().public_key().clone();
|
||||
let _ = handler.auth_publickey("root", &ssh_key).await.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reserved_wraith_destination_routing() {
|
||||
use crate::server::control_channel::is_reserved_destination;
|
||||
assert!(is_reserved_destination("wraith-control"));
|
||||
assert!(is_reserved_destination("wraith-status"));
|
||||
assert!(is_reserved_destination("wraith-events"));
|
||||
assert!(!is_reserved_destination("example.com"));
|
||||
assert!(!is_reserved_destination("localhost"));
|
||||
assert!(!is_reserved_destination("wraith.example.com"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_handler_without_control_handler_rejects_wraith_destinations() {
|
||||
let auth_config = make_empty_auth_config();
|
||||
let handler = ServerHandler::new(auth_config, None, None);
|
||||
assert!(!handler.control_channel_router().has_handler());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn proxy_mode_variants() {
|
||||
let direct = ProxyMode::Direct;
|
||||
let socks5 = ProxyMode::Socks5("127.0.0.1:9050".parse().unwrap());
|
||||
let http = ProxyMode::HttpConnect("127.0.0.1:8080".parse().unwrap());
|
||||
|
||||
match direct {
|
||||
ProxyMode::Direct => {}
|
||||
_ => panic!("expected Direct"),
|
||||
}
|
||||
match socks5 {
|
||||
ProxyMode::Socks5(_) => {}
|
||||
_ => panic!("expected Socks5"),
|
||||
}
|
||||
match http {
|
||||
ProxyMode::HttpConnect(_) => {}
|
||||
_ => panic!("expected HttpConnect"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_handler_holds_config() {
|
||||
let auth_config = make_empty_auth_config();
|
||||
let proxy = Some(ProxyConfig {
|
||||
mode: ProxyMode::Socks5("127.0.0.1:9050".parse().unwrap()),
|
||||
});
|
||||
let remote: Option<SocketAddr> = Some("10.0.0.1:22".parse().unwrap());
|
||||
|
||||
let handler = ServerHandler::new(auth_config, proxy.clone(), remote);
|
||||
assert!(handler.outbound_proxy.is_some());
|
||||
assert!(handler.remote_addr.is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn one_handler_per_connection() {
|
||||
let auth_config = make_empty_auth_config();
|
||||
let handler1 = ServerHandler::new(auth_config.clone(), None, Some("10.0.0.1:22".parse().unwrap()));
|
||||
let handler2 = ServerHandler::new(auth_config.clone(), None, Some("10.0.0.2:22".parse().unwrap()));
|
||||
|
||||
assert!(handler1.remote_addr != handler2.remote_addr);
|
||||
}
|
||||
}
|
||||
8
crates/wraith-core/src/server/mod.rs
Normal file
8
crates/wraith-core/src/server/mod.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
pub mod control_channel;
|
||||
pub mod handler;
|
||||
|
||||
pub use control_channel::{
|
||||
ControlChannelHandler, ControlChannelRouter, DuplexStream, WRAITH_CONTROL_DESTINATION,
|
||||
WRAITH_PREFIX, is_reserved_destination,
|
||||
};
|
||||
pub use handler::{ProxyConfig, ProxyMode, ServerHandler};
|
||||
362
crates/wraith-core/src/transport/acme.rs
Normal file
362
crates/wraith-core/src/transport/acme.rs
Normal file
@@ -0,0 +1,362 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, Result};
|
||||
use rustls::crypto::aws_lc_rs::default_provider;
|
||||
use rustls::ServerConfig;
|
||||
use rustls_acme::caches::DirCache;
|
||||
use rustls_acme::{AcmeConfig, AcmeState, ResolvesServerCertAcme};
|
||||
use tracing::{error, info};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_rustls::TlsAcceptor as TokioTlsAcceptor;
|
||||
|
||||
use super::{TransportAcceptor, TransportInfo, TransportKind};
|
||||
|
||||
const ACME_TLS_ALPN_NAME: &[u8] = b"acme-tls/1";
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum AcmeMode {
|
||||
Domain { domain: String },
|
||||
Ip,
|
||||
}
|
||||
|
||||
pub struct AcmeCertProvider {
|
||||
mode: AcmeMode,
|
||||
cache_dir: Option<PathBuf>,
|
||||
directory_url: String,
|
||||
contact: Vec<String>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for AcmeCertProvider {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("AcmeCertProvider")
|
||||
.field("mode", &self.mode)
|
||||
.field("cache_dir", &self.cache_dir)
|
||||
.field("directory_url", &self.directory_url)
|
||||
.field("contact", &self.contact)
|
||||
.finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl AcmeCertProvider {
|
||||
pub fn new(mode: AcmeMode) -> Self {
|
||||
Self {
|
||||
mode,
|
||||
cache_dir: None,
|
||||
directory_url: rustls_acme::acme::LETS_ENCRYPT_STAGING_DIRECTORY.to_string(),
|
||||
contact: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn domain(domain: impl Into<String>) -> Self {
|
||||
Self::new(AcmeMode::Domain {
|
||||
domain: domain.into(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn ip() -> Self {
|
||||
Self::new(AcmeMode::Ip)
|
||||
}
|
||||
|
||||
pub fn with_cache_dir(mut self, dir: impl Into<PathBuf>) -> Self {
|
||||
self.cache_dir = Some(dir.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_directory(mut self, url: impl Into<String>) -> Self {
|
||||
self.directory_url = url.into();
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_production_directory(mut self) -> Self {
|
||||
self.directory_url = rustls_acme::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY.to_string();
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_contact(mut self, contact: impl Into<String>) -> Self {
|
||||
self.contact.push(contact.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn mode(&self) -> &AcmeMode {
|
||||
&self.mode
|
||||
}
|
||||
|
||||
fn build_acme_state(&self) -> (AcmeState<std::io::Error>, Arc<ResolvesServerCertAcme>) {
|
||||
let domains: Vec<String> = match &self.mode {
|
||||
AcmeMode::Domain { domain } => vec![domain.clone()],
|
||||
AcmeMode::Ip => vec![],
|
||||
};
|
||||
|
||||
let base_config = AcmeConfig::new(domains)
|
||||
.directory(&self.directory_url)
|
||||
.contact(self.contact.clone());
|
||||
|
||||
let state = match &self.cache_dir {
|
||||
Some(cache_dir) => {
|
||||
base_config.cache(DirCache::new(cache_dir.clone())).state()
|
||||
}
|
||||
None => {
|
||||
base_config
|
||||
.cache(rustls_acme::caches::NoCache::default())
|
||||
.state()
|
||||
}
|
||||
};
|
||||
|
||||
let resolver = state.resolver();
|
||||
(state, resolver)
|
||||
}
|
||||
|
||||
pub fn build_server_config_with_resolver(
|
||||
&self,
|
||||
resolver: Arc<ResolvesServerCertAcme>,
|
||||
) -> Result<Arc<ServerConfig>> {
|
||||
let provider = default_provider().into();
|
||||
let mut config = ServerConfig::builder_with_provider(provider)
|
||||
.with_safe_default_protocol_versions()
|
||||
.map_err(|e| anyhow!("failed to set protocol versions: {}", e))?
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(resolver);
|
||||
config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
|
||||
Ok(Arc::new(config))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AcmeTlsAcceptor {
|
||||
listener: TcpListener,
|
||||
listen_addr: SocketAddr,
|
||||
#[allow(dead_code)]
|
||||
server_config: Arc<ServerConfig>,
|
||||
tokio_acceptor: TokioTlsAcceptor,
|
||||
}
|
||||
|
||||
impl AcmeTlsAcceptor {
|
||||
pub async fn bind_acme(
|
||||
addr: SocketAddr,
|
||||
provider: Arc<AcmeCertProvider>,
|
||||
) -> Result<Self> {
|
||||
let (state, resolver) = provider.build_acme_state();
|
||||
|
||||
let server_config = provider.build_server_config_with_resolver(resolver.clone())?;
|
||||
|
||||
Self::spawn_state_worker(state, resolver);
|
||||
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
let listen_addr = listener.local_addr()?;
|
||||
|
||||
let tokio_acceptor = TokioTlsAcceptor::from(server_config.clone());
|
||||
|
||||
Ok(Self {
|
||||
listener,
|
||||
listen_addr,
|
||||
server_config,
|
||||
tokio_acceptor,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn listen_addr(&self) -> SocketAddr {
|
||||
self.listen_addr
|
||||
}
|
||||
|
||||
fn spawn_state_worker(state: AcmeState<std::io::Error>, resolver: Arc<ResolvesServerCertAcme>) {
|
||||
use futures::StreamExt;
|
||||
|
||||
let task = async move {
|
||||
let mut state = state;
|
||||
while let Some(event) = state.next().await {
|
||||
match event {
|
||||
Ok(ok) => {
|
||||
if let rustls_acme::EventOk::DeployedNewCert = ok {
|
||||
info!("ACME: new certificate deployed");
|
||||
} else {
|
||||
info!("ACME event: {:?}", ok);
|
||||
}
|
||||
}
|
||||
Err(err) => error!("ACME event error: {:?}", err),
|
||||
}
|
||||
if Arc::strong_count(&resolver) == 1 {
|
||||
info!("ACME resolver dropped, stopping background task");
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
tokio::spawn(task);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TransportAcceptor for AcmeTlsAcceptor {
|
||||
type Stream = tokio_rustls::server::TlsStream<tokio::net::TcpStream>;
|
||||
|
||||
async fn accept(&self) -> Result<(Self::Stream, TransportInfo)> {
|
||||
let (tcp_stream, remote_addr) = self.listener.accept().await?;
|
||||
let tls_stream = self.tokio_acceptor.accept(tcp_stream).await?;
|
||||
|
||||
let server_name = tls_stream
|
||||
.get_ref()
|
||||
.1
|
||||
.server_name()
|
||||
.map(|s| s.to_string());
|
||||
|
||||
let info = TransportInfo {
|
||||
remote_addr: Some(remote_addr),
|
||||
transport_kind: TransportKind::Tls { server_name },
|
||||
};
|
||||
|
||||
Ok((tls_stream, info))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_domain_mode() {
|
||||
let provider = AcmeCertProvider::domain("example.com");
|
||||
assert!(matches!(provider.mode(), AcmeMode::Domain { .. }));
|
||||
if let AcmeMode::Domain { domain } = provider.mode() {
|
||||
assert_eq!(domain, "example.com");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_ip_mode() {
|
||||
let provider = AcmeCertProvider::ip();
|
||||
assert!(matches!(provider.mode(), AcmeMode::Ip));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_default_staging_directory() {
|
||||
let provider = AcmeCertProvider::domain("example.com");
|
||||
assert_eq!(
|
||||
provider.directory_url,
|
||||
rustls_acme::acme::LETS_ENCRYPT_STAGING_DIRECTORY
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_production_directory() {
|
||||
let provider = AcmeCertProvider::domain("example.com").with_production_directory();
|
||||
assert_eq!(
|
||||
provider.directory_url,
|
||||
rustls_acme::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_custom_directory() {
|
||||
let provider =
|
||||
AcmeCertProvider::domain("example.com").with_directory("https://custom.acme.dir/");
|
||||
assert_eq!(provider.directory_url, "https://custom.acme.dir/");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_with_cache_dir() {
|
||||
let provider = AcmeCertProvider::domain("example.com").with_cache_dir("/tmp/acme_cache");
|
||||
assert_eq!(provider.cache_dir, Some(PathBuf::from("/tmp/acme_cache")));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_with_contact() {
|
||||
let provider =
|
||||
AcmeCertProvider::domain("example.com").with_contact("mailto:admin@example.com");
|
||||
assert_eq!(
|
||||
provider.contact,
|
||||
vec!["mailto:admin@example.com".to_string()]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_build_state_domain() {
|
||||
let provider = AcmeCertProvider::domain("example.com");
|
||||
let (_state, resolver) = provider.build_acme_state();
|
||||
assert!(Arc::strong_count(&resolver) >= 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_build_state_with_cache() {
|
||||
let provider =
|
||||
AcmeCertProvider::domain("example.com").with_cache_dir("/tmp/test_cache");
|
||||
let (_state, resolver) = provider.build_acme_state();
|
||||
assert!(Arc::strong_count(&resolver) >= 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_build_server_config() {
|
||||
let _ = default_provider().install_default();
|
||||
let provider = AcmeCertProvider::domain("example.com");
|
||||
let (_, resolver) = provider.build_acme_state();
|
||||
let config = provider.build_server_config_with_resolver(resolver).unwrap();
|
||||
assert!(!config.alpn_protocols.is_empty());
|
||||
assert!(config
|
||||
.alpn_protocols
|
||||
.iter()
|
||||
.any(|p| p == ACME_TLS_ALPN_NAME));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_mode_domain_debug() {
|
||||
let mode = AcmeMode::Domain {
|
||||
domain: "test.example.com".to_string(),
|
||||
};
|
||||
let debug_str = format!("{:?}", mode);
|
||||
assert!(debug_str.contains("test.example.com"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_mode_ip_debug() {
|
||||
let mode = AcmeMode::Ip;
|
||||
let debug_str = format!("{:?}", mode);
|
||||
assert!(debug_str.contains("Ip"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn acme_cert_provider_builder_chain() {
|
||||
let provider = AcmeCertProvider::domain("test.example.com")
|
||||
.with_production_directory()
|
||||
.with_cache_dir("/tmp/cache")
|
||||
.with_contact("mailto:admin@test.example.com");
|
||||
assert!(matches!(provider.mode(), AcmeMode::Domain { .. }));
|
||||
assert_eq!(
|
||||
provider.directory_url,
|
||||
rustls_acme::acme::LETS_ENCRYPT_PRODUCTION_DIRECTORY
|
||||
);
|
||||
assert_eq!(provider.cache_dir, Some(PathBuf::from("/tmp/cache")));
|
||||
assert_eq!(provider.contact.len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn acme_tls_acceptor_bind_acme() {
|
||||
let _ = default_provider().install_default();
|
||||
let provider = Arc::new(AcmeCertProvider::domain("example.com"));
|
||||
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
|
||||
let acceptor = AcmeTlsAcceptor::bind_acme(addr, provider).await.unwrap();
|
||||
assert_ne!(acceptor.listen_addr().port(), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore]
|
||||
async fn acme_staging_domain_cert_provisioning() {
|
||||
let _ = default_provider().install_default();
|
||||
|
||||
let cache_dir = tempfile::tempdir().unwrap();
|
||||
let provider = Arc::new(
|
||||
AcmeCertProvider::domain("acme-test.example.com")
|
||||
.with_cache_dir(cache_dir.path())
|
||||
.with_contact("mailto:admin@example.com"),
|
||||
);
|
||||
|
||||
let addr: SocketAddr = "0.0.0.0:443".parse().unwrap();
|
||||
let result = AcmeTlsAcceptor::bind_acme(addr, provider).await;
|
||||
assert!(
|
||||
result.is_ok(),
|
||||
"ACME TlsAcceptor should bind: {:?}",
|
||||
result.err()
|
||||
);
|
||||
|
||||
let acceptor = result.unwrap();
|
||||
assert_eq!(acceptor.listen_addr().port(), 443);
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,12 @@ mod tls;
|
||||
#[cfg(feature = "tls")]
|
||||
pub use tls::{AcmeConfig, TlsAcceptor, TlsTransport};
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
mod acme;
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
pub use acme::{AcmeCertProvider, AcmeMode, AcmeTlsAcceptor};
|
||||
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
@@ -9,8 +9,16 @@ use rustls::{ClientConfig, DigitallySignedStruct, RootCertStore, ServerConfig};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use tokio_rustls::{client::TlsStream as ClientTlsStream, TlsAcceptor as TokioTlsAcceptor, TlsConnector};
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
use rustls::crypto::aws_lc_rs::default_provider;
|
||||
#[cfg(feature = "acme")]
|
||||
use rustls_acme::ResolvesServerCertAcme;
|
||||
|
||||
use super::{Transport, TransportAcceptor, TransportInfo, TransportKind};
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
const ACME_TLS_ALPN_NAME: &[u8] = b"acme-tls/1";
|
||||
|
||||
/// A TLS-based client transport that connects to a remote address over TLS.
|
||||
///
|
||||
/// Wraps a TCP connection with a TLS client session via `tokio_rustls::TlsConnector`.
|
||||
@@ -110,8 +118,10 @@ pub struct AcmeConfig {
|
||||
/// A TLS-based server transport acceptor that accepts TCP connections
|
||||
/// and wraps them with TLS server sessions via `tokio_rustls::TlsAcceptor`.
|
||||
///
|
||||
/// Requires certificate and private key configuration. Supports manual
|
||||
/// cert/key paths and an ACME config stub (ADR-008).
|
||||
/// Supports three certificate modes (ADR-008):
|
||||
/// - Manual certs via `bind()` with explicit cert/key
|
||||
/// - ACME certs via `bind_acme()` with an `AcmeCertProvider`
|
||||
/// - The stub `AcmeConfig` parameter in `bind()` is kept for backward compat
|
||||
pub struct TlsAcceptor {
|
||||
listener: TcpListener,
|
||||
listen_addr: SocketAddr,
|
||||
@@ -145,6 +155,33 @@ impl TlsAcceptor {
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
pub async fn bind_acme(
|
||||
addr: SocketAddr,
|
||||
acme_resolver: Arc<ResolvesServerCertAcme>,
|
||||
) -> Result<Self> {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
let listen_addr = listener.local_addr()?;
|
||||
|
||||
let provider = default_provider().into();
|
||||
let mut server_config = ServerConfig::builder_with_provider(provider)
|
||||
.with_safe_default_protocol_versions()
|
||||
.map_err(|e| anyhow!("failed to set protocol versions: {}", e))?
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(acme_resolver);
|
||||
server_config.alpn_protocols.push(ACME_TLS_ALPN_NAME.to_vec());
|
||||
|
||||
let server_config = Arc::new(server_config);
|
||||
let tokio_acceptor = TokioTlsAcceptor::from(server_config.clone());
|
||||
|
||||
Ok(Self {
|
||||
listener,
|
||||
listen_addr,
|
||||
server_config,
|
||||
tokio_acceptor,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn listen_addr(&self) -> SocketAddr {
|
||||
self.listen_addr
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
---
|
||||
id: client/channel-manager
|
||||
name: Implement ChannelManager — SSH session management, channel opens, reconnection
|
||||
status: pending
|
||||
status: done
|
||||
depends_on:
|
||||
- auth/client-auth-handler
|
||||
- transport/trait-and-types
|
||||
@@ -32,18 +32,18 @@ Reconnection is always enabled. The backoff caps at 30 seconds and continues ind
|
||||
|
||||
## Acceptance Criteria
|
||||
|
||||
- [ ] `crates/wraith-core/src/client/channel_manager.rs` exports `ChannelManager`
|
||||
- [ ] `ChannelManager` holds: `Arc<Transport>`, `Arc<ClientAuthConfig>`, `Arc<client::Handle<ClientHandler>>` (behind RwLock for reconnection)
|
||||
- [ ] `ChannelManager::new()` establishes initial transport connection, authenticates, returns manager
|
||||
- [ ] `open_direct_tcpip(host, port)` — opens SSH channel to target
|
||||
- [ ] `request_tcpip_forward(addr, port)` — sends `tcpip_forward` request
|
||||
- [ ] `cancel_tcpip_forward(addr, port)` — sends `cancel_tcpip_forward` request
|
||||
- [ ] Reconnection detection: monitors `handle.is_closed()`, triggers reconnect on failure
|
||||
- [ ] Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s (cap), continues indefinitely
|
||||
- [ ] Full reconnect: new transport stream, new SSH session over it (ADR-004)
|
||||
- [ ] After reconnect: port forward listeners (`-L`, `-R`) re-registered with new session
|
||||
- [ ] In-flight connections on old session fail gracefully (channel errors, not session-wide)
|
||||
- [ ] Unit tests: channel open, reconnection trigger, backoff timing, forward re-registration
|
||||
- [x] `crates/wraith-core/src/client/channel_manager.rs` exports `ChannelManager`
|
||||
- [x] `ChannelManager` holds: `Arc<Transport>`, `Arc<ClientAuthConfig>`, `Arc<client::Handle<ClientHandler>>` (behind RwLock for reconnection)
|
||||
- [x] `ChannelManager::new()` establishes initial transport connection, authenticates, returns manager
|
||||
- [x] `open_direct_tcpip(host, port)` — opens SSH channel to target
|
||||
- [x] `request_tcpip_forward(addr, port)` — sends `tcpip_forward` request
|
||||
- [x] `cancel_tcpip_forward(addr, port)` — sends `cancel_tcpip_forward` request
|
||||
- [x] Reconnection detection: monitors `handle.is_closed()`, triggers reconnect on failure
|
||||
- [x] Exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s (cap), continues indefinitely
|
||||
- [x] Full reconnect: new transport stream, new SSH session over it (ADR-004)
|
||||
- [x] After reconnect: port forward listeners (`-L`, `-R`) re-registered with new session
|
||||
- [x] In-flight connections on old session fail gracefully (channel errors, not session-wide)
|
||||
- [x] Unit tests: channel open, reconnection trigger, backoff timing, forward re-registration
|
||||
|
||||
## References
|
||||
|
||||
@@ -52,8 +52,13 @@ Reconnection is always enabled. The backoff caps at 30 seconds and continues ind
|
||||
|
||||
## Notes
|
||||
|
||||
> To be filled by implementation agent
|
||||
- Converted `client.rs` (single file) to directory module: `client/mod.rs` + `client/channel_manager.rs`
|
||||
- Used `russh::keys::PrivateKey` and `russh::keys::PublicKey` (not the nonexistent `russh::key::KeyPair`)
|
||||
- Reconnection monitor runs as a spawned tokio task that polls `handle.is_closed()` every 1s
|
||||
- On reconnect: creates new transport stream + new SSH session (ADR-004 full reconnect)
|
||||
- `ForwardRequest` struct tracks registered port forwards for re-registration after reconnect
|
||||
- In-flight channels on old session naturally fail with `ChannelError::ChannelClosed` since the handle is replaced
|
||||
|
||||
## Summary
|
||||
|
||||
> To be filled on completion
|
||||
Implemented `ChannelManager` in `crates/wraith-core/src/client/channel_manager.rs` with SSH session management, channel opens (`open_direct_tcpip`), port forward requests (`request_tcpip_forward`/`cancel_tcpip_forward`), and automatic reconnection with exponential backoff (1s→30s cap). Full reconnect per ADR-004 creates new transport stream + new SSH session. Port forwards are re-registered after successful reconnect. 8 unit tests covering backoff timing, forward tracking, transport failure, and reconnection detection.
|
||||
@@ -43,8 +43,14 @@ This integrates with `TlsAcceptor` by providing ACME-resolved certificates inste
|
||||
|
||||
## Notes
|
||||
|
||||
> To be filled by implementation agent
|
||||
- `AcmeCertProvider` is the main entry point. It creates `AcmeState` and `ResolvesServerCertAcme` from `rustls-acme`.
|
||||
- The `ResolvesServerCertAcme` resolver is shared between the `AcmeState` background task and the `ServerConfig`, so cert updates propagate automatically.
|
||||
- `AcmeTlsAcceptor::bind_acme()` creates a TLS acceptor that uses ACME-provisioned certs and spawns a background tokio task for auto-renewal.
|
||||
- `TlsAcceptor::bind_acme()` also added for users who want to use ACME with the standard `TlsAcceptor` type directly.
|
||||
- The `AcmeConfig` stub in `tls.rs` is retained for backward compat with existing `TlsAcceptor::bind()`.
|
||||
- `acme` feature implies `tls` and adds `rustls-acme` + `futures` dependencies.
|
||||
- TLS-ALPN-01 challenge handling works via the `acme-tls/1` ALPN protocol registered in `ServerConfig` — the resolver dispatches challenge vs regular certs automatically.
|
||||
|
||||
## Summary
|
||||
|
||||
> To be filled on completion
|
||||
Implemented ACME/Let's Encrypt certificate provisioning (ADR-008) behind the `acme` feature flag. `AcmeCertProvider` supports domain-based and IP-based modes using `rustls-acme`. `AcmeTlsAcceptor::bind_acme()` and `TlsAcceptor::bind_acme()` provide ACME-integrated TLS acceptance with automatic certificate renewal via a background tokio task. Unit tests cover config construction, builder patterns, and server config generation. Integration test for LE staging is marked `#[ignore]`.
|
||||
Reference in New Issue
Block a user