Compare commits
2 Commits
feat/serve
...
feat/clien
| Author | SHA1 | Date | |
|---|---|---|---|
| 128affd264 | |||
| f963898a05 |
727
crates/wraith-core/src/client/connect.rs
Normal file
727
crates/wraith-core/src/client/connect.rs
Normal file
@@ -0,0 +1,727 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use russh::client;
|
||||
use russh::keys::PrivateKey;
|
||||
use tokio::sync::Mutex;
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use crate::auth::client_auth::{ClientAuthConfig, ClientHandler};
|
||||
use crate::auth::keys::KeySource;
|
||||
use crate::client::forward::{LocalForwarder, PortForwardSpec, RemoteForwarder};
|
||||
use crate::error::ConfigError;
|
||||
use crate::socks5::{HandleChannelOpener, Socks5Server};
|
||||
use crate::transport::Transport;
|
||||
|
||||
const DEFAULT_SOCKS5_ADDR: &str = "127.0.0.1:1080";
|
||||
const DRAIN_TIMEOUT: Duration = Duration::from_secs(2);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum TransportMode {
|
||||
Tcp,
|
||||
Tls,
|
||||
Iroh,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TransportMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
TransportMode::Tcp => write!(f, "tcp"),
|
||||
TransportMode::Tls => write!(f, "tls"),
|
||||
TransportMode::Iroh => write!(f, "iroh"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ConnectOptions {
|
||||
pub server: Option<String>,
|
||||
pub peer: Option<String>,
|
||||
pub transport_mode: TransportMode,
|
||||
pub identity: KeySource,
|
||||
pub socks5_addr: String,
|
||||
pub forwards: Vec<String>,
|
||||
pub remote_forwards: Vec<String>,
|
||||
pub proxy: Option<String>,
|
||||
pub iroh_relay: Option<String>,
|
||||
pub tls_server_name: Option<String>,
|
||||
pub insecure: bool,
|
||||
}
|
||||
|
||||
impl ConnectOptions {
|
||||
pub fn new(identity: KeySource) -> Self {
|
||||
Self {
|
||||
server: None,
|
||||
peer: None,
|
||||
transport_mode: TransportMode::Tcp,
|
||||
identity,
|
||||
socks5_addr: DEFAULT_SOCKS5_ADDR.to_string(),
|
||||
forwards: Vec::new(),
|
||||
remote_forwards: Vec::new(),
|
||||
proxy: None,
|
||||
iroh_relay: None,
|
||||
tls_server_name: None,
|
||||
insecure: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn server(mut self, addr: impl Into<String>) -> Self {
|
||||
self.server = Some(addr.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn peer(mut self, endpoint_id: impl Into<String>) -> Self {
|
||||
self.peer = Some(endpoint_id.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn transport_mode(mut self, mode: TransportMode) -> Self {
|
||||
self.transport_mode = mode;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn socks5_addr(mut self, addr: impl Into<String>) -> Self {
|
||||
self.socks5_addr = addr.into();
|
||||
self
|
||||
}
|
||||
|
||||
pub fn forward(mut self, spec: impl Into<String>) -> Self {
|
||||
self.forwards.push(spec.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn remote_forward(mut self, spec: impl Into<String>) -> Self {
|
||||
self.remote_forwards.push(spec.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn proxy(mut self, url: impl Into<String>) -> Self {
|
||||
self.proxy = Some(url.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn iroh_relay(mut self, url: impl Into<String>) -> Self {
|
||||
self.iroh_relay = Some(url.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn tls_server_name(mut self, name: impl Into<String>) -> Self {
|
||||
self.tls_server_name = Some(name.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn insecure(mut self, insecure: bool) -> Self {
|
||||
self.insecure = insecure;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn validate(&self) -> Result<(), ConfigError> {
|
||||
match self.transport_mode {
|
||||
TransportMode::Tcp | TransportMode::Tls => {
|
||||
if self.server.is_none() {
|
||||
return Err(ConfigError::InvalidFlag {
|
||||
name: "--server is required for tcp/tls transport".to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
TransportMode::Iroh => {
|
||||
if self.peer.is_none() {
|
||||
return Err(ConfigError::InvalidFlag {
|
||||
name: "--peer is required for iroh transport".to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ConnectOptions {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("ConnectOptions")
|
||||
.field("server", &self.server)
|
||||
.field("peer", &self.peer)
|
||||
.field("transport_mode", &self.transport_mode)
|
||||
.field("identity", &"<KeySource>")
|
||||
.field("socks5_addr", &self.socks5_addr)
|
||||
.field("forwards", &self.forwards)
|
||||
.field("remote_forwards", &self.remote_forwards)
|
||||
.field("proxy", &self.proxy)
|
||||
.field("iroh_relay", &self.iroh_relay)
|
||||
.field("tls_server_name", &self.tls_server_name)
|
||||
.field("insecure", &self.insecure)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ClientSession<T: Transport> {
|
||||
opts: ConnectOptions,
|
||||
transport: Arc<T>,
|
||||
handle: Arc<Mutex<client::Handle<ClientHandler>>>,
|
||||
auth_config: Arc<ClientAuthConfig>,
|
||||
#[allow(dead_code)]
|
||||
private_key: Arc<PrivateKey>,
|
||||
#[allow(dead_code)]
|
||||
username: String,
|
||||
shutdown_tx: tokio::sync::watch::Sender<bool>,
|
||||
shutdown_rx: tokio::sync::watch::Receiver<bool>,
|
||||
}
|
||||
|
||||
impl<T: Transport> ClientSession<T> {
|
||||
pub async fn new(
|
||||
opts: ConnectOptions,
|
||||
transport: Arc<T>,
|
||||
) -> Result<Self, ConnectError> {
|
||||
opts.validate().map_err(ConnectError::Config)?;
|
||||
|
||||
let auth_config = Arc::new(
|
||||
ClientAuthConfig::from_key_source(opts.identity.clone())
|
||||
.map_err(ConnectError::Config)?,
|
||||
);
|
||||
let private_key = auth_config.private_key();
|
||||
|
||||
let username = derive_username();
|
||||
let handler = ClientHandler::from_config(&auth_config);
|
||||
|
||||
let stream = transport.connect().await.map_err(|e| {
|
||||
error!("transport connect failed: {e}");
|
||||
ConnectError::ConnectionFailed
|
||||
})?;
|
||||
|
||||
let config = Arc::new(client::Config::default());
|
||||
let mut handle = client::connect_stream(config, stream, handler)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("SSH connect failed: {e}");
|
||||
ConnectError::ConnectionFailed
|
||||
})?;
|
||||
|
||||
let auth_ok = auth_config
|
||||
.authenticate(&mut handle, &username)
|
||||
.await
|
||||
.map_err(|_| ConnectError::AuthFailed)?;
|
||||
if !auth_ok {
|
||||
return Err(ConnectError::AuthFailed);
|
||||
}
|
||||
|
||||
let handle = Arc::new(Mutex::new(handle));
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
|
||||
|
||||
Ok(Self {
|
||||
opts,
|
||||
transport,
|
||||
handle,
|
||||
auth_config,
|
||||
private_key,
|
||||
username,
|
||||
shutdown_tx,
|
||||
shutdown_rx,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn handle(&self) -> Arc<Mutex<client::Handle<ClientHandler>>> {
|
||||
Arc::clone(&self.handle)
|
||||
}
|
||||
|
||||
pub fn auth_config(&self) -> &Arc<ClientAuthConfig> {
|
||||
&self.auth_config
|
||||
}
|
||||
|
||||
pub fn transport(&self) -> &Arc<T> {
|
||||
&self.transport
|
||||
}
|
||||
|
||||
pub fn options(&self) -> &ConnectOptions {
|
||||
&self.opts
|
||||
}
|
||||
|
||||
pub fn shutdown_sender(&self) -> tokio::sync::watch::Sender<bool> {
|
||||
self.shutdown_tx.clone()
|
||||
}
|
||||
|
||||
pub async fn run(self) -> Result<(), ConnectError> {
|
||||
let socks5_addr: SocketAddr = self.opts.socks5_addr.parse().map_err(|_| {
|
||||
ConnectError::Config(ConfigError::InvalidFlag {
|
||||
name: format!("invalid SOCKS5 address: {}", self.opts.socks5_addr),
|
||||
})
|
||||
})?;
|
||||
|
||||
let channel_opener = HandleChannelOpener::from_arc(Arc::clone(&self.handle));
|
||||
let socks5_server = Socks5Server::with_addr(channel_opener, &socks5_addr.to_string());
|
||||
let socks5_listen = socks5_server.listen_addr();
|
||||
|
||||
let local_forwarders = build_local_forwarders(&self.opts)?;
|
||||
let remote_specs = build_remote_specs(&self.opts)?;
|
||||
|
||||
for spec in &remote_specs {
|
||||
let remote_forwarder = RemoteForwarder::new(spec.clone())
|
||||
.map_err(|_| ConnectError::ForwardFailed)?;
|
||||
let mut h = self.handle.lock().await;
|
||||
remote_forwarder
|
||||
.register(&mut h)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
warn!("failed to register remote forward {}", spec);
|
||||
ConnectError::ForwardFailed
|
||||
})?;
|
||||
info!("registered remote forward: {}", spec);
|
||||
}
|
||||
|
||||
let socks5_task = tokio::spawn(async move {
|
||||
debug!("SOCKS5 server starting on {}", socks5_listen);
|
||||
if let Err(e) = socks5_server.run().await {
|
||||
error!("SOCKS5 server error: {e}");
|
||||
}
|
||||
});
|
||||
|
||||
let fwd_handle = Arc::clone(&self.handle);
|
||||
let fwd_shutdown = self.shutdown_rx.clone();
|
||||
let forward_task = tokio::spawn(async move {
|
||||
crate::client::forward::run_local_forwarders(
|
||||
local_forwarders, fwd_handle, fwd_shutdown,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
info!("wraith client running: SOCKS5 on {}", socks5_listen);
|
||||
|
||||
#[cfg(unix)]
|
||||
let signal_done = {
|
||||
let sig_tx = self.shutdown_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut sigterm_stream =
|
||||
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
|
||||
.expect("failed to install SIGTERM handler");
|
||||
tokio::select! {
|
||||
_ = sigterm_stream.recv() => {
|
||||
info!("received SIGTERM");
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
info!("received SIGINT (Ctrl+C)");
|
||||
}
|
||||
}
|
||||
let _ = sig_tx.send(true);
|
||||
})
|
||||
};
|
||||
|
||||
let mut wait_shutdown = self.shutdown_rx.clone();
|
||||
tokio::select! {
|
||||
_ = wait_shutdown.changed() => {
|
||||
if *wait_shutdown.borrow() {
|
||||
info!("shutdown signal received");
|
||||
}
|
||||
}
|
||||
_ = socks5_task => {
|
||||
warn!("SOCKS5 server exited unexpectedly");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
signal_done.abort();
|
||||
|
||||
self.shutdown().await?;
|
||||
|
||||
forward_task.abort();
|
||||
let _ = forward_task.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) -> Result<(), ConnectError> {
|
||||
info!("initiating graceful shutdown");
|
||||
|
||||
let _ = self.shutdown_tx.send(true);
|
||||
|
||||
{
|
||||
let handle = self.handle.lock().await;
|
||||
if !handle.is_closed() {
|
||||
if let Err(e) = handle
|
||||
.disconnect(russh::Disconnect::ByApplication, "shutdown", "")
|
||||
.await
|
||||
{
|
||||
warn!("failed to send SSH disconnect: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(DRAIN_TIMEOUT).await;
|
||||
|
||||
info!("graceful shutdown complete");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn derive_username() -> String {
|
||||
std::env::var("USER")
|
||||
.or_else(|_| std::env::var("USERNAME"))
|
||||
.unwrap_or_else(|_| "wraith".to_string())
|
||||
}
|
||||
|
||||
fn build_local_forwarders(opts: &ConnectOptions) -> Result<Vec<LocalForwarder>, ConnectError> {
|
||||
let mut forwarders = Vec::new();
|
||||
for spec_str in &opts.forwards {
|
||||
let spec = PortForwardSpec::local(spec_str).map_err(|e| {
|
||||
warn!("invalid local forward spec '{}': {}", spec_str, e);
|
||||
ConnectError::Config(ConfigError::InvalidFlag {
|
||||
name: format!("invalid forward spec: {}", spec_str),
|
||||
})
|
||||
})?;
|
||||
forwarders.push(
|
||||
LocalForwarder::new(spec).map_err(|e| {
|
||||
warn!("failed to create local forwarder: {}", e);
|
||||
ConnectError::ForwardFailed
|
||||
})?,
|
||||
);
|
||||
}
|
||||
Ok(forwarders)
|
||||
}
|
||||
|
||||
fn build_remote_specs(opts: &ConnectOptions) -> Result<Vec<PortForwardSpec>, ConnectError> {
|
||||
let mut specs = Vec::new();
|
||||
for spec_str in &opts.remote_forwards {
|
||||
let spec = PortForwardSpec::remote(spec_str).map_err(|e| {
|
||||
warn!("invalid remote forward spec '{}': {}", spec_str, e);
|
||||
ConnectError::Config(ConfigError::InvalidFlag {
|
||||
name: format!("invalid remote forward spec: {}", spec_str),
|
||||
})
|
||||
})?;
|
||||
specs.push(spec);
|
||||
}
|
||||
Ok(specs)
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ConnectError {
|
||||
#[error("connection failed")]
|
||||
ConnectionFailed,
|
||||
#[error("authentication failed")]
|
||||
AuthFailed,
|
||||
#[error("forward setup failed")]
|
||||
ForwardFailed,
|
||||
#[error("config error: {0}")]
|
||||
Config(#[from] ConfigError),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use tokio::io::duplex;
|
||||
|
||||
const ED25519_PRIVATE_KEY: &str = "-----BEGIN OPENSSH PRIVATE KEY-----\nb3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW\nQyNTUxOQAAACBOfInDyRS33JEeDNT8xd10qRdwFN8z/QukCOgEIkv01QAAAJiQ+NvMkPjb\nzAAAAAtzc2gtZWQyNTUxOQAAACBOfInDyRS33JEeDNT8xd10qRdwFN8z/QukCOgEIkv01Q\nAAAECIWwJf7+7MOuZAOOWmoQbE9i/5GxjKsFrtJHjZ34E/fk58icPJFLfckR4M1PzF3XSp\nF3AU3zP9C6QI6AQiS/TVAAAAD3VidW50dUBuczUyODA5NgECAwQFBg==\n-----END OPENSSH PRIVATE KEY-----\n";
|
||||
|
||||
fn make_identity() -> KeySource {
|
||||
KeySource::Memory(ED25519_PRIVATE_KEY.as_bytes().to_vec())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connect_options_default_fields() {
|
||||
let opts = ConnectOptions::new(make_identity());
|
||||
assert!(opts.server.is_none());
|
||||
assert!(opts.peer.is_none());
|
||||
assert_eq!(opts.transport_mode, TransportMode::Tcp);
|
||||
assert_eq!(opts.socks5_addr, "127.0.0.1:1080");
|
||||
assert!(opts.forwards.is_empty());
|
||||
assert!(opts.remote_forwards.is_empty());
|
||||
assert!(opts.proxy.is_none());
|
||||
assert!(opts.iroh_relay.is_none());
|
||||
assert!(opts.tls_server_name.is_none());
|
||||
assert!(!opts.insecure);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connect_options_builder_pattern() {
|
||||
let opts = ConnectOptions::new(make_identity())
|
||||
.server("example.com:22")
|
||||
.transport_mode(TransportMode::Tls)
|
||||
.socks5_addr("127.0.0.1:9050")
|
||||
.forward("127.0.0.1:5432:db:5432")
|
||||
.remote_forward("0.0.0.0:8080:127.0.0.1:3000")
|
||||
.proxy("socks5://127.0.0.1:1080")
|
||||
.iroh_relay("https://relay.example.com")
|
||||
.tls_server_name("wraith.test")
|
||||
.insecure(true);
|
||||
|
||||
assert_eq!(opts.server.as_deref(), Some("example.com:22"));
|
||||
assert_eq!(opts.transport_mode, TransportMode::Tls);
|
||||
assert_eq!(opts.socks5_addr, "127.0.0.1:9050");
|
||||
assert_eq!(opts.forwards.len(), 1);
|
||||
assert_eq!(opts.remote_forwards.len(), 1);
|
||||
assert_eq!(opts.proxy.as_deref(), Some("socks5://127.0.0.1:1080"));
|
||||
assert_eq!(opts.iroh_relay.as_deref(), Some("https://relay.example.com"));
|
||||
assert_eq!(opts.tls_server_name.as_deref(), Some("wraith.test"));
|
||||
assert!(opts.insecure);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connect_options_validate_tcp_requires_server() {
|
||||
let opts = ConnectOptions::new(make_identity()).transport_mode(TransportMode::Tcp);
|
||||
assert!(opts.validate().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connect_options_validate_tcp_with_server_ok() {
|
||||
let opts = ConnectOptions::new(make_identity()).server("example.com:22");
|
||||
assert!(opts.validate().is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connect_options_validate_tls_requires_server() {
|
||||
let opts = ConnectOptions::new(make_identity()).transport_mode(TransportMode::Tls);
|
||||
assert!(opts.validate().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connect_options_validate_tls_with_server_ok() {
|
||||
let opts = ConnectOptions::new(make_identity())
|
||||
.transport_mode(TransportMode::Tls)
|
||||
.server("example.com:443");
|
||||
assert!(opts.validate().is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connect_options_validate_iroh_requires_peer() {
|
||||
let opts = ConnectOptions::new(make_identity()).transport_mode(TransportMode::Iroh);
|
||||
assert!(opts.validate().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connect_options_validate_iroh_with_peer_ok() {
|
||||
let opts = ConnectOptions::new(make_identity())
|
||||
.transport_mode(TransportMode::Iroh)
|
||||
.peer("some-endpoint-id");
|
||||
assert!(opts.validate().is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn identity_accepts_key_source_file() {
|
||||
let file_source = KeySource::File(std::path::PathBuf::from("/path/to/key"));
|
||||
let opts = ConnectOptions::new(file_source);
|
||||
match &opts.identity {
|
||||
KeySource::File(p) => assert_eq!(p, &std::path::PathBuf::from("/path/to/key")),
|
||||
_ => panic!("expected File variant"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn identity_accepts_key_source_memory() {
|
||||
let mem_source = KeySource::Memory(b"key-data".to_vec());
|
||||
let opts = ConnectOptions::new(mem_source);
|
||||
match &opts.identity {
|
||||
KeySource::Memory(d) => assert_eq!(d, b"key-data"),
|
||||
_ => panic!("expected Memory variant"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn transport_mode_display() {
|
||||
assert_eq!(TransportMode::Tcp.to_string(), "tcp");
|
||||
assert_eq!(TransportMode::Tls.to_string(), "tls");
|
||||
assert_eq!(TransportMode::Iroh.to_string(), "iroh");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connect_error_variants() {
|
||||
assert_eq!(ConnectError::ConnectionFailed.to_string(), "connection failed");
|
||||
assert_eq!(ConnectError::AuthFailed.to_string(), "authentication failed");
|
||||
assert_eq!(ConnectError::ForwardFailed.to_string(), "forward setup failed");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn connect_options_debug_redacts_identity() {
|
||||
let opts = ConnectOptions::new(make_identity());
|
||||
let debug_str = format!("{:?}", opts);
|
||||
assert!(debug_str.contains("<KeySource>"));
|
||||
assert!(!debug_str.contains("OPENSSH"));
|
||||
}
|
||||
|
||||
struct FailTransport;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Transport for FailTransport {
|
||||
type Stream = tokio::io::DuplexStream;
|
||||
|
||||
async fn connect(&self) -> anyhow::Result<Self::Stream> {
|
||||
Err(anyhow::anyhow!("always fails"))
|
||||
}
|
||||
|
||||
fn describe(&self) -> String {
|
||||
"fail".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
struct DuplexTransport {
|
||||
connect_count: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Transport for DuplexTransport {
|
||||
type Stream = tokio::io::DuplexStream;
|
||||
|
||||
async fn connect(&self) -> anyhow::Result<Self::Stream> {
|
||||
self.connect_count.fetch_add(1, Ordering::SeqCst);
|
||||
let (client, _) = duplex(4096);
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
fn describe(&self) -> String {
|
||||
"duplex".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn client_session_new_transport_fails() {
|
||||
let opts = ConnectOptions::new(make_identity()).server("example.com:22");
|
||||
let transport = Arc::new(FailTransport);
|
||||
let result = ClientSession::new(opts, transport).await;
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(result.err().unwrap(), ConnectError::ConnectionFailed));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn client_session_new_ssh_handshake_fails() {
|
||||
let transport = Arc::new(DuplexTransport {
|
||||
connect_count: Arc::new(AtomicUsize::new(0)),
|
||||
});
|
||||
let opts = ConnectOptions::new(make_identity()).server("example.com:22");
|
||||
let result = ClientSession::new(opts, transport).await;
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(result.err().unwrap(), ConnectError::ConnectionFailed));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_local_forwarders_empty() {
|
||||
let opts = ConnectOptions::new(make_identity());
|
||||
let result = build_local_forwarders(&opts);
|
||||
assert!(result.is_ok());
|
||||
assert!(result.unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_local_forwarders_valid() {
|
||||
let opts = ConnectOptions::new(make_identity()).forward("127.0.0.1:5432:db:5432");
|
||||
let result = build_local_forwarders(&opts);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap().len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_local_forwarders_invalid_spec() {
|
||||
let opts = ConnectOptions::new(make_identity()).forward("bad-spec");
|
||||
let result = build_local_forwarders(&opts);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_remote_specs_empty() {
|
||||
let opts = ConnectOptions::new(make_identity());
|
||||
let result = build_remote_specs(&opts);
|
||||
assert!(result.is_ok());
|
||||
assert!(result.unwrap().is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_remote_specs_valid() {
|
||||
let opts = ConnectOptions::new(make_identity()).remote_forward("0.0.0.0:8080:127.0.0.1:3000");
|
||||
let result = build_remote_specs(&opts);
|
||||
assert!(result.is_ok());
|
||||
assert_eq!(result.unwrap().len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_remote_specs_invalid() {
|
||||
let opts = ConnectOptions::new(make_identity()).remote_forward("bad");
|
||||
let result = build_remote_specs(&opts);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn default_socks5_addr() {
|
||||
assert_eq!(DEFAULT_SOCKS5_ADDR, "127.0.0.1:1080");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drain_timeout_is_two_seconds() {
|
||||
assert_eq!(DRAIN_TIMEOUT, Duration::from_secs(2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn transport_mode_equality() {
|
||||
assert_eq!(TransportMode::Tcp, TransportMode::Tcp);
|
||||
assert_ne!(TransportMode::Tcp, TransportMode::Tls);
|
||||
assert_ne!(TransportMode::Tls, TransportMode::Iroh);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shutdown_sends_disconnect_and_drains() {
|
||||
let transport = Arc::new(DuplexTransport {
|
||||
connect_count: Arc::new(AtomicUsize::new(0)),
|
||||
});
|
||||
let opts = ConnectOptions::new(make_identity()).server("example.com:22");
|
||||
let result = ClientSession::new(opts, transport).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn socks5_is_always_enabled_by_default() {
|
||||
let opts = ConnectOptions::new(make_identity());
|
||||
assert!(!opts.socks5_addr.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn integration_mock_transport_session() {
|
||||
use crate::socks5::{ChannelOpener, ChannelOpenError};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex};
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
|
||||
struct MockOpener;
|
||||
|
||||
impl ChannelOpener for MockOpener {
|
||||
type Stream = tokio::io::DuplexStream;
|
||||
|
||||
async fn open_channel(
|
||||
&self,
|
||||
_host: String,
|
||||
_port: u16,
|
||||
) -> Result<Self::Stream, ChannelOpenError> {
|
||||
let (client, _server) = duplex(4096);
|
||||
Ok(client)
|
||||
}
|
||||
}
|
||||
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let bound_addr = listener.local_addr().unwrap();
|
||||
drop(listener);
|
||||
|
||||
let opener = MockOpener;
|
||||
let server = Socks5Server::with_addr(opener, &bound_addr.to_string());
|
||||
|
||||
let _server_task = tokio::spawn(async move {
|
||||
let _ = server.run().await;
|
||||
});
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
|
||||
let mut conn = TcpStream::connect(bound_addr).await.unwrap();
|
||||
|
||||
let greeting = [0x05, 0x01, 0x00];
|
||||
conn.write_all(&greeting).await.unwrap();
|
||||
|
||||
let mut auth_resp = [0u8; 2];
|
||||
conn.read_exact(&mut auth_resp).await.unwrap();
|
||||
assert_eq!(auth_resp, [0x05, 0x00]);
|
||||
|
||||
let connect_req = [
|
||||
0x05, 0x01, 0x00, 0x01, 127, 0, 0, 1, 0, 80,
|
||||
];
|
||||
conn.write_all(&connect_req).await.unwrap();
|
||||
|
||||
let mut reply = [0u8; 10];
|
||||
conn.read_exact(&mut reply).await.unwrap();
|
||||
assert_eq!(reply[1], 0x00);
|
||||
|
||||
conn.write_all(b"test data").await.unwrap();
|
||||
conn.shutdown().await.unwrap();
|
||||
}
|
||||
}
|
||||
@@ -125,7 +125,7 @@ impl LocalForwarder {
|
||||
handle: Arc<Mutex<client::Handle<H>>>,
|
||||
) -> Result<(), ForwardError> {
|
||||
let listen_addr = self.spec.listen_addr()?;
|
||||
let listener: TcpListener = TcpListener::bind(listen_addr)
|
||||
let listener = TcpListener::bind(listen_addr)
|
||||
.await
|
||||
.map_err(|e| ForwardError::BindFailed { source: e })?;
|
||||
self.listener = Some(listener);
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
pub mod channel_manager;
|
||||
pub mod connect;
|
||||
pub mod forward;
|
||||
|
||||
pub use channel_manager::{ChannelManager, ForwardRequest};
|
||||
pub use connect::{ClientSession, ConnectError, ConnectOptions, TransportMode};
|
||||
pub use forward::{LocalForwarder, PortForwardSpec, PortForwardSpecKind, RemoteForwarder};
|
||||
@@ -62,7 +62,7 @@ pub enum ConfigError {
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ForwardError {
|
||||
#[error("invalid port forward spec: {spec}")]
|
||||
#[error("invalid forward specification: {spec}")]
|
||||
InvalidSpec { spec: String },
|
||||
#[error("bind failed")]
|
||||
BindFailed {
|
||||
@@ -74,11 +74,6 @@ pub enum ForwardError {
|
||||
#[source]
|
||||
source: Box<dyn std::error::Error + Send + Sync>,
|
||||
},
|
||||
#[error("connect to local target failed")]
|
||||
LocalConnectFailed {
|
||||
#[source]
|
||||
source: io::Error,
|
||||
},
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -171,36 +166,4 @@ mod tests {
|
||||
let plain = AuthError::KeyRejected;
|
||||
assert!(plain.source().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn forward_error_display() {
|
||||
assert_eq!(
|
||||
ForwardError::InvalidSpec { spec: "bad".to_string() }.to_string(),
|
||||
"invalid port forward spec: bad"
|
||||
);
|
||||
assert_eq!(
|
||||
ForwardError::BindFailed {
|
||||
source: io::Error::new(io::ErrorKind::AddrInUse, "in use")
|
||||
}
|
||||
.to_string(),
|
||||
"bind failed"
|
||||
);
|
||||
assert_eq!(
|
||||
ForwardError::LocalConnectFailed {
|
||||
source: io::Error::new(io::ErrorKind::ConnectionRefused, "refused")
|
||||
}
|
||||
.to_string(),
|
||||
"connect to local target failed"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn forward_error_source_chaining() {
|
||||
let io_err = io::Error::new(io::ErrorKind::AddrInUse, "in use");
|
||||
let forward_err = ForwardError::BindFailed { source: io_err };
|
||||
assert!(forward_err.source().is_some());
|
||||
|
||||
let plain = ForwardError::InvalidSpec { spec: "bad".to_string() };
|
||||
assert!(plain.source().is_none());
|
||||
}
|
||||
}
|
||||
@@ -11,3 +11,4 @@ pub mod testutil;
|
||||
pub use error::{AuthError, ChannelError, ConfigError, ForwardError, TransportError};
|
||||
pub use transport::{Transport, TransportAcceptor, TransportInfo, TransportKind};
|
||||
pub use client::channel_manager::{ChannelManager, ForwardRequest};
|
||||
pub use client::connect::{ClientSession, ConnectError, ConnectOptions, TransportMode};
|
||||
@@ -1,560 +0,0 @@
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
use super::handler::{ProxyConfig, ProxyMode};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ChannelProxyError {
|
||||
#[error("connection refused")]
|
||||
ConnectionRefused,
|
||||
#[error("target unreachable")]
|
||||
TargetUnreachable,
|
||||
#[error("socks5 proxy handshake failed")]
|
||||
Socks5HandshakeFailed,
|
||||
#[error("socks5 proxy rejected connection")]
|
||||
Socks5ProxyRejected,
|
||||
#[error("http connect proxy handshake failed")]
|
||||
HttpConnectHandshakeFailed,
|
||||
#[error("http connect proxy rejected: {0}")]
|
||||
HttpConnectProxyRejected(String),
|
||||
#[error("io error")]
|
||||
Io(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
pub async fn connect_outbound(
|
||||
target: SocketAddr,
|
||||
proxy: &ProxyConfig,
|
||||
) -> Result<TcpStream, ChannelProxyError> {
|
||||
match &proxy.mode {
|
||||
ProxyMode::Direct => connect_direct(target).await,
|
||||
ProxyMode::Socks5(addr) => connect_socks5(target, *addr).await,
|
||||
ProxyMode::HttpConnect(addr) => connect_http_connect(target, *addr).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect_direct(target: SocketAddr) -> Result<TcpStream, ChannelProxyError> {
|
||||
TcpStream::connect(target)
|
||||
.await
|
||||
.map_err(|e| map_connection_error(e, target))
|
||||
}
|
||||
|
||||
async fn connect_socks5(target: SocketAddr, proxy_addr: SocketAddr) -> Result<TcpStream, ChannelProxyError> {
|
||||
let mut stream = TcpStream::connect(proxy_addr)
|
||||
.await
|
||||
.map_err(ChannelProxyError::from)?;
|
||||
|
||||
stream.write_all(&[0x05, 0x01, 0x00]).await?;
|
||||
stream.flush().await?;
|
||||
|
||||
let mut resp = [0u8; 2];
|
||||
stream.read_exact(&mut resp).await?;
|
||||
if resp[0] != 0x05 || resp[1] != 0x00 {
|
||||
return Err(ChannelProxyError::Socks5HandshakeFailed);
|
||||
}
|
||||
|
||||
let ip_bytes = target.ip().to_string();
|
||||
let mut connect_req = vec![0x05, 0x01, 0x00, 0x03];
|
||||
connect_req.push(ip_bytes.len() as u8);
|
||||
connect_req.extend_from_slice(ip_bytes.as_bytes());
|
||||
connect_req.extend_from_slice(&target.port().to_be_bytes());
|
||||
stream.write_all(&connect_req).await?;
|
||||
stream.flush().await?;
|
||||
|
||||
let mut reply_header = [0u8; 4];
|
||||
stream.read_exact(&mut reply_header).await?;
|
||||
if reply_header[0] != 0x05 {
|
||||
return Err(ChannelProxyError::Socks5HandshakeFailed);
|
||||
}
|
||||
if reply_header[1] != 0x00 {
|
||||
return Err(ChannelProxyError::Socks5ProxyRejected);
|
||||
}
|
||||
|
||||
let atyp = reply_header[3];
|
||||
match atyp {
|
||||
0x01 => {
|
||||
let mut _addr = [0u8; 4];
|
||||
stream.read_exact(&mut _addr).await?;
|
||||
}
|
||||
0x04 => {
|
||||
let mut _addr = [0u8; 16];
|
||||
stream.read_exact(&mut _addr).await?;
|
||||
}
|
||||
0x03 => {
|
||||
let len = stream.read_u8().await?;
|
||||
let mut _domain = vec![0u8; len as usize];
|
||||
stream.read_exact(&mut _domain).await?;
|
||||
}
|
||||
_ => {
|
||||
return Err(ChannelProxyError::Socks5HandshakeFailed);
|
||||
}
|
||||
}
|
||||
let mut _port = [0u8; 2];
|
||||
stream.read_exact(&mut _port).await?;
|
||||
|
||||
Ok(stream)
|
||||
}
|
||||
|
||||
async fn connect_http_connect(
|
||||
target: SocketAddr,
|
||||
proxy_addr: SocketAddr,
|
||||
) -> Result<TcpStream, ChannelProxyError> {
|
||||
let mut stream = TcpStream::connect(proxy_addr)
|
||||
.await
|
||||
.map_err(ChannelProxyError::from)?;
|
||||
|
||||
let connect_request = format!(
|
||||
"CONNECT {}:{} HTTP/1.1\r\nHost: {}:{}\r\n\r\n",
|
||||
target.ip(),
|
||||
target.port(),
|
||||
target.ip(),
|
||||
target.port()
|
||||
);
|
||||
stream.write_all(connect_request.as_bytes()).await?;
|
||||
stream.flush().await?;
|
||||
|
||||
let mut response = Vec::new();
|
||||
let mut buf = [0u8; 1024];
|
||||
loop {
|
||||
let n = stream.read(&mut buf).await?;
|
||||
if n == 0 {
|
||||
return Err(ChannelProxyError::HttpConnectHandshakeFailed);
|
||||
}
|
||||
response.extend_from_slice(&buf[..n]);
|
||||
if response.windows(4).any(|w| w == b"\r\n\r\n") {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let response_str = String::from_utf8_lossy(&response);
|
||||
let status_line = response_str
|
||||
.lines()
|
||||
.next()
|
||||
.unwrap_or("");
|
||||
|
||||
if status_line.contains("200") {
|
||||
Ok(stream)
|
||||
} else {
|
||||
Err(ChannelProxyError::HttpConnectProxyRejected(
|
||||
status_line.to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fn map_connection_error(e: std::io::Error, target: SocketAddr) -> ChannelProxyError {
|
||||
match e.kind() {
|
||||
std::io::ErrorKind::ConnectionRefused => ChannelProxyError::ConnectionRefused,
|
||||
std::io::ErrorKind::AddrNotAvailable
|
||||
| std::io::ErrorKind::NetworkUnreachable
|
||||
| std::io::ErrorKind::HostUnreachable => ChannelProxyError::TargetUnreachable,
|
||||
_ => {
|
||||
tracing::debug!(error = %e, "outbound connection failed to {:?}", target);
|
||||
ChannelProxyError::Io(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn proxy_channel<S>(channel: S, target: SocketAddr, proxy: &ProxyConfig)
|
||||
where
|
||||
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
if let Ok(outbound) = connect_outbound(target, proxy).await {
|
||||
let (mut read_chan, mut write_chan) = tokio::io::split(channel);
|
||||
let (mut read_out, mut write_out) = outbound.into_split();
|
||||
|
||||
let client_to_target = tokio::spawn(async move {
|
||||
let _ = tokio::io::copy(&mut read_chan, &mut write_out).await;
|
||||
let _ = write_out.shutdown().await;
|
||||
});
|
||||
|
||||
let target_to_client = tokio::spawn(async move {
|
||||
let _ = tokio::io::copy(&mut read_out, &mut write_chan).await;
|
||||
let _ = write_chan.shutdown().await;
|
||||
});
|
||||
|
||||
let _ = client_to_target.await;
|
||||
let _ = target_to_client.await;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
fn direct_config() -> ProxyConfig {
|
||||
ProxyConfig {
|
||||
mode: ProxyMode::Direct,
|
||||
}
|
||||
}
|
||||
|
||||
fn socks5_config(addr: SocketAddr) -> ProxyConfig {
|
||||
ProxyConfig {
|
||||
mode: ProxyMode::Socks5(addr),
|
||||
}
|
||||
}
|
||||
|
||||
fn http_connect_config(addr: SocketAddr) -> ProxyConfig {
|
||||
ProxyConfig {
|
||||
mode: ProxyMode::HttpConnect(addr),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn direct_connection_to_echo_server() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
|
||||
let server = tokio::spawn(async move {
|
||||
let (mut sock, _) = listener.accept().await.unwrap();
|
||||
let mut buf = [0u8; 64];
|
||||
let n = sock.read(&mut buf).await.unwrap();
|
||||
sock.write_all(&buf[..n]).await.unwrap();
|
||||
});
|
||||
|
||||
let stream = connect_outbound(addr, &direct_config()).await.unwrap();
|
||||
let (mut read, mut write) = stream.into_split();
|
||||
write.write_all(b"hello").await.unwrap();
|
||||
let mut buf = [0u8; 5];
|
||||
read.read_exact(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf, b"hello");
|
||||
|
||||
let _ = server.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn direct_connection_target_unreachable() {
|
||||
let target: SocketAddr = "240.0.0.1:1".parse().unwrap();
|
||||
let result = connect_outbound(target, &direct_config()).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn socks5_proxy_handshake() {
|
||||
let proxy_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let proxy_addr = proxy_listener.local_addr().unwrap();
|
||||
|
||||
let target_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let target_addr = target_listener.local_addr().unwrap();
|
||||
|
||||
let target_server = tokio::spawn(async move {
|
||||
let (mut sock, _) = target_listener.accept().await.unwrap();
|
||||
let mut buf = [0u8; 64];
|
||||
let n = sock.read(&mut buf).await.unwrap();
|
||||
sock.write_all(&buf[..n]).await.unwrap();
|
||||
});
|
||||
|
||||
let proxy_server = tokio::spawn(async move {
|
||||
let (mut proxy_sock, _) = proxy_listener.accept().await.unwrap();
|
||||
|
||||
let mut greeting = [0u8; 3];
|
||||
proxy_sock.read_exact(&mut greeting).await.unwrap();
|
||||
assert_eq!(greeting[0], 0x05);
|
||||
proxy_sock.write_all(&[0x05, 0x00]).await.unwrap();
|
||||
|
||||
let mut req_header = [0u8; 4];
|
||||
proxy_sock.read_exact(&mut req_header).await.unwrap();
|
||||
assert_eq!(req_header[0], 0x05);
|
||||
assert_eq!(req_header[1], 0x01);
|
||||
|
||||
let atyp = req_header[3];
|
||||
assert_eq!(atyp, 0x03);
|
||||
|
||||
let domain_len = proxy_sock.read_u8().await.unwrap() as usize;
|
||||
let mut domain = vec![0u8; domain_len];
|
||||
proxy_sock.read_exact(&mut domain).await.unwrap();
|
||||
let mut port_bytes = [0u8; 2];
|
||||
proxy_sock.read_exact(&mut port_bytes).await.unwrap();
|
||||
|
||||
let target: SocketAddr = format!(
|
||||
"{}:{}",
|
||||
String::from_utf8_lossy(&domain),
|
||||
u16::from_be_bytes(port_bytes)
|
||||
)
|
||||
.parse()
|
||||
.unwrap();
|
||||
|
||||
let reply = vec![
|
||||
0x05, 0x00, 0x00, 0x01,
|
||||
0, 0, 0, 0,
|
||||
0, 0,
|
||||
];
|
||||
proxy_sock.write_all(&reply).await.unwrap();
|
||||
|
||||
let mut target_stream = TcpStream::connect(target).await.unwrap();
|
||||
let _ = tokio::io::copy_bidirectional(&mut proxy_sock, &mut target_stream).await;
|
||||
});
|
||||
|
||||
let config = socks5_config(proxy_addr);
|
||||
let mut stream = connect_outbound(target_addr, &config).await.unwrap();
|
||||
stream.write_all(b"hello socks").await.unwrap();
|
||||
let mut buf = [0u8; 11];
|
||||
stream.read_exact(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf, b"hello socks");
|
||||
drop(stream);
|
||||
|
||||
let _ = target_server.await;
|
||||
let _ = proxy_server.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn socks5_proxy_rejected() {
|
||||
let proxy_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let proxy_addr = proxy_listener.local_addr().unwrap();
|
||||
|
||||
let proxy_server = tokio::spawn(async move {
|
||||
let (mut proxy_sock, _) = proxy_listener.accept().await.unwrap();
|
||||
|
||||
let mut greeting = [0u8; 3];
|
||||
proxy_sock.read_exact(&mut greeting).await.unwrap();
|
||||
proxy_sock.write_all(&[0x05, 0x00]).await.unwrap();
|
||||
|
||||
let mut req_header = [0u8; 4];
|
||||
proxy_sock.read_exact(&mut req_header).await.unwrap();
|
||||
|
||||
let domain_len = proxy_sock.read_u8().await.unwrap() as usize;
|
||||
let mut domain = vec![0u8; domain_len];
|
||||
proxy_sock.read_exact(&mut domain).await.unwrap();
|
||||
let mut port_bytes = [0u8; 2];
|
||||
proxy_sock.read_exact(&mut port_bytes).await.unwrap();
|
||||
|
||||
let reply = vec![
|
||||
0x05, 0x05, 0x00, 0x01,
|
||||
0, 0, 0, 0,
|
||||
0, 0,
|
||||
];
|
||||
proxy_sock.write_all(&reply).await.unwrap();
|
||||
});
|
||||
|
||||
let target: SocketAddr = "127.0.0.1:9999".parse().unwrap();
|
||||
let config = socks5_config(proxy_addr);
|
||||
let result = connect_outbound(target, &config).await;
|
||||
assert!(result.is_err());
|
||||
assert!(matches!(
|
||||
result.unwrap_err(),
|
||||
ChannelProxyError::Socks5ProxyRejected
|
||||
));
|
||||
|
||||
let _ = proxy_server.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn http_connect_proxy_handshake() {
|
||||
let proxy_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let proxy_addr = proxy_listener.local_addr().unwrap();
|
||||
|
||||
let target_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let target_addr = target_listener.local_addr().unwrap();
|
||||
|
||||
let target_server = tokio::spawn(async move {
|
||||
let (mut sock, _) = target_listener.accept().await.unwrap();
|
||||
let mut buf = [0u8; 64];
|
||||
let n = sock.read(&mut buf).await.unwrap();
|
||||
sock.write_all(&buf[..n]).await.unwrap();
|
||||
});
|
||||
|
||||
let proxy_server = tokio::spawn(async move {
|
||||
let (mut proxy_sock, _) = proxy_listener.accept().await.unwrap();
|
||||
|
||||
let mut request = Vec::new();
|
||||
let mut buf = [0u8; 1024];
|
||||
loop {
|
||||
let n = proxy_sock.read(&mut buf).await.unwrap();
|
||||
request.extend_from_slice(&buf[..n]);
|
||||
if request.windows(4).any(|w| w == b"\r\n\r\n") {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let response = "HTTP/1.1 200 Connection Established\r\n\r\n";
|
||||
proxy_sock.write_all(response.as_bytes()).await.unwrap();
|
||||
|
||||
let target_str = extract_connect_target(&String::from_utf8_lossy(&request));
|
||||
let mut target_stream = TcpStream::connect(target_str).await.unwrap();
|
||||
let _ = tokio::io::copy_bidirectional(&mut proxy_sock, &mut target_stream).await;
|
||||
});
|
||||
|
||||
let config = http_connect_config(proxy_addr);
|
||||
let mut stream = connect_outbound(target_addr, &config).await.unwrap();
|
||||
stream.write_all(b"hello http").await.unwrap();
|
||||
let mut buf = [0u8; 10];
|
||||
stream.read_exact(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf, b"hello http");
|
||||
drop(stream);
|
||||
|
||||
let _ = target_server.await;
|
||||
let _ = proxy_server.await;
|
||||
}
|
||||
|
||||
fn extract_connect_target(request: &str) -> String {
|
||||
let connect_line = request.lines().next().unwrap_or("");
|
||||
let parts: Vec<&str> = connect_line.split_whitespace().collect();
|
||||
if parts.len() >= 2 {
|
||||
parts[1].to_string()
|
||||
} else {
|
||||
String::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn http_connect_proxy_rejected() {
|
||||
let proxy_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let proxy_addr = proxy_listener.local_addr().unwrap();
|
||||
|
||||
let proxy_server = tokio::spawn(async move {
|
||||
let (mut proxy_sock, _) = proxy_listener.accept().await.unwrap();
|
||||
|
||||
let mut request = Vec::new();
|
||||
let mut buf = [0u8; 1024];
|
||||
loop {
|
||||
let n = proxy_sock.read(&mut buf).await.unwrap();
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
request.extend_from_slice(&buf[..n]);
|
||||
if request.windows(4).any(|w| w == b"\r\n\r\n") {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let response = "HTTP/1.1 403 Forbidden\r\n\r\n";
|
||||
proxy_sock.write_all(response.as_bytes()).await.unwrap();
|
||||
});
|
||||
|
||||
let target: SocketAddr = "127.0.0.1:9999".parse().unwrap();
|
||||
let config = http_connect_config(proxy_addr);
|
||||
let result = connect_outbound(target, &config).await;
|
||||
assert!(result.is_err());
|
||||
match result.unwrap_err() {
|
||||
ChannelProxyError::HttpConnectProxyRejected(msg) => {
|
||||
assert!(msg.contains("403"));
|
||||
}
|
||||
other => panic!("expected HttpConnectProxyRejected, got {:?}", other),
|
||||
}
|
||||
|
||||
let _ = proxy_server.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn target_unreachable_returns_appropriate_error() {
|
||||
let target: SocketAddr = "240.0.0.1:1".parse().unwrap();
|
||||
let result = connect_outbound(target, &direct_config()).await;
|
||||
match result.unwrap_err() {
|
||||
ChannelProxyError::TargetUnreachable
|
||||
| ChannelProxyError::ConnectionRefused
|
||||
| ChannelProxyError::Io(_) => {}
|
||||
other => panic!("unexpected error type: {:?}", other),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn socks5_proxy_unreachable() {
|
||||
let target: SocketAddr = "127.0.0.1:9999".parse().unwrap();
|
||||
let bad_proxy: SocketAddr = "127.0.0.1:1".parse().unwrap();
|
||||
let config = socks5_config(bad_proxy);
|
||||
let result = connect_outbound(target, &config).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn http_connect_proxy_unreachable() {
|
||||
let target: SocketAddr = "127.0.0.1:9999".parse().unwrap();
|
||||
let bad_proxy: SocketAddr = "127.0.0.1:1".parse().unwrap();
|
||||
let config = http_connect_config(bad_proxy);
|
||||
let result = connect_outbound(target, &config).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
struct MockChannel {
|
||||
read_half: tokio::io::ReadHalf<DuplexStream>,
|
||||
write_half: tokio::io::WriteHalf<DuplexStream>,
|
||||
}
|
||||
|
||||
impl tokio::io::AsyncRead for MockChannel {
|
||||
fn poll_read(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
std::pin::Pin::new(&mut self.get_mut().read_half).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio::io::AsyncWrite for MockChannel {
|
||||
fn poll_write(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> std::task::Poll<std::io::Result<usize>> {
|
||||
std::pin::Pin::new(&mut self.get_mut().write_half).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
std::pin::Pin::new(&mut self.get_mut().write_half).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
std::pin::Pin::new(&mut self.get_mut().write_half).poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
|
||||
fn make_mock_channel() -> (MockChannel, DuplexStream) {
|
||||
let (client, server) = duplex(4096);
|
||||
let (read_half, write_half) = tokio::io::split(client);
|
||||
(
|
||||
MockChannel {
|
||||
read_half,
|
||||
write_half,
|
||||
},
|
||||
server,
|
||||
)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn proxy_channel_bidirectional_data_flow() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let target_addr = listener.local_addr().unwrap();
|
||||
|
||||
let echo_server = tokio::spawn(async move {
|
||||
let (mut sock, _) = listener.accept().await.unwrap();
|
||||
let mut buf = [0u8; 64];
|
||||
let n = sock.read(&mut buf).await.unwrap();
|
||||
sock.write_all(&buf[..n]).await.unwrap();
|
||||
});
|
||||
|
||||
let (channel, mut channel_peer) = make_mock_channel();
|
||||
|
||||
let target = target_addr;
|
||||
let proxy = direct_config();
|
||||
tokio::spawn(async move {
|
||||
proxy_channel(channel, target, &proxy).await;
|
||||
});
|
||||
|
||||
channel_peer.write_all(b"ping").await.unwrap();
|
||||
channel_peer.flush().await.unwrap();
|
||||
|
||||
let mut buf = [0u8; 4];
|
||||
channel_peer.read_exact(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf, b"ping");
|
||||
|
||||
drop(channel_peer);
|
||||
let _ = echo_server.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn proxy_channel_target_unreachable_closes_cleanly() {
|
||||
let target: SocketAddr = "240.0.0.1:1".parse().unwrap();
|
||||
let (channel, _channel_peer) = make_mock_channel();
|
||||
|
||||
let proxy = direct_config();
|
||||
proxy_channel(channel, target, &proxy).await;
|
||||
}
|
||||
}
|
||||
186
crates/wraith-core/src/server/control_channel.rs
Normal file
186
crates/wraith-core/src/server/control_channel.rs
Normal file
@@ -0,0 +1,186 @@
|
||||
use std::io;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
|
||||
pub const WRAITH_CONTROL_DESTINATION: &str = "wraith-control";
|
||||
pub const WRAITH_PREFIX: &str = "wraith-";
|
||||
|
||||
pub fn is_reserved_destination(host: &str) -> bool {
|
||||
host.starts_with(WRAITH_PREFIX)
|
||||
}
|
||||
|
||||
pub trait DuplexStream: AsyncRead + AsyncWrite + Unpin + Send {}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + Unpin + Send> DuplexStream for T {}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ControlChannelHandler: Send + Sync {
|
||||
async fn handle_channel(&self, stream: Box<dyn DuplexStream>);
|
||||
}
|
||||
|
||||
pub struct ControlChannelRouter {
|
||||
handler: Option<Box<dyn ControlChannelHandler>>,
|
||||
}
|
||||
|
||||
impl ControlChannelRouter {
|
||||
pub fn new(handler: Option<Box<dyn ControlChannelHandler>>) -> Self {
|
||||
Self { handler }
|
||||
}
|
||||
|
||||
pub fn without_handler() -> Self {
|
||||
Self { handler: None }
|
||||
}
|
||||
|
||||
pub fn with_handler(handler: Box<dyn ControlChannelHandler>) -> Self {
|
||||
Self {
|
||||
handler: Some(handler),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn has_handler(&self) -> bool {
|
||||
self.handler.is_some()
|
||||
}
|
||||
|
||||
pub async fn route(&self, stream: Box<dyn DuplexStream>) -> io::Result<()> {
|
||||
match &self.handler {
|
||||
Some(handler) => {
|
||||
handler.handle_channel(stream).await;
|
||||
Ok(())
|
||||
}
|
||||
None => Err(io::Error::new(
|
||||
io::ErrorKind::ConnectionRefused,
|
||||
"no control channel handler configured",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use tokio::io::duplex;
|
||||
|
||||
#[test]
|
||||
fn wraith_control_destination_constant() {
|
||||
assert_eq!(WRAITH_CONTROL_DESTINATION, "wraith-control");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wraith_prefix_constant() {
|
||||
assert_eq!(WRAITH_PREFIX, "wraith-");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reserved_destination_detected() {
|
||||
assert!(is_reserved_destination("wraith-control"));
|
||||
assert!(is_reserved_destination("wraith-status"));
|
||||
assert!(is_reserved_destination("wraith-events"));
|
||||
assert!(is_reserved_destination("wraith-"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_reserved_destination_passes_through() {
|
||||
assert!(!is_reserved_destination("example.com"));
|
||||
assert!(!is_reserved_destination("localhost"));
|
||||
assert!(!is_reserved_destination("192.168.1.1"));
|
||||
assert!(!is_reserved_destination("wraith.example.com"));
|
||||
assert!(!is_reserved_destination(""));
|
||||
assert!(!is_reserved_destination("wrait-control"));
|
||||
assert!(!is_reserved_destination("WRAITH-control"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prefix_matching_case_sensitive() {
|
||||
assert!(!is_reserved_destination("Wraith-control"));
|
||||
assert!(!is_reserved_destination("WRAITH-control"));
|
||||
assert!(is_reserved_destination("wraith-Control"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn router_without_handler_has_no_handler() {
|
||||
let router = ControlChannelRouter::without_handler();
|
||||
assert!(!router.has_handler());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn router_with_handler_has_handler() {
|
||||
struct DummyHandler;
|
||||
#[async_trait]
|
||||
impl ControlChannelHandler for DummyHandler {
|
||||
async fn handle_channel(&self, _stream: Box<dyn DuplexStream>) {}
|
||||
}
|
||||
let router = ControlChannelRouter::with_handler(Box::new(DummyHandler));
|
||||
assert!(router.has_handler());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn route_without_handler_returns_error() {
|
||||
let router = ControlChannelRouter::without_handler();
|
||||
let (_client, server) = duplex(64);
|
||||
let stream: Box<dyn DuplexStream> = Box::new(server);
|
||||
let result = router.route(stream).await;
|
||||
assert!(result.is_err());
|
||||
let err = result.unwrap_err();
|
||||
assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn route_with_handler_succeeds() {
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
struct TrackedHandler {
|
||||
called: Arc<AtomicBool>,
|
||||
}
|
||||
#[async_trait]
|
||||
impl ControlChannelHandler for TrackedHandler {
|
||||
async fn handle_channel(&self, _stream: Box<dyn DuplexStream>) {
|
||||
self.called.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
let called = Arc::new(AtomicBool::new(false));
|
||||
let handler = TrackedHandler {
|
||||
called: called.clone(),
|
||||
};
|
||||
let router = ControlChannelRouter::with_handler(Box::new(handler));
|
||||
let (_client, server) = duplex(64);
|
||||
let stream: Box<dyn DuplexStream> = Box::new(server);
|
||||
let result = router.route(stream).await;
|
||||
assert!(result.is_ok());
|
||||
assert!(called.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn route_with_handler_can_read_write() {
|
||||
struct EchoHandler;
|
||||
#[async_trait]
|
||||
impl ControlChannelHandler for EchoHandler {
|
||||
async fn handle_channel(&self, mut stream: Box<dyn DuplexStream>) {
|
||||
let mut buf = [0u8; 64];
|
||||
let n = stream.read(&mut buf).await.unwrap();
|
||||
stream.write_all(&buf[..n]).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let router = ControlChannelRouter::with_handler(Box::new(EchoHandler));
|
||||
let (client, server) = duplex(64);
|
||||
let stream: Box<dyn DuplexStream> = Box::new(server);
|
||||
tokio::spawn(async move {
|
||||
router.route(stream).await.unwrap();
|
||||
});
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
let mut client = client;
|
||||
client.write_all(b"hello").await.unwrap();
|
||||
let mut buf = [0u8; 5];
|
||||
client.read_exact(&mut buf).await.unwrap();
|
||||
assert_eq!(&buf, b"hello");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn control_channel_destination_matches_prefix() {
|
||||
assert!(is_reserved_destination(WRAITH_CONTROL_DESTINATION));
|
||||
}
|
||||
}
|
||||
@@ -7,8 +7,9 @@ use russh::server::{Auth, Handler, Msg, Session};
|
||||
use russh::Channel;
|
||||
|
||||
use crate::auth::ServerAuthConfig;
|
||||
|
||||
const WRAITH_PREFIX: &str = "wraith-";
|
||||
use crate::server::control_channel::{
|
||||
ControlChannelHandler, ControlChannelRouter, WRAITH_PREFIX,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ProxyMode {
|
||||
@@ -24,9 +25,9 @@ pub struct ProxyConfig {
|
||||
|
||||
pub struct ServerHandler {
|
||||
auth_config: Arc<ServerAuthConfig>,
|
||||
#[allow(dead_code)]
|
||||
outbound_proxy: Option<ProxyConfig>,
|
||||
remote_addr: Option<SocketAddr>,
|
||||
control_channel_router: ControlChannelRouter,
|
||||
}
|
||||
|
||||
impl ServerHandler {
|
||||
@@ -39,8 +40,21 @@ impl ServerHandler {
|
||||
auth_config,
|
||||
outbound_proxy,
|
||||
remote_addr,
|
||||
control_channel_router: ControlChannelRouter::without_handler(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_control_channel_handler(
|
||||
mut self,
|
||||
handler: Box<dyn ControlChannelHandler>,
|
||||
) -> Self {
|
||||
self.control_channel_router = ControlChannelRouter::with_handler(handler);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn control_channel_router(&self) -> &ControlChannelRouter {
|
||||
&self.control_channel_router
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -99,10 +113,35 @@ impl Handler for ServerHandler {
|
||||
port = port_to_connect,
|
||||
"routing to internal control channel handler"
|
||||
);
|
||||
|
||||
if !self.control_channel_router.has_handler() {
|
||||
tracing::warn!(
|
||||
host = host_to_connect,
|
||||
"no control channel handler configured, rejecting channel open"
|
||||
);
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
let _ = channel;
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
let _ = (host_to_connect, port_to_connect, originator_address, originator_port, channel);
|
||||
let proxy_info = self
|
||||
.outbound_proxy
|
||||
.as_ref()
|
||||
.map(|p| format!("{:?}", p.mode))
|
||||
.unwrap_or_else(|| "direct".to_string());
|
||||
|
||||
tracing::info!(
|
||||
host = host_to_connect,
|
||||
port = port_to_connect,
|
||||
originator_address = originator_address,
|
||||
originator_port = originator_port,
|
||||
proxy = %proxy_info,
|
||||
"spawning tcp proxy task"
|
||||
);
|
||||
|
||||
let _ = channel;
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
@@ -237,12 +276,20 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn reserved_wraith_destination_routing() {
|
||||
assert!("wraith-control".starts_with(WRAITH_PREFIX));
|
||||
assert!("wraith-status".starts_with(WRAITH_PREFIX));
|
||||
assert!("wraith-events".starts_with(WRAITH_PREFIX));
|
||||
assert!(!"example.com".starts_with(WRAITH_PREFIX));
|
||||
assert!(!"localhost".starts_with(WRAITH_PREFIX));
|
||||
assert!(!"wraith.example.com".starts_with(WRAITH_PREFIX));
|
||||
use crate::server::control_channel::is_reserved_destination;
|
||||
assert!(is_reserved_destination("wraith-control"));
|
||||
assert!(is_reserved_destination("wraith-status"));
|
||||
assert!(is_reserved_destination("wraith-events"));
|
||||
assert!(!is_reserved_destination("example.com"));
|
||||
assert!(!is_reserved_destination("localhost"));
|
||||
assert!(!is_reserved_destination("wraith.example.com"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn server_handler_without_control_handler_rejects_wraith_destinations() {
|
||||
let auth_config = make_empty_auth_config();
|
||||
let handler = ServerHandler::new(auth_config, None, None);
|
||||
assert!(!handler.control_channel_router().has_handler());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
pub mod channel_proxy;
|
||||
pub mod control_channel;
|
||||
pub mod handler;
|
||||
|
||||
pub use channel_proxy::{ChannelProxyError, connect_outbound, proxy_channel};
|
||||
pub use control_channel::{
|
||||
ControlChannelHandler, ControlChannelRouter, DuplexStream, WRAITH_CONTROL_DESTINATION,
|
||||
WRAITH_PREFIX, is_reserved_destination,
|
||||
};
|
||||
pub use handler::{ProxyConfig, ProxyMode, ServerHandler};
|
||||
Reference in New Issue
Block a user