From c9898566b9c28c5ecccfa6d34fc04096563a8eda Mon Sep 17 00:00:00 2001 From: "glm-5.2" Date: Tue, 23 Jun 2026 14:06:48 +0000 Subject: [PATCH] Implement call protocol wire types and framing Implements src/protocol/wire.rs with: - EventEnvelope (type/id/payload, JSON wire format with leading-slash op ids) - ResponseEnvelope and CallError (with optional typed details, ADR-023) - ResponseEnvelope::ok/error/not_found/forbidden helpers - ResponseEnvelope -> EventEnvelope conversion (Ok -> call.responded, Err -> call.error) - FrameFramedReader / FrameFramedWriter: 4-byte big-endian length-prefixed JSON frames - FrameError: Io, Json, ConnectionClosed, InvalidFrame - 20 unit tests covering round-trip, large payloads, truncated frames, helpers Builds on the call/crate-init skeleton. See docs/architecture/crates/call/call-protocol.md and ADR-005/012/023. --- crates/alknet-call/src/protocol/wire.rs | 539 +++++++++++++++++++++++- 1 file changed, 538 insertions(+), 1 deletion(-) diff --git a/crates/alknet-call/src/protocol/wire.rs b/crates/alknet-call/src/protocol/wire.rs index 6c62352..7a0d8ab 100644 --- a/crates/alknet-call/src/protocol/wire.rs +++ b/crates/alknet-call/src/protocol/wire.rs @@ -4,4 +4,541 @@ //! See `docs/architecture/crates/call/call-protocol.md` for the full //! specification. -// TODO: implement +use std::io; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +pub const EVENT_REQUESTED: &str = "call.requested"; +pub const EVENT_RESPONDED: &str = "call.responded"; +pub const EVENT_COMPLETED: &str = "call.completed"; +pub const EVENT_ABORTED: &str = "call.aborted"; +pub const EVENT_ERROR: &str = "call.error"; + +const LENGTH_PREFIX_BYTES: usize = 4; +const MAX_FRAME_SIZE: u32 = 64 * 1024 * 1024; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct EventEnvelope { + #[serde(rename = "type")] + pub r#type: String, + pub id: String, + pub payload: Value, +} + +impl EventEnvelope { + pub fn new(event_type: impl Into, id: impl Into, payload: Value) -> Self { + Self { + r#type: event_type.into(), + id: id.into(), + payload, + } + } + + pub fn requested(id: impl Into, payload: Value) -> Self { + Self::new(EVENT_REQUESTED, id, payload) + } + + pub fn responded(id: impl Into, output: Value) -> Self { + Self::new(EVENT_RESPONDED, id, serde_json::json!({ "output": output })) + } + + pub fn completed(id: impl Into) -> Self { + Self::new(EVENT_COMPLETED, id, serde_json::json!({})) + } + + pub fn aborted(id: impl Into) -> Self { + Self::new(EVENT_ABORTED, id, serde_json::json!({})) + } + + pub fn error(id: impl Into, error: &CallError) -> Self { + let payload = serde_json::to_value(error).unwrap_or(Value::Null); + Self::new(EVENT_ERROR, id, payload) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct CallError { + pub code: String, + pub message: String, + pub retryable: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub details: Option, +} + +impl CallError { + pub fn new(code: impl Into, message: impl Into, retryable: bool) -> Self { + Self { + code: code.into(), + message: message.into(), + retryable, + details: None, + } + } + + pub fn with_details(mut self, details: Value) -> Self { + self.details = Some(details); + self + } + + pub fn not_found(op_name: &str) -> Self { + Self::new( + "NOT_FOUND", + format!("operation not found: {op_name}"), + false, + ) + } + + pub fn forbidden(message: impl Into) -> Self { + Self::new("FORBIDDEN", message, false) + } + + pub fn invalid_input(message: impl Into) -> Self { + Self::new("INVALID_INPUT", message, false) + } + + pub fn internal(message: impl Into) -> Self { + Self::new("INTERNAL", message, false) + } + + pub fn timeout(message: impl Into) -> Self { + Self::new("TIMEOUT", message, true) + } +} + +impl Eq for CallError {} + +#[derive(Debug, Clone, PartialEq)] +pub struct ResponseEnvelope { + pub request_id: String, + pub result: Result, +} + +impl ResponseEnvelope { + pub fn ok(request_id: impl Into, output: Value) -> Self { + Self { + request_id: request_id.into(), + result: Ok(output), + } + } + + pub fn error(request_id: impl Into, error: CallError) -> Self { + Self { + request_id: request_id.into(), + result: Err(error), + } + } + + pub fn not_found(request_id: impl Into, op_name: &str) -> Self { + Self::error(request_id, CallError::not_found(op_name)) + } + + pub fn forbidden(request_id: impl Into, message: impl Into) -> Self { + Self::error(request_id, CallError::forbidden(message)) + } + + pub fn into_event(self) -> EventEnvelope { + let id = self.request_id; + match self.result { + Ok(output) => EventEnvelope::responded(id, output), + Err(ref err) => EventEnvelope::error(id, err), + } + } +} + +impl From for EventEnvelope { + fn from(envelope: ResponseEnvelope) -> EventEnvelope { + envelope.into_event() + } +} + +#[derive(Debug, thiserror::Error)] +pub enum FrameError { + #[error("io error: {0}")] + Io(#[from] io::Error), + #[error("json error: {0}")] + Json(#[from] serde_json::Error), + #[error("connection closed")] + ConnectionClosed, + #[error("invalid frame")] + InvalidFrame, +} + +pub struct FrameFramedReader { + reader: R, + len_buf: [u8; LENGTH_PREFIX_BYTES], +} + +impl FrameFramedReader { + pub fn new(reader: R) -> Self { + Self { + reader, + len_buf: [0u8; LENGTH_PREFIX_BYTES], + } + } + + pub fn into_inner(self) -> R { + self.reader + } + + pub async fn read_frame(&mut self) -> Result { + match self.reader.read_exact(&mut self.len_buf).await { + Ok(_) => {} + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { + return Err(FrameError::ConnectionClosed); + } + Err(e) => return Err(FrameError::Io(e)), + } + + let length = u32::from_be_bytes(self.len_buf); + if length == 0 { + return Err(FrameError::InvalidFrame); + } + if length > MAX_FRAME_SIZE { + return Err(FrameError::InvalidFrame); + } + + let mut body = vec![0u8; length as usize]; + match self.reader.read_exact(&mut body).await { + Ok(_) => {} + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { + return Err(FrameError::ConnectionClosed); + } + Err(e) => return Err(FrameError::Io(e)), + } + + let envelope: EventEnvelope = serde_json::from_slice(&body)?; + Ok(envelope) + } +} + +pub struct FrameFramedWriter { + writer: W, +} + +impl FrameFramedWriter { + pub fn new(writer: W) -> Self { + Self { writer } + } + + pub fn into_inner(self) -> W { + self.writer + } + + pub async fn write_frame(&mut self, envelope: &EventEnvelope) -> Result<(), FrameError> { + let body = serde_json::to_vec(envelope)?; + let len = body.len(); + if len > MAX_FRAME_SIZE as usize { + return Err(FrameError::InvalidFrame); + } + let len_bytes = (len as u32).to_be_bytes(); + self.writer.write_all(&len_bytes).await?; + self.writer.write_all(&body).await?; + self.writer.flush().await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::io::{duplex, AsyncReadExt}; + + fn sample_envelope() -> EventEnvelope { + EventEnvelope::new( + "call.requested", + "req-1", + serde_json::json!({ + "operationId": "/fs/readFile", + "input": { "path": "/etc/hosts" } + }), + ) + } + + #[tokio::test] + async fn round_trip_envelope() { + let (client, server) = duplex(8 * 1024); + let envelope = sample_envelope(); + + let mut writer = FrameFramedWriter::new(client); + writer.write_frame(&envelope).await.unwrap(); + drop(writer); + + let mut reader = FrameFramedReader::new(server); + let read = reader.read_frame().await.unwrap(); + assert_eq!(read, envelope); + } + + #[tokio::test] + async fn round_trip_multiple_frames() { + let (client, server) = duplex(8 * 1024); + + let envelopes = vec![ + EventEnvelope::responded("a", Value::String("hello".into())), + EventEnvelope::completed("a"), + EventEnvelope::aborted("b"), + ]; + + { + let mut writer = FrameFramedWriter::new(client); + for e in &envelopes { + writer.write_frame(e).await.unwrap(); + } + } + + let mut reader = FrameFramedReader::new(server); + for expected in envelopes { + let read = reader.read_frame().await.unwrap(); + assert_eq!(read, expected); + } + } + + #[tokio::test] + async fn read_frame_on_closed_reader_returns_connection_closed() { + let (_, server) = duplex(8 * 1024); + let mut reader = FrameFramedReader::new(server); + match reader.read_frame().await { + Err(FrameError::ConnectionClosed) => {} + other => panic!("expected ConnectionClosed, got {other:?}"), + } + } + + #[tokio::test] + async fn truncated_body_returns_connection_closed() { + let (mut client, server) = duplex(8 * 1024); + let envelope = sample_envelope(); + let body = serde_json::to_vec(&envelope).unwrap(); + let len_bytes = (body.len() as u32).to_be_bytes(); + client.write_all(&len_bytes).await.unwrap(); + client.write_all(&body[..body.len() / 2]).await.unwrap(); + drop(client); + + let mut reader = FrameFramedReader::new(server); + match reader.read_frame().await { + Err(FrameError::ConnectionClosed) => {} + other => panic!("expected ConnectionClosed, got {other:?}"), + } + } + + #[tokio::test] + async fn zero_length_frame_is_invalid() { + let (mut client, server) = duplex(8 * 1024); + client.write_all(&[0u8, 0, 0, 0]).await.unwrap(); + drop(client); + + let mut reader = FrameFramedReader::new(server); + match reader.read_frame().await { + Err(FrameError::InvalidFrame) => {} + other => panic!("expected InvalidFrame, got {other:?}"), + } + } + + #[tokio::test] + async fn oversized_frame_is_invalid() { + let (mut client, server) = duplex(8 * 1024); + let too_big = (MAX_FRAME_SIZE + 1u32).to_be_bytes(); + client.write_all(&too_big).await.unwrap(); + drop(client); + + let mut reader = FrameFramedReader::new(server); + match reader.read_frame().await { + Err(FrameError::InvalidFrame) => {} + other => panic!("expected InvalidFrame, got {other:?}"), + } + } + + #[tokio::test] + async fn framing_handles_large_payload() { + let (client, server) = duplex(1024 * 1024); + let big = "x".repeat(64 * 1024); + let envelope = EventEnvelope::responded("big", Value::String(big.clone())); + + let mut writer = FrameFramedWriter::new(client); + writer.write_frame(&envelope).await.unwrap(); + drop(writer); + + let mut reader = FrameFramedReader::new(server); + let read = reader.read_frame().await.unwrap(); + assert_eq!(read, envelope); + match read.payload { + Value::Object(map) => match map.get("output") { + Some(Value::String(s)) => assert_eq!(s, &big), + other => panic!("expected output string, got {other:?}"), + }, + other => panic!("expected object payload, got {other:?}"), + } + } + + #[test] + fn response_envelope_ok_produces_call_responded_event() { + let response = ResponseEnvelope::ok("req-1", Value::String("hi".into())); + let event: EventEnvelope = response.into(); + assert_eq!(event.r#type, EVENT_RESPONDED); + assert_eq!(event.id, "req-1"); + let map = event.payload.as_object().expect("payload is object"); + assert_eq!(map.get("output"), Some(&Value::String("hi".into()))); + } + + #[test] + fn response_envelope_error_produces_call_error_event() { + let err = CallError::new("FILE_NOT_FOUND", "file not found: /etc/x", false) + .with_details(serde_json::json!({ "path": "/etc/x" })); + let response = ResponseEnvelope::error("req-2", err); + let event: EventEnvelope = response.into(); + assert_eq!(event.r#type, EVENT_ERROR); + assert_eq!(event.id, "req-2"); + assert_eq!( + event.payload.get("code"), + Some(&Value::String("FILE_NOT_FOUND".into())) + ); + assert_eq!( + event.payload.get("message"), + Some(&Value::String("file not found: /etc/x".into())) + ); + assert_eq!(event.payload.get("retryable"), Some(&Value::Bool(false))); + assert_eq!( + event.payload.get("details"), + Some(&serde_json::json!({ "path": "/etc/x" })) + ); + } + + #[test] + fn response_envelope_not_found_helper() { + let response = ResponseEnvelope::not_found("req-3", "fs/missing"); + assert_eq!(response.request_id, "req-3"); + match &response.result { + Err(e) => { + assert_eq!(e.code, "NOT_FOUND"); + assert!(!e.retryable); + assert!(e.message.contains("fs/missing")); + } + other => panic!("expected Err, got {other:?}"), + } + let event: EventEnvelope = response.into(); + assert_eq!(event.r#type, EVENT_ERROR); + assert_eq!(event.id, "req-3"); + assert_eq!( + event.payload.get("code"), + Some(&Value::String("NOT_FOUND".into())) + ); + } + + #[test] + fn response_envelope_forbidden_helper() { + let response = ResponseEnvelope::forbidden("req-4", "authentication required"); + match &response.result { + Err(e) => { + assert_eq!(e.code, "FORBIDDEN"); + assert_eq!(e.message, "authentication required"); + } + other => panic!("expected Err, got {other:?}"), + } + let event: EventEnvelope = response.into(); + assert_eq!(event.r#type, EVENT_ERROR); + assert_eq!(event.id, "req-4"); + } + + #[test] + fn event_envelope_completed_has_empty_payload() { + let event = EventEnvelope::completed("sub-1"); + assert_eq!(event.r#type, EVENT_COMPLETED); + assert_eq!(event.id, "sub-1"); + assert_eq!(event.payload, serde_json::json!({})); + } + + #[test] + fn event_envelope_aborted_has_empty_payload() { + let event = EventEnvelope::aborted("req-9"); + assert_eq!(event.r#type, EVENT_ABORTED); + assert_eq!(event.id, "req-9"); + assert_eq!(event.payload, serde_json::json!({})); + } + + #[test] + fn event_envelope_responded_wraps_output() { + let event = EventEnvelope::responded("req-1", Value::Number(42.into())); + assert_eq!(event.r#type, EVENT_RESPONDED); + assert_eq!(event.payload.get("output"), Some(&Value::Number(42.into()))); + } + + #[test] + fn event_envelope_serializes_type_field() { + let event = sample_envelope(); + let json = serde_json::to_string(&event).unwrap(); + assert!(json.contains("\"type\":\"call.requested\"")); + assert!(!json.contains("\"r#type\"")); + + let parsed: EventEnvelope = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed, event); + } + + #[test] + fn call_error_skips_missing_details() { + let err = CallError::new("INTERNAL", "boom", false); + let json = serde_json::to_string(&err).unwrap(); + assert!(!json.contains("details")); + } + + #[tokio::test] + async fn read_after_eof_then_eof_returns_connection_closed() { + let mut data = Vec::new(); + let envelope = EventEnvelope::responded("one", Value::Null); + let body = serde_json::to_vec(&envelope).unwrap(); + data.extend_from_slice(&(body.len() as u32).to_be_bytes()); + data.extend_from_slice(&body); + let cursor = std::io::Cursor::new(data); + let mut reader = FrameFramedReader::new(cursor); + let first = reader.read_frame().await.unwrap(); + assert_eq!(first, envelope); + match reader.read_frame().await { + Err(FrameError::ConnectionClosed) => {} + other => panic!("expected ConnectionClosed, got {other:?}"), + } + } + + #[tokio::test] + async fn writer_into_inner_recovers_stream() { + let (client, server) = duplex(8 * 1024); + let envelope = sample_envelope(); + let mut writer = FrameFramedWriter::new(client); + writer.write_frame(&envelope).await.unwrap(); + let mut recovered = writer.into_inner(); + recovered.shutdown().await.unwrap(); + drop(recovered); + + let mut reader = FrameFramedReader::new(server); + let read = reader.read_frame().await.unwrap(); + assert_eq!(read, envelope); + let _ = reader.into_inner(); + } + + #[tokio::test] + async fn reader_handles_partial_length_prefix() { + let (mut client, server) = duplex(8 * 1024); + client.write_all(&[0u8, 0]).await.unwrap(); + drop(client); + let mut reader = FrameFramedReader::new(server); + match reader.read_frame().await { + Err(FrameError::ConnectionClosed) => {} + other => panic!("expected ConnectionClosed, got {other:?}"), + } + } + + #[tokio::test] + async fn reader_drains_remaining_after_read() { + let mut data = Vec::new(); + let envelope = sample_envelope(); + let body = serde_json::to_vec(&envelope).unwrap(); + data.extend_from_slice(&(body.len() as u32).to_be_bytes()); + data.extend_from_slice(&body); + data.extend_from_slice(&[9u8; 4]); + let mut cursor = tokio::io::BufReader::new(std::io::Cursor::new(data)); + let mut reader = FrameFramedReader::new(&mut cursor); + let read = reader.read_frame().await.unwrap(); + assert_eq!(read, envelope); + let mut leftover = Vec::new(); + let _ = cursor.read_to_end(&mut leftover).await.unwrap(); + assert_eq!(leftover, vec![9u8; 4]); + } +}