feat(core): implement RawFramingInterface accept/recv/send with first-frame auth
Implement RawFramingSession with tokio::io::split for read/write halves, BufReader/BufWriter for buffered I/O, and decode_with_remainder for partial frame reassembly. Add first-frame authentication via IdentityProvider::resolve_from_token(). Add RawFramingConfig.auth field for IdentityProvider reference. Update SshInterface test for new RawFramingConfig shape.
This commit is contained in:
@@ -111,7 +111,9 @@ pub struct SshInterfaceConfig {
|
||||
pub host_key: Arc<PrivateKey>,
|
||||
}
|
||||
|
||||
pub struct RawFramingConfig {}
|
||||
pub struct RawFramingConfig {
|
||||
pub auth: Arc<dyn IdentityProvider>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct HttpInterfaceConfig {
|
||||
@@ -141,6 +143,7 @@ impl std::fmt::Display for DnsInterfaceConfig {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::auth::ConfigIdentityProvider;
|
||||
|
||||
#[test]
|
||||
fn stream_interface_kind_display() {
|
||||
@@ -172,7 +175,11 @@ mod tests {
|
||||
});
|
||||
assert_eq!(ssh_config.kind(), StreamInterfaceKind::Ssh);
|
||||
|
||||
let raw_config = StreamInterfaceConfig::RawFraming(RawFramingConfig {});
|
||||
let raw_config = StreamInterfaceConfig::RawFraming(RawFramingConfig {
|
||||
auth: Arc::new(ConfigIdentityProvider::new(Arc::new(ArcSwap::new(
|
||||
Arc::new(DynamicConfig::default()),
|
||||
)))),
|
||||
});
|
||||
assert_eq!(raw_config.kind(), StreamInterfaceKind::RawFraming);
|
||||
}
|
||||
|
||||
@@ -211,7 +218,10 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn raw_framing_config_minimal() {
|
||||
let _config = RawFramingConfig {};
|
||||
let auth: Arc<dyn IdentityProvider> = Arc::new(ConfigIdentityProvider::new(Arc::new(
|
||||
ArcSwap::new(Arc::new(DynamicConfig::default())),
|
||||
)));
|
||||
let _config = RawFramingConfig { auth };
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,12 +1,18 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
|
||||
|
||||
use crate::auth::{AuthToken, Identity, IdentityProvider};
|
||||
use crate::call::frame::{decode_with_remainder, encode};
|
||||
use crate::call::EventEnvelope;
|
||||
use crate::interface::session::{InterfaceEvent, InterfaceSession};
|
||||
use crate::interface::{StreamInterface, StreamInterfaceConfig, TransportStream};
|
||||
|
||||
pub struct RawFramingInterface;
|
||||
const READ_BUF_SIZE: usize = 8192;
|
||||
|
||||
pub struct RawFramingSession;
|
||||
pub struct RawFramingInterface;
|
||||
|
||||
#[async_trait]
|
||||
impl StreamInterface for RawFramingInterface {
|
||||
@@ -14,49 +20,380 @@ impl StreamInterface for RawFramingInterface {
|
||||
|
||||
async fn accept(
|
||||
&self,
|
||||
_stream: Box<dyn TransportStream>,
|
||||
_config: &StreamInterfaceConfig,
|
||||
stream: Box<dyn TransportStream>,
|
||||
config: &StreamInterfaceConfig,
|
||||
) -> Result<Self::Session> {
|
||||
Err(anyhow::anyhow!(
|
||||
"RawFramingInterface is not yet implemented (Phase 4+)"
|
||||
))
|
||||
let raw_config = match config {
|
||||
StreamInterfaceConfig::RawFraming(c) => c,
|
||||
StreamInterfaceConfig::Ssh(_) => {
|
||||
return Err(anyhow::anyhow!(
|
||||
"RawFramingInterface received SshInterfaceConfig"
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
Ok(RawFramingSession::new(stream, Arc::clone(&raw_config.auth)))
|
||||
}
|
||||
}
|
||||
|
||||
enum AuthState {
|
||||
Pending,
|
||||
Authenticated(Identity),
|
||||
Failed,
|
||||
}
|
||||
|
||||
pub struct RawFramingSession {
|
||||
reader: BufReader<tokio::io::ReadHalf<Box<dyn TransportStream>>>,
|
||||
writer: BufWriter<tokio::io::WriteHalf<Box<dyn TransportStream>>>,
|
||||
auth_state: AuthState,
|
||||
identity_provider: Arc<dyn IdentityProvider>,
|
||||
read_buf: Vec<u8>,
|
||||
}
|
||||
|
||||
impl RawFramingSession {
|
||||
pub fn new(
|
||||
stream: Box<dyn TransportStream>,
|
||||
identity_provider: Arc<dyn IdentityProvider>,
|
||||
) -> Self {
|
||||
let (read_half, write_half) = tokio::io::split(stream);
|
||||
Self {
|
||||
reader: BufReader::new(read_half),
|
||||
writer: BufWriter::new(write_half),
|
||||
auth_state: AuthState::Pending,
|
||||
identity_provider,
|
||||
read_buf: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn read_frame(&mut self) -> Result<EventEnvelope> {
|
||||
loop {
|
||||
match decode_with_remainder(&self.read_buf) {
|
||||
Ok((envelope, consumed)) => {
|
||||
self.read_buf.drain(..consumed);
|
||||
return Ok(envelope);
|
||||
}
|
||||
Err(crate::call::frame::FrameDecodeError::TooShort { .. })
|
||||
| Err(crate::call::frame::FrameDecodeError::Incomplete { .. }) => {
|
||||
let mut tmp = [0u8; READ_BUF_SIZE];
|
||||
let n = self.reader.read(&mut tmp).await?;
|
||||
if n == 0 {
|
||||
return Err(anyhow::anyhow!("stream closed while reading frame"));
|
||||
}
|
||||
self.read_buf.extend_from_slice(&tmp[..n]);
|
||||
}
|
||||
Err(crate::call::frame::FrameDecodeError::Json(e)) => {
|
||||
return Err(anyhow::anyhow!("frame JSON decode error: {e}"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_frame(&mut self, envelope: &EventEnvelope) -> Result<()> {
|
||||
let frame = encode(envelope);
|
||||
self.writer.write_all(&frame).await?;
|
||||
self.writer.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl InterfaceSession for RawFramingSession {
|
||||
async fn recv(&mut self) -> Option<InterfaceEvent> {
|
||||
None
|
||||
match &self.auth_state {
|
||||
AuthState::Failed => return None,
|
||||
AuthState::Authenticated(_) => {
|
||||
let identity = match &self.auth_state {
|
||||
AuthState::Authenticated(id) => id.clone(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let envelope = match self.read_frame().await {
|
||||
Ok(e) => e,
|
||||
Err(_) => return None,
|
||||
};
|
||||
return Some(InterfaceEvent::with_identity(envelope, identity));
|
||||
}
|
||||
AuthState::Pending => {}
|
||||
}
|
||||
|
||||
let envelope = match self.read_frame().await {
|
||||
Ok(e) => e,
|
||||
Err(_) => {
|
||||
self.auth_state = AuthState::Failed;
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let token_raw = envelope.payload.as_str().unwrap_or("").as_bytes().to_vec();
|
||||
let token = AuthToken { raw: token_raw };
|
||||
|
||||
match self.identity_provider.resolve_from_token(&token) {
|
||||
Some(identity) => {
|
||||
self.auth_state = AuthState::Authenticated(identity.clone());
|
||||
Some(InterfaceEvent::with_identity(envelope, identity))
|
||||
}
|
||||
None => {
|
||||
self.auth_state = AuthState::Failed;
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn send(&mut self, _envelope: crate::call::EventEnvelope) -> Result<()> {
|
||||
Err(anyhow::anyhow!(
|
||||
"RawFramingSession is not yet implemented (Phase 4+)"
|
||||
))
|
||||
async fn send(&mut self, envelope: EventEnvelope) -> Result<()> {
|
||||
match self.auth_state {
|
||||
AuthState::Failed => Err(anyhow::anyhow!("session authentication failed")),
|
||||
_ => self.write_frame(&envelope).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::auth::ConfigIdentityProvider;
|
||||
use crate::config::DynamicConfig;
|
||||
use crate::interface::RawFramingConfig;
|
||||
use arc_swap::ArcSwap;
|
||||
use std::collections::HashMap;
|
||||
|
||||
fn make_provider() -> Arc<dyn IdentityProvider> {
|
||||
Arc::new(ConfigIdentityProvider::new(Arc::new(ArcSwap::new(
|
||||
Arc::new(DynamicConfig::default()),
|
||||
))))
|
||||
}
|
||||
|
||||
fn make_provider_with_identity(
|
||||
identity: Identity,
|
||||
valid_token: &str,
|
||||
) -> (Arc<dyn IdentityProvider>, String) {
|
||||
struct MockProvider {
|
||||
identity: Identity,
|
||||
valid_token: String,
|
||||
}
|
||||
impl IdentityProvider for MockProvider {
|
||||
fn resolve_from_fingerprint(&self, _fp: &str) -> Option<Identity> {
|
||||
None
|
||||
}
|
||||
fn resolve_from_token(&self, token: &AuthToken) -> Option<Identity> {
|
||||
if token.raw == self.valid_token.as_bytes() {
|
||||
Some(self.identity.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
let provider = Arc::new(MockProvider {
|
||||
identity,
|
||||
valid_token: valid_token.to_string(),
|
||||
});
|
||||
(provider, valid_token.to_string())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn raw_framing_interface_accept_succeeds() {
|
||||
let iface = RawFramingInterface;
|
||||
let (_client, server) = tokio::io::duplex(1024);
|
||||
let stream: Box<dyn TransportStream> = Box::new(server);
|
||||
let config = StreamInterfaceConfig::RawFraming(RawFramingConfig {
|
||||
auth: make_provider(),
|
||||
});
|
||||
let result = iface.accept(stream, &config).await;
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn raw_framing_interface_rejects_ssh_config() {
|
||||
let iface = RawFramingInterface;
|
||||
let (_client, server) = tokio::io::duplex(1024);
|
||||
let stream: Box<dyn TransportStream> = Box::new(server);
|
||||
let config = StreamInterfaceConfig::Ssh(crate::interface::SshInterfaceConfig {
|
||||
auth: make_provider(),
|
||||
forwarding: Arc::new(ArcSwap::new(Arc::new(DynamicConfig::default()))),
|
||||
host_key: Arc::new(
|
||||
russh::keys::PrivateKey::random(
|
||||
&mut rand_core::OsRng,
|
||||
russh::keys::Algorithm::Ed25519,
|
||||
)
|
||||
.unwrap(),
|
||||
),
|
||||
});
|
||||
let result = iface.accept(stream, &config).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn raw_framing_session_round_trip() {
|
||||
let identity = Identity {
|
||||
id: "test-id".to_string(),
|
||||
scopes: vec!["relay:connect".to_string()],
|
||||
resources: HashMap::new(),
|
||||
};
|
||||
let (provider, token_str) =
|
||||
make_provider_with_identity(identity.clone(), "valid-test-token");
|
||||
|
||||
let (client, server) = tokio::io::duplex(4096);
|
||||
let server_stream: Box<dyn TransportStream> = Box::new(server);
|
||||
let client_stream: Box<dyn TransportStream> = Box::new(client);
|
||||
|
||||
let mut server_session = RawFramingSession::new(server_stream, provider);
|
||||
|
||||
let auth_envelope = EventEnvelope::new("auth", "auth-1", serde_json::json!(token_str));
|
||||
let auth_frame = encode(&auth_envelope);
|
||||
|
||||
let mut client_writer = tokio::io::BufWriter::new(client_stream);
|
||||
client_writer.write_all(&auth_frame).await.unwrap();
|
||||
client_writer.flush().await.unwrap();
|
||||
|
||||
let event = server_session.recv().await;
|
||||
assert!(event.is_some());
|
||||
let event = event.unwrap();
|
||||
assert!(event.identity.is_some());
|
||||
assert_eq!(event.identity.as_ref().unwrap().id, "test-id");
|
||||
|
||||
let data_envelope =
|
||||
EventEnvelope::call_requested("req-2", serde_json::json!({"op": "test"}));
|
||||
let data_frame = encode(&data_envelope);
|
||||
client_writer.write_all(&data_frame).await.unwrap();
|
||||
client_writer.flush().await.unwrap();
|
||||
|
||||
let event = server_session.recv().await;
|
||||
assert!(event.is_some());
|
||||
let event = event.unwrap();
|
||||
assert_eq!(event.envelope.r#type, "call.requested");
|
||||
assert_eq!(event.envelope.id, "req-2");
|
||||
assert!(event.identity.is_some());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn first_frame_auth_valid_token() {
|
||||
let identity = Identity {
|
||||
id: "auth-user".to_string(),
|
||||
scopes: vec!["admin".to_string()],
|
||||
resources: HashMap::new(),
|
||||
};
|
||||
let (provider, token_str) = make_provider_with_identity(identity, "my-valid-token");
|
||||
|
||||
let (client, server) = tokio::io::duplex(4096);
|
||||
let server_stream: Box<dyn TransportStream> = Box::new(server);
|
||||
let client_stream: Box<dyn TransportStream> = Box::new(client);
|
||||
|
||||
let mut session = RawFramingSession::new(server_stream, provider);
|
||||
|
||||
let auth_envelope = EventEnvelope::new("auth", "auth-1", serde_json::json!(token_str));
|
||||
let frame = encode(&auth_envelope);
|
||||
let mut writer = tokio::io::BufWriter::new(client_stream);
|
||||
writer.write_all(&frame).await.unwrap();
|
||||
writer.flush().await.unwrap();
|
||||
|
||||
let event = session.recv().await;
|
||||
assert!(event.is_some());
|
||||
let event = event.unwrap();
|
||||
assert!(event.identity.is_some());
|
||||
assert_eq!(event.identity.as_ref().unwrap().id, "auth-user");
|
||||
assert_eq!(event.identity.as_ref().unwrap().scopes, vec!["admin"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn first_frame_auth_invalid_token() {
|
||||
let identity = Identity {
|
||||
id: "auth-user".to_string(),
|
||||
scopes: vec![],
|
||||
resources: HashMap::new(),
|
||||
};
|
||||
let (provider, _) = make_provider_with_identity(identity, "correct-token");
|
||||
|
||||
let (client, server) = tokio::io::duplex(4096);
|
||||
let server_stream: Box<dyn TransportStream> = Box::new(server);
|
||||
let client_stream: Box<dyn TransportStream> = Box::new(client);
|
||||
|
||||
let mut session = RawFramingSession::new(server_stream, provider);
|
||||
|
||||
let bad_envelope =
|
||||
EventEnvelope::new("auth", "auth-1", serde_json::json!("bad-token-value"));
|
||||
let frame = encode(&bad_envelope);
|
||||
let mut writer = tokio::io::BufWriter::new(client_stream);
|
||||
writer.write_all(&frame).await.unwrap();
|
||||
writer.flush().await.unwrap();
|
||||
|
||||
let event = session.recv().await;
|
||||
assert!(event.is_none());
|
||||
|
||||
let data_envelope = EventEnvelope::call_requested("req-2", serde_json::json!({}));
|
||||
let result = session.send(data_envelope).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn raw_framing_session_send() {
|
||||
let identity = Identity {
|
||||
id: "send-user".to_string(),
|
||||
scopes: vec!["relay:connect".to_string()],
|
||||
resources: HashMap::new(),
|
||||
};
|
||||
let (provider, token_str) = make_provider_with_identity(identity, "send-token");
|
||||
|
||||
let (client, server) = tokio::io::duplex(4096);
|
||||
let server_stream: Box<dyn TransportStream> = Box::new(server);
|
||||
let client_stream: Box<dyn TransportStream> = Box::new(client);
|
||||
|
||||
let mut server_session = RawFramingSession::new(server_stream, provider);
|
||||
|
||||
let auth_envelope = EventEnvelope::new("auth", "auth-1", serde_json::json!(token_str));
|
||||
let auth_frame = encode(&auth_envelope);
|
||||
let mut client_writer = tokio::io::BufWriter::new(client_stream);
|
||||
client_writer.write_all(&auth_frame).await.unwrap();
|
||||
client_writer.flush().await.unwrap();
|
||||
|
||||
let _ = server_session.recv().await;
|
||||
|
||||
let response = EventEnvelope::call_responded("req-1", serde_json::json!({"result": "ok"}));
|
||||
let send_result = server_session.send(response).await;
|
||||
assert!(send_result.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn raw_framing_multiple_frames_over_duplex() {
|
||||
let identity = Identity {
|
||||
id: "multi-user".to_string(),
|
||||
scopes: vec!["relay:connect".to_string()],
|
||||
resources: HashMap::new(),
|
||||
};
|
||||
let (provider, token_str) = make_provider_with_identity(identity, "multi-token");
|
||||
|
||||
let (client, server) = tokio::io::duplex(8192);
|
||||
let server_stream: Box<dyn TransportStream> = Box::new(server);
|
||||
let client_stream: Box<dyn TransportStream> = Box::new(client);
|
||||
|
||||
let mut session = RawFramingSession::new(server_stream, provider);
|
||||
let mut client_writer = tokio::io::BufWriter::new(client_stream);
|
||||
|
||||
let auth_envelope = EventEnvelope::new("auth", "auth-0", serde_json::json!(token_str));
|
||||
client_writer
|
||||
.write_all(&encode(&auth_envelope))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
for i in 1..=5 {
|
||||
let envelope =
|
||||
EventEnvelope::call_requested(format!("req-{i}"), serde_json::json!({"seq": i}));
|
||||
client_writer.write_all(&encode(&envelope)).await.unwrap();
|
||||
}
|
||||
client_writer.flush().await.unwrap();
|
||||
|
||||
let auth_event = session.recv().await;
|
||||
assert!(auth_event.is_some());
|
||||
assert!(auth_event.unwrap().identity.is_some());
|
||||
|
||||
for i in 1..=5 {
|
||||
let event = session.recv().await;
|
||||
assert!(event.is_some());
|
||||
let event = event.unwrap();
|
||||
assert_eq!(event.envelope.id, format!("req-{i}"));
|
||||
assert!(event.identity.is_some());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn raw_framing_interface_type_exists() {
|
||||
let _iface = RawFramingInterface;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn raw_framing_session_type_exists() {
|
||||
let _session = RawFramingSession;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn raw_framing_interface_accept_returns_error() {
|
||||
let iface = RawFramingInterface;
|
||||
let (_client, server) = tokio::io::duplex(1024);
|
||||
let stream: Box<dyn TransportStream> = Box::new(server);
|
||||
let config = StreamInterfaceConfig::RawFraming(crate::interface::RawFramingConfig {});
|
||||
let result = iface.accept(stream, &config).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -734,7 +734,11 @@ mod tests {
|
||||
let (_client, server) = tokio::io::duplex(1024);
|
||||
let stream: Box<dyn TransportStream> = Box::new(server);
|
||||
|
||||
let raw_config = StreamInterfaceConfig::RawFraming(crate::interface::RawFramingConfig {});
|
||||
let raw_config = StreamInterfaceConfig::RawFraming(crate::interface::RawFramingConfig {
|
||||
auth: Arc::new(crate::auth::ConfigIdentityProvider::new(Arc::new(
|
||||
ArcSwap::new(Arc::new(DynamicConfig::default())),
|
||||
))),
|
||||
});
|
||||
let result = iface.accept(stream, &raw_config).await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user