From f19e7675ac85ce7bec5606ac7098e5a144a82aac Mon Sep 17 00:00:00 2001 From: "glm-5.1" Date: Sun, 7 Jun 2026 15:05:11 +0000 Subject: [PATCH] feat(core): implement OperationEnv local dispatch, EventEnvelope, and frame encoding Add local dispatch for OperationEnv with invoke() method, EventEnvelope wire format struct, 4-byte BE length-prefixed frame encoding/decoding, PendingRequestMap for call/subscribe correlation, call protocol event type constants, and default /services/list and /services/schema operations. --- crates/alknet-core/src/call/env.rs | 14 +- crates/alknet-core/src/call/envelope.rs | 141 +++++++++++++ crates/alknet-core/src/call/events.rs | 28 +++ crates/alknet-core/src/call/frame.rs | 168 +++++++++++++++ crates/alknet-core/src/call/mod.rs | 10 + crates/alknet-core/src/call/pending.rs | 265 ++++++++++++++++++++++++ crates/alknet-core/src/call/services.rs | 207 ++++++++++++++++++ crates/alknet-core/src/lib.rs | 11 +- 8 files changed, 837 insertions(+), 7 deletions(-) create mode 100644 crates/alknet-core/src/call/envelope.rs create mode 100644 crates/alknet-core/src/call/events.rs create mode 100644 crates/alknet-core/src/call/frame.rs create mode 100644 crates/alknet-core/src/call/pending.rs create mode 100644 crates/alknet-core/src/call/services.rs diff --git a/crates/alknet-core/src/call/env.rs b/crates/alknet-core/src/call/env.rs index 98aa639..94511f4 100644 --- a/crates/alknet-core/src/call/env.rs +++ b/crates/alknet-core/src/call/env.rs @@ -19,8 +19,8 @@ impl OperationEnv { } pub fn invoke(&self, namespace: &str, operation: &str, input: Value) -> ResponseEnvelope { - let name = format!("{namespace}/{operation}"); - let request_id = format!("env-{name}"); + let name = format!("/{namespace}/{operation}"); + let request_id = format!("env{name}"); let context = OperationContext { request_id: request_id.clone(), parent_request_id: None, @@ -31,6 +31,10 @@ impl OperationEnv { }; self.registry.invoke(&name, input, context) } + + pub fn registry_ref(&self) -> &OperationRegistry { + &self.registry + } } #[cfg(test)] @@ -59,9 +63,9 @@ mod tests { fn operation_env_local_invoke() { let registry = OperationRegistryBuilder::new() .with( - make_spec("auth/verify", "auth"), + make_spec("/auth/verify", "auth"), Arc::new(|_input, _ctx| { - ResponseEnvelope::ok("env-auth/verify", serde_json::json!({"verified": true})) + ResponseEnvelope::ok("env-/auth/verify", serde_json::json!({"verified": true})) }), ) .build(); @@ -85,7 +89,7 @@ mod tests { fn operation_env_invoke_trusted() { let registry = OperationRegistryBuilder::new() .with( - make_spec("auth/verify", "auth"), + make_spec("/auth/verify", "auth"), Arc::new(|_input, ctx| { assert!(ctx.trusted); ResponseEnvelope::ok(&ctx.request_id, serde_json::json!({"ok": true})) diff --git a/crates/alknet-core/src/call/envelope.rs b/crates/alknet-core/src/call/envelope.rs new file mode 100644 index 0000000..1add3e6 --- /dev/null +++ b/crates/alknet-core/src/call/envelope.rs @@ -0,0 +1,141 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct EventEnvelope { + #[serde(rename = "type")] + pub r#type: String, + pub id: String, + pub payload: Value, +} + +impl EventEnvelope { + pub fn new(event_type: impl Into, id: impl Into, payload: Value) -> Self { + Self { + r#type: event_type.into(), + id: id.into(), + payload, + } + } + + pub fn call_requested(id: impl Into, payload: Value) -> Self { + Self::new(super::events::CALL_REQUESTED, id, payload) + } + + pub fn call_responded(id: impl Into, payload: Value) -> Self { + Self::new(super::events::CALL_RESPONDED, id, payload) + } + + pub fn call_completed(id: impl Into, payload: Value) -> Self { + Self::new(super::events::CALL_COMPLETED, id, payload) + } + + pub fn call_aborted(id: impl Into, payload: Value) -> Self { + Self::new(super::events::CALL_ABORTED, id, payload) + } + + pub fn call_error( + id: impl Into, + code: impl Into, + message: impl Into, + retryable: bool, + ) -> Self { + Self::new( + super::events::CALL_ERROR, + id, + serde_json::json!({ + "code": code.into(), + "message": message.into(), + "retryable": retryable, + }), + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn event_envelope_new() { + let env = EventEnvelope::new( + "call.requested", + "req-1", + serde_json::json!({"key": "value"}), + ); + assert_eq!(env.r#type, "call.requested"); + assert_eq!(env.id, "req-1"); + assert_eq!(env.payload, serde_json::json!({"key": "value"})); + } + + #[test] + fn event_envelope_serialization() { + let env = EventEnvelope::new( + "call.requested", + "req-1", + serde_json::json!({"key": "value"}), + ); + let serialized = serde_json::to_string(&env).unwrap(); + let deserialized: EventEnvelope = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized.r#type, "call.requested"); + assert_eq!(deserialized.id, "req-1"); + assert_eq!(deserialized.payload, serde_json::json!({"key": "value"})); + } + + #[test] + fn event_envelope_serialization_type_field() { + let env = EventEnvelope::new("call.requested", "req-1", serde_json::json!(null)); + let serialized = serde_json::to_string(&env).unwrap(); + assert!(serialized.contains("\"type\"")); + } + + #[test] + fn event_envelope_deserialization() { + let json = r#"{"type":"call.responded","id":"req-42","payload":{"result":"ok"}}"#; + let env: EventEnvelope = serde_json::from_str(json).unwrap(); + assert_eq!(env.r#type, "call.responded"); + assert_eq!(env.id, "req-42"); + assert_eq!(env.payload["result"], "ok"); + } + + #[test] + fn event_envelope_call_requested() { + let env = EventEnvelope::call_requested("req-1", serde_json::json!({"op": "test"})); + assert_eq!(env.r#type, "call.requested"); + assert_eq!(env.id, "req-1"); + } + + #[test] + fn event_envelope_call_responded() { + let env = EventEnvelope::call_responded("req-1", serde_json::json!({"data": 42})); + assert_eq!(env.r#type, "call.responded"); + } + + #[test] + fn event_envelope_call_completed() { + let env = EventEnvelope::call_completed("req-1", serde_json::json!(null)); + assert_eq!(env.r#type, "call.completed"); + } + + #[test] + fn event_envelope_call_aborted() { + let env = EventEnvelope::call_aborted("req-1", serde_json::json!({"reason": "cancelled"})); + assert_eq!(env.r#type, "call.aborted"); + } + + #[test] + fn event_envelope_call_error() { + let env = EventEnvelope::call_error("req-1", "TIMEOUT", "timed out", true); + assert_eq!(env.r#type, "call.error"); + assert_eq!(env.id, "req-1"); + assert_eq!(env.payload["code"], "TIMEOUT"); + assert_eq!(env.payload["message"], "timed out"); + assert_eq!(env.payload["retryable"], true); + } + + #[test] + fn event_envelope_empty_id() { + let env = EventEnvelope::new("event.broadcast", "", serde_json::json!({"msg": "hello"})); + assert_eq!(env.id, ""); + } +} diff --git a/crates/alknet-core/src/call/events.rs b/crates/alknet-core/src/call/events.rs new file mode 100644 index 0000000..c59e876 --- /dev/null +++ b/crates/alknet-core/src/call/events.rs @@ -0,0 +1,28 @@ +pub const CALL_REQUESTED: &str = "call.requested"; +pub const CALL_RESPONDED: &str = "call.responded"; +pub const CALL_COMPLETED: &str = "call.completed"; +pub const CALL_ABORTED: &str = "call.aborted"; +pub const CALL_ERROR: &str = "call.error"; + +pub const SERVICE_LIST: &str = "/services/list"; +pub const SERVICE_SCHEMA: &str = "/services/schema"; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn event_type_constants() { + assert_eq!(CALL_REQUESTED, "call.requested"); + assert_eq!(CALL_RESPONDED, "call.responded"); + assert_eq!(CALL_COMPLETED, "call.completed"); + assert_eq!(CALL_ABORTED, "call.aborted"); + assert_eq!(CALL_ERROR, "call.error"); + } + + #[test] + fn service_operation_constants() { + assert_eq!(SERVICE_LIST, "/services/list"); + assert_eq!(SERVICE_SCHEMA, "/services/schema"); + } +} diff --git a/crates/alknet-core/src/call/frame.rs b/crates/alknet-core/src/call/frame.rs new file mode 100644 index 0000000..63168e6 --- /dev/null +++ b/crates/alknet-core/src/call/frame.rs @@ -0,0 +1,168 @@ +use crate::call::envelope::EventEnvelope; + +pub fn encode(envelope: &EventEnvelope) -> Vec { + let json = serde_json::to_vec(envelope).expect("EventEnvelope serialization must not fail"); + let len = json.len() as u32; + let mut frame = Vec::with_capacity(4 + json.len()); + frame.extend_from_slice(&len.to_be_bytes()); + frame.extend_from_slice(&json); + frame +} + +pub fn decode(data: &[u8]) -> Result { + if data.len() < 4 { + return Err(FrameDecodeError::TooShort { + expected: 4, + actual: data.len(), + }); + } + let len = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize; + if data.len() < 4 + len { + return Err(FrameDecodeError::Incomplete { + expected: 4 + len, + actual: data.len(), + }); + } + let body = &data[4..4 + len]; + let envelope: EventEnvelope = serde_json::from_slice(body).map_err(FrameDecodeError::Json)?; + Ok(envelope) +} + +pub fn decode_with_remainder(data: &[u8]) -> Result<(EventEnvelope, usize), FrameDecodeError> { + if data.len() < 4 { + return Err(FrameDecodeError::TooShort { + expected: 4, + actual: data.len(), + }); + } + let len = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize; + let total = 4 + len; + if data.len() < total { + return Err(FrameDecodeError::Incomplete { + expected: total, + actual: data.len(), + }); + } + let body = &data[4..total]; + let envelope: EventEnvelope = serde_json::from_slice(body).map_err(FrameDecodeError::Json)?; + Ok((envelope, total)) +} + +#[derive(Debug, thiserror::Error)] +pub enum FrameDecodeError { + #[error("frame too short: expected at least {expected} bytes, got {actual}")] + TooShort { expected: usize, actual: usize }, + #[error("incomplete frame: expected {expected} bytes, got {actual}")] + Incomplete { expected: usize, actual: usize }, + #[error("JSON deserialization error: {0}")] + Json(#[from] serde_json::Error), +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::call::events; + use serde_json::json; + + #[test] + fn frame_encode_decode_round_trip() { + let envelope = EventEnvelope::new( + events::CALL_REQUESTED, + "req-1", + json!({"namespace": "auth", "operation": "verify"}), + ); + let frame = encode(&envelope); + let decoded = decode(&frame).unwrap(); + assert_eq!(decoded, envelope); + } + + #[test] + fn frame_encode_starts_with_length_prefix() { + let envelope = EventEnvelope::new(events::CALL_REQUESTED, "req-1", json!({})); + let frame = encode(&envelope); + let json = serde_json::to_vec(&envelope).unwrap(); + let expected_len = json.len() as u32; + let stored_len = u32::from_be_bytes([frame[0], frame[1], frame[2], frame[3]]); + assert_eq!(stored_len, expected_len); + assert_eq!(frame.len(), 4 + json.len()); + } + + #[test] + fn frame_decode_too_short() { + let data = [0u8; 2]; + let result = decode(&data); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(matches!( + err, + FrameDecodeError::TooShort { + expected: 4, + actual: 2 + } + )); + } + + #[test] + fn frame_decode_incomplete() { + let len = 100u32; + let mut data = Vec::new(); + data.extend_from_slice(&len.to_be_bytes()); + data.extend_from_slice(&[0u8; 10]); + let result = decode(&data); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(matches!( + err, + FrameDecodeError::Incomplete { + expected: 104, + actual: 14 + } + )); + } + + #[test] + fn frame_decode_invalid_json() { + let json = b"not valid json"; + let mut data = Vec::new(); + data.extend_from_slice(&(json.len() as u32).to_be_bytes()); + data.extend_from_slice(json); + let result = decode(&data); + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), FrameDecodeError::Json(_))); + } + + #[test] + fn frame_decode_with_remainder() { + let envelope = EventEnvelope::new(events::CALL_RESPONDED, "req-1", json!({"result": 42})); + let frame = encode(&envelope); + let mut extended = frame.clone(); + extended.extend_from_slice(&[0u8; 50]); + let (decoded, consumed) = decode_with_remainder(&extended).unwrap(); + assert_eq!(decoded, envelope); + assert_eq!(consumed, frame.len()); + } + + #[test] + fn frame_encode_decode_empty_payload() { + let envelope = EventEnvelope::new(events::CALL_COMPLETED, "req-1", json!(null)); + let frame = encode(&envelope); + let decoded = decode(&frame).unwrap(); + assert_eq!(decoded, envelope); + } + + #[test] + fn frame_encode_decode_large_payload() { + let large_data: Vec = (0..1000).collect(); + let envelope = EventEnvelope::new(events::CALL_RESPONDED, "req-big", json!(large_data)); + let frame = encode(&envelope); + let decoded = decode(&frame).unwrap(); + assert_eq!(decoded, envelope); + } + + #[test] + fn frame_decode_with_remainder_too_short() { + let data = [0u8; 1]; + let result = decode_with_remainder(&data); + assert!(result.is_err()); + } +} diff --git a/crates/alknet-core/src/call/mod.rs b/crates/alknet-core/src/call/mod.rs index 1d75a18..09d47ed 100644 --- a/crates/alknet-core/src/call/mod.rs +++ b/crates/alknet-core/src/call/mod.rs @@ -1,11 +1,21 @@ pub mod context; pub mod env; +pub mod envelope; +pub mod events; +pub mod frame; +pub mod pending; pub mod registry; pub mod response; +pub mod services; pub mod spec; pub use context::OperationContext; pub use env::OperationEnv; +pub use envelope::EventEnvelope; +pub use events::{CALL_ABORTED, CALL_COMPLETED, CALL_ERROR, CALL_REQUESTED, CALL_RESPONDED}; +pub use frame::{decode, decode_with_remainder, encode, FrameDecodeError}; +pub use pending::PendingRequestMap; pub use registry::{Handler, OperationRegistry, OperationRegistryBuilder}; pub use response::{CallError, ResponseEnvelope}; +pub use services::{register_default_operations, services_list_spec, services_schema_spec}; pub use spec::{AccessControl, OperationSpec, OperationType}; diff --git a/crates/alknet-core/src/call/pending.rs b/crates/alknet-core/src/call/pending.rs new file mode 100644 index 0000000..9f5278c --- /dev/null +++ b/crates/alknet-core/src/call/pending.rs @@ -0,0 +1,265 @@ +use std::collections::HashMap; +use std::time::Instant; + +use serde_json::Value; +use tokio::sync::{mpsc, oneshot}; + +use crate::call::response::CallError; + +enum PendingEntry { + Call { + tx: oneshot::Sender>, + timeout: Instant, + }, + Subscribe { + tx: mpsc::Sender>, + timeout: Option, + }, +} + +pub struct PendingRequestMap { + pending: HashMap, +} + +impl PendingRequestMap { + pub fn new() -> Self { + Self { + pending: HashMap::new(), + } + } + + pub fn insert_call( + &mut self, + request_id: impl Into, + tx: oneshot::Sender>, + timeout: Instant, + ) { + self.pending + .insert(request_id.into(), PendingEntry::Call { tx, timeout }); + } + + pub fn insert_subscribe( + &mut self, + request_id: impl Into, + tx: mpsc::Sender>, + timeout: Option, + ) { + self.pending + .insert(request_id.into(), PendingEntry::Subscribe { tx, timeout }); + } + + pub fn resolve_call(&mut self, request_id: &str, value: Result) -> bool { + if let Some(PendingEntry::Call { tx, .. }) = self.pending.remove(request_id) { + let _ = tx.send(value); + true + } else { + false + } + } + + pub fn push_subscribe(&mut self, request_id: &str, value: Result) -> bool { + match self.pending.get_mut(request_id) { + Some(PendingEntry::Subscribe { tx, .. }) => tx.try_send(value).is_ok(), + _ => false, + } + } + + pub fn complete_subscribe(&mut self, request_id: &str) -> bool { + self.pending.remove(request_id).is_some() + } + + pub fn abort(&mut self, request_id: &str) -> bool { + self.pending.remove(request_id).is_some() + } + + pub fn contains(&self, request_id: &str) -> bool { + self.pending.contains_key(request_id) + } + + pub fn len(&self) -> usize { + self.pending.len() + } + + pub fn is_empty(&self) -> bool { + self.pending.is_empty() + } + + pub fn sweep_expired(&mut self, now: Instant) -> usize { + let expired: Vec = self + .pending + .iter() + .filter(|(_, entry)| match entry { + PendingEntry::Call { timeout, .. } => *timeout <= now, + PendingEntry::Subscribe { timeout, .. } => timeout.is_some_and(|t| t <= now), + }) + .map(|(id, _)| id.clone()) + .collect(); + let count = expired.len(); + for id in &expired { + self.pending.remove(id); + } + count + } +} + +impl Default for PendingRequestMap { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[tokio::test] + async fn pending_request_map_insert_and_resolve_call() { + let mut map = PendingRequestMap::new(); + let (tx, rx) = oneshot::channel(); + let timeout = Instant::now() + Duration::from_secs(30); + map.insert_call("req-1", tx, timeout); + assert!(map.contains("req-1")); + assert_eq!(map.len(), 1); + + let result = map.resolve_call("req-1", Ok(serde_json::json!({"status": "ok"}))); + assert!(result); + assert!(map.is_empty()); + + let response = rx.await.unwrap(); + assert!(response.is_ok()); + assert_eq!(response.unwrap(), serde_json::json!({"status": "ok"})); + } + + #[tokio::test] + async fn pending_request_map_resolve_unknown_call() { + let mut map = PendingRequestMap::new(); + let result = map.resolve_call("unknown", Ok(serde_json::json!(null))); + assert!(!result); + } + + #[tokio::test] + async fn pending_request_map_insert_and_push_subscribe() { + let mut map = PendingRequestMap::new(); + let (tx, mut rx) = mpsc::channel(16); + map.insert_subscribe("sub-1", tx, None); + assert!(map.contains("sub-1")); + + let pushed = map.push_subscribe("sub-1", Ok(serde_json::json!({"item": 1}))); + assert!(pushed); + + let response = rx.recv().await.unwrap(); + assert!(response.is_ok()); + assert_eq!(response.unwrap(), serde_json::json!({"item": 1})); + } + + #[tokio::test] + async fn pending_request_map_complete_subscribe() { + let mut map = PendingRequestMap::new(); + let (tx, mut rx) = mpsc::channel(16); + map.insert_subscribe("sub-1", tx, None); + + map.push_subscribe("sub-1", Ok(serde_json::json!({"item": 1}))); + let completed = map.complete_subscribe("sub-1"); + assert!(completed); + assert!(map.is_empty()); + + let _ = rx.recv().await; + } + + #[tokio::test] + async fn pending_request_map_abort_call() { + let mut map = PendingRequestMap::new(); + let (tx, _rx) = oneshot::channel(); + let timeout = Instant::now() + Duration::from_secs(30); + map.insert_call("req-1", tx, timeout); + + let aborted = map.abort("req-1"); + assert!(aborted); + assert!(map.is_empty()); + } + + #[tokio::test] + async fn pending_request_map_abort_unknown() { + let mut map = PendingRequestMap::new(); + let aborted = map.abort("unknown"); + assert!(!aborted); + } + + #[tokio::test] + async fn pending_request_map_sweep_expired() { + let mut map = PendingRequestMap::new(); + let (tx1, _rx1) = oneshot::channel(); + let (tx2, _rx2) = oneshot::channel(); + let past = Instant::now() - Duration::from_secs(1); + let future = Instant::now() + Duration::from_secs(30); + + map.insert_call("expired-1", tx1, past); + map.insert_call("active-1", tx2, future); + + let swept = map.sweep_expired(Instant::now()); + assert_eq!(swept, 1); + assert!(!map.contains("expired-1")); + assert!(map.contains("active-1")); + } + + #[tokio::test] + async fn pending_request_map_sweep_subscribe_with_timeout() { + let mut map = PendingRequestMap::new(); + let (tx1, _rx1) = mpsc::channel(16); + let (tx2, _rx2) = mpsc::channel(16); + let past = Some(Instant::now() - Duration::from_secs(1)); + let future = Some(Instant::now() + Duration::from_secs(30)); + + map.insert_subscribe("expired-sub", tx1, past); + map.insert_subscribe("active-sub", tx2, future); + + let swept = map.sweep_expired(Instant::now()); + assert_eq!(swept, 1); + assert!(!map.contains("expired-sub")); + assert!(map.contains("active-sub")); + } + + #[tokio::test] + async fn pending_request_map_subscribe_no_timeout_not_swept() { + let mut map = PendingRequestMap::new(); + let (tx, _rx) = mpsc::channel(16); + map.insert_subscribe("sub-no-timeout", tx, None); + + let swept = map.sweep_expired(Instant::now()); + assert_eq!(swept, 0); + assert!(map.contains("sub-no-timeout")); + } + + #[tokio::test] + async fn pending_request_map_push_unknown_subscribe() { + let mut map = PendingRequestMap::new(); + let pushed = map.push_subscribe("unknown", Ok(serde_json::json!(null))); + assert!(!pushed); + } + + #[tokio::test] + async fn pending_request_map_call_error_response() { + let mut map = PendingRequestMap::new(); + let (tx, rx) = oneshot::channel(); + let timeout = Instant::now() + Duration::from_secs(30); + map.insert_call("req-err", tx, timeout); + + let result = map.resolve_call( + "req-err", + Err(CallError { + code: "TIMEOUT".to_string(), + message: "request timed out".to_string(), + retryable: true, + }), + ); + assert!(result); + assert!(map.is_empty()); + + let response = rx.await.unwrap(); + assert!(response.is_err()); + let err = response.unwrap_err(); + assert_eq!(err.code, "TIMEOUT"); + assert!(err.retryable); + } +} diff --git a/crates/alknet-core/src/call/services.rs b/crates/alknet-core/src/call/services.rs new file mode 100644 index 0000000..2888395 --- /dev/null +++ b/crates/alknet-core/src/call/services.rs @@ -0,0 +1,207 @@ +use std::sync::Arc; + +use serde_json::Value; + +use crate::call::context::OperationContext; +use crate::call::response::ResponseEnvelope; +use crate::call::spec::{AccessControl, OperationSpec, OperationType}; + +pub fn services_list_spec() -> OperationSpec { + OperationSpec { + name: super::events::SERVICE_LIST.to_string(), + namespace: "services".to_string(), + op_type: OperationType::Query, + input_schema: serde_json::json!({ + "type": "object", + "properties": {}, + }), + output_schema: serde_json::json!({ + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { "type": "string" }, + "namespace": { "type": "string" }, + "op_type": { "type": "string" }, + }, + }, + }), + access_control: AccessControl { + required_scopes: vec![], + required_scopes_any: None, + resource_type: None, + resource_action: None, + }, + } +} + +pub fn services_schema_spec() -> OperationSpec { + OperationSpec { + name: super::events::SERVICE_SCHEMA.to_string(), + namespace: "services".to_string(), + op_type: OperationType::Query, + input_schema: serde_json::json!({ + "type": "object", + "properties": { + "name": { "type": "string" }, + }, + "required": ["name"], + }), + output_schema: serde_json::json!({ + "type": "object", + "properties": { + "name": { "type": "string" }, + "namespace": { "type": "string" }, + "op_type": { "type": "string" }, + "input_schema": { "type": "object" }, + "output_schema": { "type": "object" }, + }, + }), + access_control: AccessControl { + required_scopes: vec![], + required_scopes_any: None, + resource_type: None, + resource_action: None, + }, + } +} + +pub fn register_default_operations(registry: &mut crate::call::OperationRegistry) { + registry.register(services_list_spec(), Arc::new(services_list_handler)); + registry.register(services_schema_spec(), Arc::new(services_schema_handler)); +} + +fn services_list_handler(_input: Value, ctx: OperationContext) -> ResponseEnvelope { + let registry = &ctx.env.registry_ref(); + let specs = registry.list_operations(); + let ops: Vec = specs + .iter() + .map(|spec| { + serde_json::json!({ + "name": spec.name, + "namespace": spec.namespace, + "op_type": format!("{:?}", spec.op_type).to_lowercase(), + }) + }) + .collect(); + ResponseEnvelope::ok(&ctx.request_id, serde_json::json!({ "operations": ops })) +} + +fn services_schema_handler(input: Value, ctx: OperationContext) -> ResponseEnvelope { + let name = match input.get("name").and_then(|v| v.as_str()) { + Some(n) => n.to_string(), + None => { + return ResponseEnvelope::err( + &ctx.request_id, + "INVALID_INPUT", + "missing required field: name", + false, + ); + } + }; + let registry = &ctx.env.registry_ref(); + match registry.lookup(&name) { + Some((spec, _)) => ResponseEnvelope::ok( + &ctx.request_id, + serde_json::json!({ + "name": spec.name, + "namespace": spec.namespace, + "op_type": format!("{:?}", spec.op_type).to_lowercase(), + "input_schema": spec.input_schema, + "output_schema": spec.output_schema, + }), + ), + None => ResponseEnvelope::err( + &ctx.request_id, + "NOT_FOUND", + format!("operation not found: {name}"), + false, + ), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::call::env::OperationEnv; + + fn make_env() -> OperationEnv { + let mut registry = crate::call::OperationRegistry::new(); + registry.register(services_list_spec(), Arc::new(services_list_handler)); + registry.register(services_schema_spec(), Arc::new(services_schema_handler)); + OperationEnv::local(registry) + } + + #[test] + fn services_list_returns_operations() { + let env = make_env(); + let result = env.invoke("services", "list", serde_json::json!({})); + assert!(result.result.is_ok()); + let value = result.result.unwrap(); + let ops = value.get("operations").unwrap().as_array().unwrap(); + assert_eq!(ops.len(), 2); + } + + #[test] + fn services_schema_returns_spec() { + let env = make_env(); + let result = env.invoke( + "services", + "schema", + serde_json::json!({"name": "/services/list"}), + ); + assert!(result.result.is_ok()); + let value = result.result.unwrap(); + assert_eq!(value["name"], "/services/list"); + assert_eq!(value["namespace"], "services"); + } + + #[test] + fn services_schema_missing_name() { + let env = make_env(); + let result = env.invoke("services", "schema", serde_json::json!({})); + assert!(result.result.is_err()); + let err = result.result.unwrap_err(); + assert_eq!(err.code, "INVALID_INPUT"); + } + + #[test] + fn services_schema_not_found() { + let env = make_env(); + let result = env.invoke( + "services", + "schema", + serde_json::json!({"name": "/nonexistent/op"}), + ); + assert!(result.result.is_err()); + let err = result.result.unwrap_err(); + assert_eq!(err.code, "NOT_FOUND"); + } + + #[test] + fn services_list_spec_fields() { + let spec = services_list_spec(); + assert_eq!(spec.name, "/services/list"); + assert_eq!(spec.namespace, "services"); + assert_eq!(spec.op_type, OperationType::Query); + assert!(!spec.access_control.has_restrictions()); + } + + #[test] + fn services_schema_spec_fields() { + let spec = services_schema_spec(); + assert_eq!(spec.name, "/services/schema"); + assert_eq!(spec.namespace, "services"); + assert_eq!(spec.op_type, OperationType::Query); + assert!(!spec.access_control.has_restrictions()); + } + + #[test] + fn register_default_operations_adds_both() { + let mut registry = crate::call::OperationRegistry::new(); + register_default_operations(&mut registry); + assert!(registry.lookup("/services/list").is_some()); + assert!(registry.lookup("/services/schema").is_some()); + assert_eq!(registry.list_operations().len(), 2); + } +} diff --git a/crates/alknet-core/src/lib.rs b/crates/alknet-core/src/lib.rs index 293985c..400a1a1 100644 --- a/crates/alknet-core/src/lib.rs +++ b/crates/alknet-core/src/lib.rs @@ -67,9 +67,16 @@ pub mod testutil; pub use auth::{AuthProtocol, AuthResult, AuthServiceImpl}; pub use auth::{AuthToken, ConfigIdentityProvider, Identity, IdentityProvider}; pub use call::{ - AccessControl, CallError, Handler, OperationContext, OperationEnv, OperationRegistry, - OperationRegistryBuilder, OperationSpec, OperationType, ResponseEnvelope, + decode as decode_frame, decode_with_remainder as decode_frame_with_remainder, + encode as encode_frame, }; +pub use call::{ + register_default_operations, services_list_spec, services_schema_spec, AccessControl, + CallError, EventEnvelope, FrameDecodeError, Handler, OperationContext, OperationEnv, + OperationRegistry, OperationRegistryBuilder, OperationSpec, OperationType, PendingRequestMap, + ResponseEnvelope, +}; +pub use call::{CALL_ABORTED, CALL_COMPLETED, CALL_ERROR, CALL_REQUESTED, CALL_RESPONDED}; pub use client::channel_manager::{ChannelManager, ForwardRequest}; pub use client::connect::{ClientSession, ConnectError, ConnectOptions, TransportMode}; pub use config::{