From e49aef05d33cefeb196fefd0477189dae96a71ac Mon Sep 17 00:00:00 2001 From: "glm-5.1" Date: Tue, 2 Jun 2026 20:22:13 +0000 Subject: [PATCH] fix: wire channel proxy into handler, add client reconnection with backoff, fix ADR-006 violations - handler.channel_open_direct_tcpip now proxies non-wraith channels via connect_outbound+proxy_channel instead of dropping them - ClientSession.run() spawns reconnect monitor that detects handle closure, reconnects with exponential backoff (1s/2s/4s/8s/16s/30s cap), and re-registers remote port forwards - Remove server-side logging of tunnel destinations (ADR-006 compliance) - Remove debug-level logging of proxy targets in channel_proxy --- crates/wraith-core/src/client/connect.rs | 98 +++++++++++++++++++ .../wraith-core/src/server/channel_proxy.rs | 7 +- crates/wraith-core/src/server/handler.rs | 34 ++++--- 3 files changed, 122 insertions(+), 17 deletions(-) diff --git a/crates/wraith-core/src/client/connect.rs b/crates/wraith-core/src/client/connect.rs index 97d278b..1da0471 100644 --- a/crates/wraith-core/src/client/connect.rs +++ b/crates/wraith-core/src/client/connect.rs @@ -306,6 +306,60 @@ impl ClientSession { }; let mut wait_shutdown = self.shutdown_rx.clone(); + let reconnect_handle = Arc::clone(&self.handle); + let reconnect_transport = Arc::clone(&self.transport); + let reconnect_auth = Arc::clone(&self.auth_config); + let reconnect_username = self.username.clone(); + let reconnect_shutdown = self.shutdown_rx.clone(); + let reconnect_remote_specs = remote_specs.clone(); + + let reconnect_monitor = tokio::spawn(async move { + let mut attempts: u32 = 0; + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + if *reconnect_shutdown.borrow() { + break; + } + let h = reconnect_handle.lock().await; + if h.is_closed() { + drop(h); + info!("SSH session closed, starting reconnection"); + let backoff = backoff_duration(attempts); + warn!("reconnect attempt #{}, waiting {:?}", attempts + 1, backoff); + tokio::time::sleep(backoff).await; + + let handler = ClientHandler::from_config(&reconnect_auth); + let username = reconnect_username.clone(); + match establish_session(&*reconnect_transport, handler, &reconnect_auth, &username).await { + Ok(new_handle) => { + info!("reconnection successful"); + { + let mut guard = reconnect_handle.lock().await; + *guard = new_handle; + } + for spec in &reconnect_remote_specs { + match RemoteForwarder::new(spec.clone()) { + Ok(rf) => { + let mut h = reconnect_handle.lock().await; + match rf.register(&mut h).await { + Ok(_) => debug!("re-registered remote forward: {}", spec), + Err(e) => warn!("failed to re-register remote forward {}: {e}", spec), + } + } + Err(e) => warn!("failed to create remote forwarder: {e}"), + } + } + attempts = 0; + } + Err(e) => { + warn!("reconnection attempt failed: {e}"); + attempts += 1; + } + } + } + } + }); + tokio::select! { _ = wait_shutdown.changed() => { if *wait_shutdown.borrow() { @@ -317,6 +371,8 @@ impl ClientSession { } } + reconnect_monitor.abort(); + #[cfg(unix)] signal_done.abort(); @@ -358,6 +414,48 @@ fn derive_username() -> String { .unwrap_or_else(|_| "wraith".to_string()) } +async fn establish_session( + transport: &T, + handler: ClientHandler, + auth_config: &ClientAuthConfig, + username: &str, +) -> Result, ConnectError> { + let stream = transport.connect().await.map_err(|e| { + error!("transport connect failed: {e}"); + ConnectError::ConnectionFailed + })?; + + let config = Arc::new(client::Config::default()); + let mut handle = client::connect_stream(config, stream, handler) + .await + .map_err(|e| { + error!("SSH connect failed: {e}"); + ConnectError::ConnectionFailed + })?; + + let auth_ok = auth_config + .authenticate(&mut handle, username) + .await + .map_err(|_| ConnectError::AuthFailed)?; + if !auth_ok { + return Err(ConnectError::AuthFailed); + } + + Ok(handle) +} + +fn backoff_duration(attempt: u32) -> Duration { + let secs: u64 = match attempt { + 0 => 1, + 1 => 2, + 2 => 4, + 3 => 8, + 4 => 16, + _ => 30, + }; + Duration::from_secs(secs) +} + fn build_local_forwarders(opts: &ConnectOptions) -> Result, ConnectError> { let mut forwarders = Vec::new(); for spec_str in &opts.forwards { diff --git a/crates/wraith-core/src/server/channel_proxy.rs b/crates/wraith-core/src/server/channel_proxy.rs index 8482d7c..55309a3 100644 --- a/crates/wraith-core/src/server/channel_proxy.rs +++ b/crates/wraith-core/src/server/channel_proxy.rs @@ -142,16 +142,13 @@ async fn connect_http_connect( } } -fn map_connection_error(e: std::io::Error, target: SocketAddr) -> ChannelProxyError { +fn map_connection_error(e: std::io::Error, _target: SocketAddr) -> ChannelProxyError { match e.kind() { std::io::ErrorKind::ConnectionRefused => ChannelProxyError::ConnectionRefused, std::io::ErrorKind::AddrNotAvailable | std::io::ErrorKind::NetworkUnreachable | std::io::ErrorKind::HostUnreachable => ChannelProxyError::TargetUnreachable, - _ => { - tracing::debug!(error = %e, "outbound connection failed to {:?}", target); - ChannelProxyError::Io(e) - } + _ => ChannelProxyError::Io(e), } } diff --git a/crates/wraith-core/src/server/handler.rs b/crates/wraith-core/src/server/handler.rs index 7e5e8c0..a6e158b 100644 --- a/crates/wraith-core/src/server/handler.rs +++ b/crates/wraith-core/src/server/handler.rs @@ -210,17 +210,7 @@ impl Handler for ServerHandler { _session: &mut Session, ) -> Result { if host_to_connect.starts_with(WRAITH_PREFIX) { - tracing::info!( - host = host_to_connect, - port = port_to_connect, - "routing to internal control channel handler" - ); - if !self.control_channel_router.has_handler() { - tracing::warn!( - host = host_to_connect, - "no control channel handler configured, rejecting channel open" - ); return Ok(false); } @@ -228,8 +218,28 @@ impl Handler for ServerHandler { return Ok(true); } - let _ = (host_to_connect, port_to_connect, originator_address, originator_port, channel); - Ok(false) + let target_host = host_to_connect.to_string(); + let target_port = port_to_connect; + let proxy_config = self.outbound_proxy.clone().unwrap_or(ProxyConfig { + mode: ProxyMode::Direct, + }); + + tokio::spawn(async move { + let target = match format!("{target_host}:{target_port}").parse::() { + Ok(addr) => addr, + Err(_) => match tokio::net::lookup_host((&target_host[..], target_port as u16)).await { + Ok(mut addrs) => match addrs.next() { + Some(addr) => addr, + None => return, + }, + Err(_) => return, + }, + }; + crate::server::channel_proxy::proxy_channel(channel.into_stream(), target, &proxy_config).await; + }); + + let _ = (originator_address, originator_port); + Ok(true) } async fn channel_open_session(