diff --git a/crates/alknet-core/src/interface/config.rs b/crates/alknet-core/src/interface/config.rs index 250a5af..3528d73 100644 --- a/crates/alknet-core/src/interface/config.rs +++ b/crates/alknet-core/src/interface/config.rs @@ -111,7 +111,9 @@ pub struct SshInterfaceConfig { pub host_key: Arc, } -pub struct RawFramingConfig {} +pub struct RawFramingConfig { + pub auth: Arc, +} #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct HttpInterfaceConfig { @@ -141,6 +143,7 @@ impl std::fmt::Display for DnsInterfaceConfig { #[cfg(test)] mod tests { use super::*; + use crate::auth::ConfigIdentityProvider; #[test] fn stream_interface_kind_display() { @@ -172,7 +175,11 @@ mod tests { }); assert_eq!(ssh_config.kind(), StreamInterfaceKind::Ssh); - let raw_config = StreamInterfaceConfig::RawFraming(RawFramingConfig {}); + let raw_config = StreamInterfaceConfig::RawFraming(RawFramingConfig { + auth: Arc::new(ConfigIdentityProvider::new(Arc::new(ArcSwap::new( + Arc::new(DynamicConfig::default()), + )))), + }); assert_eq!(raw_config.kind(), StreamInterfaceKind::RawFraming); } @@ -211,7 +218,10 @@ mod tests { #[test] fn raw_framing_config_minimal() { - let _config = RawFramingConfig {}; + let auth: Arc = Arc::new(ConfigIdentityProvider::new(Arc::new( + ArcSwap::new(Arc::new(DynamicConfig::default())), + ))); + let _config = RawFramingConfig { auth }; } #[test] diff --git a/crates/alknet-core/src/interface/raw_framing.rs b/crates/alknet-core/src/interface/raw_framing.rs index 3563a36..c2acd6c 100644 --- a/crates/alknet-core/src/interface/raw_framing.rs +++ b/crates/alknet-core/src/interface/raw_framing.rs @@ -1,12 +1,18 @@ +use std::sync::Arc; + use anyhow::Result; use async_trait::async_trait; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}; +use crate::auth::{AuthToken, Identity, IdentityProvider}; +use crate::call::frame::{decode_with_remainder, encode}; +use crate::call::EventEnvelope; use crate::interface::session::{InterfaceEvent, InterfaceSession}; use crate::interface::{StreamInterface, StreamInterfaceConfig, TransportStream}; -pub struct RawFramingInterface; +const READ_BUF_SIZE: usize = 8192; -pub struct RawFramingSession; +pub struct RawFramingInterface; #[async_trait] impl StreamInterface for RawFramingInterface { @@ -14,49 +20,380 @@ impl StreamInterface for RawFramingInterface { async fn accept( &self, - _stream: Box, - _config: &StreamInterfaceConfig, + stream: Box, + config: &StreamInterfaceConfig, ) -> Result { - Err(anyhow::anyhow!( - "RawFramingInterface is not yet implemented (Phase 4+)" - )) + let raw_config = match config { + StreamInterfaceConfig::RawFraming(c) => c, + StreamInterfaceConfig::Ssh(_) => { + return Err(anyhow::anyhow!( + "RawFramingInterface received SshInterfaceConfig" + )); + } + }; + + Ok(RawFramingSession::new(stream, Arc::clone(&raw_config.auth))) + } +} + +enum AuthState { + Pending, + Authenticated(Identity), + Failed, +} + +pub struct RawFramingSession { + reader: BufReader>>, + writer: BufWriter>>, + auth_state: AuthState, + identity_provider: Arc, + read_buf: Vec, +} + +impl RawFramingSession { + pub fn new( + stream: Box, + identity_provider: Arc, + ) -> Self { + let (read_half, write_half) = tokio::io::split(stream); + Self { + reader: BufReader::new(read_half), + writer: BufWriter::new(write_half), + auth_state: AuthState::Pending, + identity_provider, + read_buf: Vec::new(), + } + } + + async fn read_frame(&mut self) -> Result { + loop { + match decode_with_remainder(&self.read_buf) { + Ok((envelope, consumed)) => { + self.read_buf.drain(..consumed); + return Ok(envelope); + } + Err(crate::call::frame::FrameDecodeError::TooShort { .. }) + | Err(crate::call::frame::FrameDecodeError::Incomplete { .. }) => { + let mut tmp = [0u8; READ_BUF_SIZE]; + let n = self.reader.read(&mut tmp).await?; + if n == 0 { + return Err(anyhow::anyhow!("stream closed while reading frame")); + } + self.read_buf.extend_from_slice(&tmp[..n]); + } + Err(crate::call::frame::FrameDecodeError::Json(e)) => { + return Err(anyhow::anyhow!("frame JSON decode error: {e}")); + } + } + } + } + + async fn write_frame(&mut self, envelope: &EventEnvelope) -> Result<()> { + let frame = encode(envelope); + self.writer.write_all(&frame).await?; + self.writer.flush().await?; + Ok(()) } } #[async_trait] impl InterfaceSession for RawFramingSession { async fn recv(&mut self) -> Option { - None + match &self.auth_state { + AuthState::Failed => return None, + AuthState::Authenticated(_) => { + let identity = match &self.auth_state { + AuthState::Authenticated(id) => id.clone(), + _ => unreachable!(), + }; + let envelope = match self.read_frame().await { + Ok(e) => e, + Err(_) => return None, + }; + return Some(InterfaceEvent::with_identity(envelope, identity)); + } + AuthState::Pending => {} + } + + let envelope = match self.read_frame().await { + Ok(e) => e, + Err(_) => { + self.auth_state = AuthState::Failed; + return None; + } + }; + + let token_raw = envelope.payload.as_str().unwrap_or("").as_bytes().to_vec(); + let token = AuthToken { raw: token_raw }; + + match self.identity_provider.resolve_from_token(&token) { + Some(identity) => { + self.auth_state = AuthState::Authenticated(identity.clone()); + Some(InterfaceEvent::with_identity(envelope, identity)) + } + None => { + self.auth_state = AuthState::Failed; + None + } + } } - async fn send(&mut self, _envelope: crate::call::EventEnvelope) -> Result<()> { - Err(anyhow::anyhow!( - "RawFramingSession is not yet implemented (Phase 4+)" - )) + async fn send(&mut self, envelope: EventEnvelope) -> Result<()> { + match self.auth_state { + AuthState::Failed => Err(anyhow::anyhow!("session authentication failed")), + _ => self.write_frame(&envelope).await, + } } } #[cfg(test)] mod tests { use super::*; + use crate::auth::ConfigIdentityProvider; + use crate::config::DynamicConfig; + use crate::interface::RawFramingConfig; + use arc_swap::ArcSwap; + use std::collections::HashMap; + + fn make_provider() -> Arc { + Arc::new(ConfigIdentityProvider::new(Arc::new(ArcSwap::new( + Arc::new(DynamicConfig::default()), + )))) + } + + fn make_provider_with_identity( + identity: Identity, + valid_token: &str, + ) -> (Arc, String) { + struct MockProvider { + identity: Identity, + valid_token: String, + } + impl IdentityProvider for MockProvider { + fn resolve_from_fingerprint(&self, _fp: &str) -> Option { + None + } + fn resolve_from_token(&self, token: &AuthToken) -> Option { + if token.raw == self.valid_token.as_bytes() { + Some(self.identity.clone()) + } else { + None + } + } + } + let provider = Arc::new(MockProvider { + identity, + valid_token: valid_token.to_string(), + }); + (provider, valid_token.to_string()) + } + + #[tokio::test] + async fn raw_framing_interface_accept_succeeds() { + let iface = RawFramingInterface; + let (_client, server) = tokio::io::duplex(1024); + let stream: Box = Box::new(server); + let config = StreamInterfaceConfig::RawFraming(RawFramingConfig { + auth: make_provider(), + }); + let result = iface.accept(stream, &config).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn raw_framing_interface_rejects_ssh_config() { + let iface = RawFramingInterface; + let (_client, server) = tokio::io::duplex(1024); + let stream: Box = Box::new(server); + let config = StreamInterfaceConfig::Ssh(crate::interface::SshInterfaceConfig { + auth: make_provider(), + forwarding: Arc::new(ArcSwap::new(Arc::new(DynamicConfig::default()))), + host_key: Arc::new( + russh::keys::PrivateKey::random( + &mut rand_core::OsRng, + russh::keys::Algorithm::Ed25519, + ) + .unwrap(), + ), + }); + let result = iface.accept(stream, &config).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn raw_framing_session_round_trip() { + let identity = Identity { + id: "test-id".to_string(), + scopes: vec!["relay:connect".to_string()], + resources: HashMap::new(), + }; + let (provider, token_str) = + make_provider_with_identity(identity.clone(), "valid-test-token"); + + let (client, server) = tokio::io::duplex(4096); + let server_stream: Box = Box::new(server); + let client_stream: Box = Box::new(client); + + let mut server_session = RawFramingSession::new(server_stream, provider); + + let auth_envelope = EventEnvelope::new("auth", "auth-1", serde_json::json!(token_str)); + let auth_frame = encode(&auth_envelope); + + let mut client_writer = tokio::io::BufWriter::new(client_stream); + client_writer.write_all(&auth_frame).await.unwrap(); + client_writer.flush().await.unwrap(); + + let event = server_session.recv().await; + assert!(event.is_some()); + let event = event.unwrap(); + assert!(event.identity.is_some()); + assert_eq!(event.identity.as_ref().unwrap().id, "test-id"); + + let data_envelope = + EventEnvelope::call_requested("req-2", serde_json::json!({"op": "test"})); + let data_frame = encode(&data_envelope); + client_writer.write_all(&data_frame).await.unwrap(); + client_writer.flush().await.unwrap(); + + let event = server_session.recv().await; + assert!(event.is_some()); + let event = event.unwrap(); + assert_eq!(event.envelope.r#type, "call.requested"); + assert_eq!(event.envelope.id, "req-2"); + assert!(event.identity.is_some()); + } + + #[tokio::test] + async fn first_frame_auth_valid_token() { + let identity = Identity { + id: "auth-user".to_string(), + scopes: vec!["admin".to_string()], + resources: HashMap::new(), + }; + let (provider, token_str) = make_provider_with_identity(identity, "my-valid-token"); + + let (client, server) = tokio::io::duplex(4096); + let server_stream: Box = Box::new(server); + let client_stream: Box = Box::new(client); + + let mut session = RawFramingSession::new(server_stream, provider); + + let auth_envelope = EventEnvelope::new("auth", "auth-1", serde_json::json!(token_str)); + let frame = encode(&auth_envelope); + let mut writer = tokio::io::BufWriter::new(client_stream); + writer.write_all(&frame).await.unwrap(); + writer.flush().await.unwrap(); + + let event = session.recv().await; + assert!(event.is_some()); + let event = event.unwrap(); + assert!(event.identity.is_some()); + assert_eq!(event.identity.as_ref().unwrap().id, "auth-user"); + assert_eq!(event.identity.as_ref().unwrap().scopes, vec!["admin"]); + } + + #[tokio::test] + async fn first_frame_auth_invalid_token() { + let identity = Identity { + id: "auth-user".to_string(), + scopes: vec![], + resources: HashMap::new(), + }; + let (provider, _) = make_provider_with_identity(identity, "correct-token"); + + let (client, server) = tokio::io::duplex(4096); + let server_stream: Box = Box::new(server); + let client_stream: Box = Box::new(client); + + let mut session = RawFramingSession::new(server_stream, provider); + + let bad_envelope = + EventEnvelope::new("auth", "auth-1", serde_json::json!("bad-token-value")); + let frame = encode(&bad_envelope); + let mut writer = tokio::io::BufWriter::new(client_stream); + writer.write_all(&frame).await.unwrap(); + writer.flush().await.unwrap(); + + let event = session.recv().await; + assert!(event.is_none()); + + let data_envelope = EventEnvelope::call_requested("req-2", serde_json::json!({})); + let result = session.send(data_envelope).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn raw_framing_session_send() { + let identity = Identity { + id: "send-user".to_string(), + scopes: vec!["relay:connect".to_string()], + resources: HashMap::new(), + }; + let (provider, token_str) = make_provider_with_identity(identity, "send-token"); + + let (client, server) = tokio::io::duplex(4096); + let server_stream: Box = Box::new(server); + let client_stream: Box = Box::new(client); + + let mut server_session = RawFramingSession::new(server_stream, provider); + + let auth_envelope = EventEnvelope::new("auth", "auth-1", serde_json::json!(token_str)); + let auth_frame = encode(&auth_envelope); + let mut client_writer = tokio::io::BufWriter::new(client_stream); + client_writer.write_all(&auth_frame).await.unwrap(); + client_writer.flush().await.unwrap(); + + let _ = server_session.recv().await; + + let response = EventEnvelope::call_responded("req-1", serde_json::json!({"result": "ok"})); + let send_result = server_session.send(response).await; + assert!(send_result.is_ok()); + } + + #[tokio::test] + async fn raw_framing_multiple_frames_over_duplex() { + let identity = Identity { + id: "multi-user".to_string(), + scopes: vec!["relay:connect".to_string()], + resources: HashMap::new(), + }; + let (provider, token_str) = make_provider_with_identity(identity, "multi-token"); + + let (client, server) = tokio::io::duplex(8192); + let server_stream: Box = Box::new(server); + let client_stream: Box = Box::new(client); + + let mut session = RawFramingSession::new(server_stream, provider); + let mut client_writer = tokio::io::BufWriter::new(client_stream); + + let auth_envelope = EventEnvelope::new("auth", "auth-0", serde_json::json!(token_str)); + client_writer + .write_all(&encode(&auth_envelope)) + .await + .unwrap(); + + for i in 1..=5 { + let envelope = + EventEnvelope::call_requested(format!("req-{i}"), serde_json::json!({"seq": i})); + client_writer.write_all(&encode(&envelope)).await.unwrap(); + } + client_writer.flush().await.unwrap(); + + let auth_event = session.recv().await; + assert!(auth_event.is_some()); + assert!(auth_event.unwrap().identity.is_some()); + + for i in 1..=5 { + let event = session.recv().await; + assert!(event.is_some()); + let event = event.unwrap(); + assert_eq!(event.envelope.id, format!("req-{i}")); + assert!(event.identity.is_some()); + } + } #[test] fn raw_framing_interface_type_exists() { let _iface = RawFramingInterface; } - - #[test] - fn raw_framing_session_type_exists() { - let _session = RawFramingSession; - } - - #[tokio::test] - async fn raw_framing_interface_accept_returns_error() { - let iface = RawFramingInterface; - let (_client, server) = tokio::io::duplex(1024); - let stream: Box = Box::new(server); - let config = StreamInterfaceConfig::RawFraming(crate::interface::RawFramingConfig {}); - let result = iface.accept(stream, &config).await; - assert!(result.is_err()); - } } diff --git a/crates/alknet-core/src/interface/ssh.rs b/crates/alknet-core/src/interface/ssh.rs index 8b67241..26f80fc 100644 --- a/crates/alknet-core/src/interface/ssh.rs +++ b/crates/alknet-core/src/interface/ssh.rs @@ -734,7 +734,11 @@ mod tests { let (_client, server) = tokio::io::duplex(1024); let stream: Box = Box::new(server); - let raw_config = StreamInterfaceConfig::RawFraming(crate::interface::RawFramingConfig {}); + let raw_config = StreamInterfaceConfig::RawFraming(crate::interface::RawFramingConfig { + auth: Arc::new(crate::auth::ConfigIdentityProvider::new(Arc::new( + ArcSwap::new(Arc::new(DynamicConfig::default())), + ))), + }); let result = iface.accept(stream, &raw_config).await; assert!(result.is_err()); }