feat(core): implement OperationEnv local dispatch, EventEnvelope, and frame encoding
Add local dispatch for OperationEnv with invoke() method, EventEnvelope wire format struct, 4-byte BE length-prefixed frame encoding/decoding, PendingRequestMap for call/subscribe correlation, call protocol event type constants, and default /services/list and /services/schema operations.
This commit is contained in:
@@ -19,8 +19,8 @@ impl OperationEnv {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn invoke(&self, namespace: &str, operation: &str, input: Value) -> ResponseEnvelope {
|
pub fn invoke(&self, namespace: &str, operation: &str, input: Value) -> ResponseEnvelope {
|
||||||
let name = format!("{namespace}/{operation}");
|
let name = format!("/{namespace}/{operation}");
|
||||||
let request_id = format!("env-{name}");
|
let request_id = format!("env{name}");
|
||||||
let context = OperationContext {
|
let context = OperationContext {
|
||||||
request_id: request_id.clone(),
|
request_id: request_id.clone(),
|
||||||
parent_request_id: None,
|
parent_request_id: None,
|
||||||
@@ -31,6 +31,10 @@ impl OperationEnv {
|
|||||||
};
|
};
|
||||||
self.registry.invoke(&name, input, context)
|
self.registry.invoke(&name, input, context)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn registry_ref(&self) -> &OperationRegistry {
|
||||||
|
&self.registry
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -59,9 +63,9 @@ mod tests {
|
|||||||
fn operation_env_local_invoke() {
|
fn operation_env_local_invoke() {
|
||||||
let registry = OperationRegistryBuilder::new()
|
let registry = OperationRegistryBuilder::new()
|
||||||
.with(
|
.with(
|
||||||
make_spec("auth/verify", "auth"),
|
make_spec("/auth/verify", "auth"),
|
||||||
Arc::new(|_input, _ctx| {
|
Arc::new(|_input, _ctx| {
|
||||||
ResponseEnvelope::ok("env-auth/verify", serde_json::json!({"verified": true}))
|
ResponseEnvelope::ok("env-/auth/verify", serde_json::json!({"verified": true}))
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
.build();
|
.build();
|
||||||
@@ -85,7 +89,7 @@ mod tests {
|
|||||||
fn operation_env_invoke_trusted() {
|
fn operation_env_invoke_trusted() {
|
||||||
let registry = OperationRegistryBuilder::new()
|
let registry = OperationRegistryBuilder::new()
|
||||||
.with(
|
.with(
|
||||||
make_spec("auth/verify", "auth"),
|
make_spec("/auth/verify", "auth"),
|
||||||
Arc::new(|_input, ctx| {
|
Arc::new(|_input, ctx| {
|
||||||
assert!(ctx.trusted);
|
assert!(ctx.trusted);
|
||||||
ResponseEnvelope::ok(&ctx.request_id, serde_json::json!({"ok": true}))
|
ResponseEnvelope::ok(&ctx.request_id, serde_json::json!({"ok": true}))
|
||||||
|
|||||||
141
crates/alknet-core/src/call/envelope.rs
Normal file
141
crates/alknet-core/src/call/envelope.rs
Normal file
@@ -0,0 +1,141 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
|
pub struct EventEnvelope {
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub r#type: String,
|
||||||
|
pub id: String,
|
||||||
|
pub payload: Value,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventEnvelope {
|
||||||
|
pub fn new(event_type: impl Into<String>, id: impl Into<String>, payload: Value) -> Self {
|
||||||
|
Self {
|
||||||
|
r#type: event_type.into(),
|
||||||
|
id: id.into(),
|
||||||
|
payload,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn call_requested(id: impl Into<String>, payload: Value) -> Self {
|
||||||
|
Self::new(super::events::CALL_REQUESTED, id, payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn call_responded(id: impl Into<String>, payload: Value) -> Self {
|
||||||
|
Self::new(super::events::CALL_RESPONDED, id, payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn call_completed(id: impl Into<String>, payload: Value) -> Self {
|
||||||
|
Self::new(super::events::CALL_COMPLETED, id, payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn call_aborted(id: impl Into<String>, payload: Value) -> Self {
|
||||||
|
Self::new(super::events::CALL_ABORTED, id, payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn call_error(
|
||||||
|
id: impl Into<String>,
|
||||||
|
code: impl Into<String>,
|
||||||
|
message: impl Into<String>,
|
||||||
|
retryable: bool,
|
||||||
|
) -> Self {
|
||||||
|
Self::new(
|
||||||
|
super::events::CALL_ERROR,
|
||||||
|
id,
|
||||||
|
serde_json::json!({
|
||||||
|
"code": code.into(),
|
||||||
|
"message": message.into(),
|
||||||
|
"retryable": retryable,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_envelope_new() {
|
||||||
|
let env = EventEnvelope::new(
|
||||||
|
"call.requested",
|
||||||
|
"req-1",
|
||||||
|
serde_json::json!({"key": "value"}),
|
||||||
|
);
|
||||||
|
assert_eq!(env.r#type, "call.requested");
|
||||||
|
assert_eq!(env.id, "req-1");
|
||||||
|
assert_eq!(env.payload, serde_json::json!({"key": "value"}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_envelope_serialization() {
|
||||||
|
let env = EventEnvelope::new(
|
||||||
|
"call.requested",
|
||||||
|
"req-1",
|
||||||
|
serde_json::json!({"key": "value"}),
|
||||||
|
);
|
||||||
|
let serialized = serde_json::to_string(&env).unwrap();
|
||||||
|
let deserialized: EventEnvelope = serde_json::from_str(&serialized).unwrap();
|
||||||
|
assert_eq!(deserialized.r#type, "call.requested");
|
||||||
|
assert_eq!(deserialized.id, "req-1");
|
||||||
|
assert_eq!(deserialized.payload, serde_json::json!({"key": "value"}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_envelope_serialization_type_field() {
|
||||||
|
let env = EventEnvelope::new("call.requested", "req-1", serde_json::json!(null));
|
||||||
|
let serialized = serde_json::to_string(&env).unwrap();
|
||||||
|
assert!(serialized.contains("\"type\""));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_envelope_deserialization() {
|
||||||
|
let json = r#"{"type":"call.responded","id":"req-42","payload":{"result":"ok"}}"#;
|
||||||
|
let env: EventEnvelope = serde_json::from_str(json).unwrap();
|
||||||
|
assert_eq!(env.r#type, "call.responded");
|
||||||
|
assert_eq!(env.id, "req-42");
|
||||||
|
assert_eq!(env.payload["result"], "ok");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_envelope_call_requested() {
|
||||||
|
let env = EventEnvelope::call_requested("req-1", serde_json::json!({"op": "test"}));
|
||||||
|
assert_eq!(env.r#type, "call.requested");
|
||||||
|
assert_eq!(env.id, "req-1");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_envelope_call_responded() {
|
||||||
|
let env = EventEnvelope::call_responded("req-1", serde_json::json!({"data": 42}));
|
||||||
|
assert_eq!(env.r#type, "call.responded");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_envelope_call_completed() {
|
||||||
|
let env = EventEnvelope::call_completed("req-1", serde_json::json!(null));
|
||||||
|
assert_eq!(env.r#type, "call.completed");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_envelope_call_aborted() {
|
||||||
|
let env = EventEnvelope::call_aborted("req-1", serde_json::json!({"reason": "cancelled"}));
|
||||||
|
assert_eq!(env.r#type, "call.aborted");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_envelope_call_error() {
|
||||||
|
let env = EventEnvelope::call_error("req-1", "TIMEOUT", "timed out", true);
|
||||||
|
assert_eq!(env.r#type, "call.error");
|
||||||
|
assert_eq!(env.id, "req-1");
|
||||||
|
assert_eq!(env.payload["code"], "TIMEOUT");
|
||||||
|
assert_eq!(env.payload["message"], "timed out");
|
||||||
|
assert_eq!(env.payload["retryable"], true);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_envelope_empty_id() {
|
||||||
|
let env = EventEnvelope::new("event.broadcast", "", serde_json::json!({"msg": "hello"}));
|
||||||
|
assert_eq!(env.id, "");
|
||||||
|
}
|
||||||
|
}
|
||||||
28
crates/alknet-core/src/call/events.rs
Normal file
28
crates/alknet-core/src/call/events.rs
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
pub const CALL_REQUESTED: &str = "call.requested";
|
||||||
|
pub const CALL_RESPONDED: &str = "call.responded";
|
||||||
|
pub const CALL_COMPLETED: &str = "call.completed";
|
||||||
|
pub const CALL_ABORTED: &str = "call.aborted";
|
||||||
|
pub const CALL_ERROR: &str = "call.error";
|
||||||
|
|
||||||
|
pub const SERVICE_LIST: &str = "/services/list";
|
||||||
|
pub const SERVICE_SCHEMA: &str = "/services/schema";
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_type_constants() {
|
||||||
|
assert_eq!(CALL_REQUESTED, "call.requested");
|
||||||
|
assert_eq!(CALL_RESPONDED, "call.responded");
|
||||||
|
assert_eq!(CALL_COMPLETED, "call.completed");
|
||||||
|
assert_eq!(CALL_ABORTED, "call.aborted");
|
||||||
|
assert_eq!(CALL_ERROR, "call.error");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn service_operation_constants() {
|
||||||
|
assert_eq!(SERVICE_LIST, "/services/list");
|
||||||
|
assert_eq!(SERVICE_SCHEMA, "/services/schema");
|
||||||
|
}
|
||||||
|
}
|
||||||
168
crates/alknet-core/src/call/frame.rs
Normal file
168
crates/alknet-core/src/call/frame.rs
Normal file
@@ -0,0 +1,168 @@
|
|||||||
|
use crate::call::envelope::EventEnvelope;
|
||||||
|
|
||||||
|
pub fn encode(envelope: &EventEnvelope) -> Vec<u8> {
|
||||||
|
let json = serde_json::to_vec(envelope).expect("EventEnvelope serialization must not fail");
|
||||||
|
let len = json.len() as u32;
|
||||||
|
let mut frame = Vec::with_capacity(4 + json.len());
|
||||||
|
frame.extend_from_slice(&len.to_be_bytes());
|
||||||
|
frame.extend_from_slice(&json);
|
||||||
|
frame
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode(data: &[u8]) -> Result<EventEnvelope, FrameDecodeError> {
|
||||||
|
if data.len() < 4 {
|
||||||
|
return Err(FrameDecodeError::TooShort {
|
||||||
|
expected: 4,
|
||||||
|
actual: data.len(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let len = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
|
||||||
|
if data.len() < 4 + len {
|
||||||
|
return Err(FrameDecodeError::Incomplete {
|
||||||
|
expected: 4 + len,
|
||||||
|
actual: data.len(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let body = &data[4..4 + len];
|
||||||
|
let envelope: EventEnvelope = serde_json::from_slice(body).map_err(FrameDecodeError::Json)?;
|
||||||
|
Ok(envelope)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode_with_remainder(data: &[u8]) -> Result<(EventEnvelope, usize), FrameDecodeError> {
|
||||||
|
if data.len() < 4 {
|
||||||
|
return Err(FrameDecodeError::TooShort {
|
||||||
|
expected: 4,
|
||||||
|
actual: data.len(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let len = u32::from_be_bytes([data[0], data[1], data[2], data[3]]) as usize;
|
||||||
|
let total = 4 + len;
|
||||||
|
if data.len() < total {
|
||||||
|
return Err(FrameDecodeError::Incomplete {
|
||||||
|
expected: total,
|
||||||
|
actual: data.len(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let body = &data[4..total];
|
||||||
|
let envelope: EventEnvelope = serde_json::from_slice(body).map_err(FrameDecodeError::Json)?;
|
||||||
|
Ok((envelope, total))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum FrameDecodeError {
|
||||||
|
#[error("frame too short: expected at least {expected} bytes, got {actual}")]
|
||||||
|
TooShort { expected: usize, actual: usize },
|
||||||
|
#[error("incomplete frame: expected {expected} bytes, got {actual}")]
|
||||||
|
Incomplete { expected: usize, actual: usize },
|
||||||
|
#[error("JSON deserialization error: {0}")]
|
||||||
|
Json(#[from] serde_json::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::call::events;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn frame_encode_decode_round_trip() {
|
||||||
|
let envelope = EventEnvelope::new(
|
||||||
|
events::CALL_REQUESTED,
|
||||||
|
"req-1",
|
||||||
|
json!({"namespace": "auth", "operation": "verify"}),
|
||||||
|
);
|
||||||
|
let frame = encode(&envelope);
|
||||||
|
let decoded = decode(&frame).unwrap();
|
||||||
|
assert_eq!(decoded, envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn frame_encode_starts_with_length_prefix() {
|
||||||
|
let envelope = EventEnvelope::new(events::CALL_REQUESTED, "req-1", json!({}));
|
||||||
|
let frame = encode(&envelope);
|
||||||
|
let json = serde_json::to_vec(&envelope).unwrap();
|
||||||
|
let expected_len = json.len() as u32;
|
||||||
|
let stored_len = u32::from_be_bytes([frame[0], frame[1], frame[2], frame[3]]);
|
||||||
|
assert_eq!(stored_len, expected_len);
|
||||||
|
assert_eq!(frame.len(), 4 + json.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn frame_decode_too_short() {
|
||||||
|
let data = [0u8; 2];
|
||||||
|
let result = decode(&data);
|
||||||
|
assert!(result.is_err());
|
||||||
|
let err = result.unwrap_err();
|
||||||
|
assert!(matches!(
|
||||||
|
err,
|
||||||
|
FrameDecodeError::TooShort {
|
||||||
|
expected: 4,
|
||||||
|
actual: 2
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn frame_decode_incomplete() {
|
||||||
|
let len = 100u32;
|
||||||
|
let mut data = Vec::new();
|
||||||
|
data.extend_from_slice(&len.to_be_bytes());
|
||||||
|
data.extend_from_slice(&[0u8; 10]);
|
||||||
|
let result = decode(&data);
|
||||||
|
assert!(result.is_err());
|
||||||
|
let err = result.unwrap_err();
|
||||||
|
assert!(matches!(
|
||||||
|
err,
|
||||||
|
FrameDecodeError::Incomplete {
|
||||||
|
expected: 104,
|
||||||
|
actual: 14
|
||||||
|
}
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn frame_decode_invalid_json() {
|
||||||
|
let json = b"not valid json";
|
||||||
|
let mut data = Vec::new();
|
||||||
|
data.extend_from_slice(&(json.len() as u32).to_be_bytes());
|
||||||
|
data.extend_from_slice(json);
|
||||||
|
let result = decode(&data);
|
||||||
|
assert!(result.is_err());
|
||||||
|
assert!(matches!(result.unwrap_err(), FrameDecodeError::Json(_)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn frame_decode_with_remainder() {
|
||||||
|
let envelope = EventEnvelope::new(events::CALL_RESPONDED, "req-1", json!({"result": 42}));
|
||||||
|
let frame = encode(&envelope);
|
||||||
|
let mut extended = frame.clone();
|
||||||
|
extended.extend_from_slice(&[0u8; 50]);
|
||||||
|
let (decoded, consumed) = decode_with_remainder(&extended).unwrap();
|
||||||
|
assert_eq!(decoded, envelope);
|
||||||
|
assert_eq!(consumed, frame.len());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn frame_encode_decode_empty_payload() {
|
||||||
|
let envelope = EventEnvelope::new(events::CALL_COMPLETED, "req-1", json!(null));
|
||||||
|
let frame = encode(&envelope);
|
||||||
|
let decoded = decode(&frame).unwrap();
|
||||||
|
assert_eq!(decoded, envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn frame_encode_decode_large_payload() {
|
||||||
|
let large_data: Vec<i32> = (0..1000).collect();
|
||||||
|
let envelope = EventEnvelope::new(events::CALL_RESPONDED, "req-big", json!(large_data));
|
||||||
|
let frame = encode(&envelope);
|
||||||
|
let decoded = decode(&frame).unwrap();
|
||||||
|
assert_eq!(decoded, envelope);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn frame_decode_with_remainder_too_short() {
|
||||||
|
let data = [0u8; 1];
|
||||||
|
let result = decode_with_remainder(&data);
|
||||||
|
assert!(result.is_err());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,11 +1,21 @@
|
|||||||
pub mod context;
|
pub mod context;
|
||||||
pub mod env;
|
pub mod env;
|
||||||
|
pub mod envelope;
|
||||||
|
pub mod events;
|
||||||
|
pub mod frame;
|
||||||
|
pub mod pending;
|
||||||
pub mod registry;
|
pub mod registry;
|
||||||
pub mod response;
|
pub mod response;
|
||||||
|
pub mod services;
|
||||||
pub mod spec;
|
pub mod spec;
|
||||||
|
|
||||||
pub use context::OperationContext;
|
pub use context::OperationContext;
|
||||||
pub use env::OperationEnv;
|
pub use env::OperationEnv;
|
||||||
|
pub use envelope::EventEnvelope;
|
||||||
|
pub use events::{CALL_ABORTED, CALL_COMPLETED, CALL_ERROR, CALL_REQUESTED, CALL_RESPONDED};
|
||||||
|
pub use frame::{decode, decode_with_remainder, encode, FrameDecodeError};
|
||||||
|
pub use pending::PendingRequestMap;
|
||||||
pub use registry::{Handler, OperationRegistry, OperationRegistryBuilder};
|
pub use registry::{Handler, OperationRegistry, OperationRegistryBuilder};
|
||||||
pub use response::{CallError, ResponseEnvelope};
|
pub use response::{CallError, ResponseEnvelope};
|
||||||
|
pub use services::{register_default_operations, services_list_spec, services_schema_spec};
|
||||||
pub use spec::{AccessControl, OperationSpec, OperationType};
|
pub use spec::{AccessControl, OperationSpec, OperationType};
|
||||||
|
|||||||
265
crates/alknet-core/src/call/pending.rs
Normal file
265
crates/alknet-core/src/call/pending.rs
Normal file
@@ -0,0 +1,265 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
use serde_json::Value;
|
||||||
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
|
use crate::call::response::CallError;
|
||||||
|
|
||||||
|
enum PendingEntry {
|
||||||
|
Call {
|
||||||
|
tx: oneshot::Sender<Result<Value, CallError>>,
|
||||||
|
timeout: Instant,
|
||||||
|
},
|
||||||
|
Subscribe {
|
||||||
|
tx: mpsc::Sender<Result<Value, CallError>>,
|
||||||
|
timeout: Option<Instant>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct PendingRequestMap {
|
||||||
|
pending: HashMap<String, PendingEntry>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PendingRequestMap {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
pending: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert_call(
|
||||||
|
&mut self,
|
||||||
|
request_id: impl Into<String>,
|
||||||
|
tx: oneshot::Sender<Result<Value, CallError>>,
|
||||||
|
timeout: Instant,
|
||||||
|
) {
|
||||||
|
self.pending
|
||||||
|
.insert(request_id.into(), PendingEntry::Call { tx, timeout });
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert_subscribe(
|
||||||
|
&mut self,
|
||||||
|
request_id: impl Into<String>,
|
||||||
|
tx: mpsc::Sender<Result<Value, CallError>>,
|
||||||
|
timeout: Option<Instant>,
|
||||||
|
) {
|
||||||
|
self.pending
|
||||||
|
.insert(request_id.into(), PendingEntry::Subscribe { tx, timeout });
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn resolve_call(&mut self, request_id: &str, value: Result<Value, CallError>) -> bool {
|
||||||
|
if let Some(PendingEntry::Call { tx, .. }) = self.pending.remove(request_id) {
|
||||||
|
let _ = tx.send(value);
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push_subscribe(&mut self, request_id: &str, value: Result<Value, CallError>) -> bool {
|
||||||
|
match self.pending.get_mut(request_id) {
|
||||||
|
Some(PendingEntry::Subscribe { tx, .. }) => tx.try_send(value).is_ok(),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn complete_subscribe(&mut self, request_id: &str) -> bool {
|
||||||
|
self.pending.remove(request_id).is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn abort(&mut self, request_id: &str) -> bool {
|
||||||
|
self.pending.remove(request_id).is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn contains(&self, request_id: &str) -> bool {
|
||||||
|
self.pending.contains_key(request_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn len(&self) -> usize {
|
||||||
|
self.pending.len()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.pending.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn sweep_expired(&mut self, now: Instant) -> usize {
|
||||||
|
let expired: Vec<String> = self
|
||||||
|
.pending
|
||||||
|
.iter()
|
||||||
|
.filter(|(_, entry)| match entry {
|
||||||
|
PendingEntry::Call { timeout, .. } => *timeout <= now,
|
||||||
|
PendingEntry::Subscribe { timeout, .. } => timeout.is_some_and(|t| t <= now),
|
||||||
|
})
|
||||||
|
.map(|(id, _)| id.clone())
|
||||||
|
.collect();
|
||||||
|
let count = expired.len();
|
||||||
|
for id in &expired {
|
||||||
|
self.pending.remove(id);
|
||||||
|
}
|
||||||
|
count
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for PendingRequestMap {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pending_request_map_insert_and_resolve_call() {
|
||||||
|
let mut map = PendingRequestMap::new();
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let timeout = Instant::now() + Duration::from_secs(30);
|
||||||
|
map.insert_call("req-1", tx, timeout);
|
||||||
|
assert!(map.contains("req-1"));
|
||||||
|
assert_eq!(map.len(), 1);
|
||||||
|
|
||||||
|
let result = map.resolve_call("req-1", Ok(serde_json::json!({"status": "ok"})));
|
||||||
|
assert!(result);
|
||||||
|
assert!(map.is_empty());
|
||||||
|
|
||||||
|
let response = rx.await.unwrap();
|
||||||
|
assert!(response.is_ok());
|
||||||
|
assert_eq!(response.unwrap(), serde_json::json!({"status": "ok"}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pending_request_map_resolve_unknown_call() {
|
||||||
|
let mut map = PendingRequestMap::new();
|
||||||
|
let result = map.resolve_call("unknown", Ok(serde_json::json!(null)));
|
||||||
|
assert!(!result);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pending_request_map_insert_and_push_subscribe() {
|
||||||
|
let mut map = PendingRequestMap::new();
|
||||||
|
let (tx, mut rx) = mpsc::channel(16);
|
||||||
|
map.insert_subscribe("sub-1", tx, None);
|
||||||
|
assert!(map.contains("sub-1"));
|
||||||
|
|
||||||
|
let pushed = map.push_subscribe("sub-1", Ok(serde_json::json!({"item": 1})));
|
||||||
|
assert!(pushed);
|
||||||
|
|
||||||
|
let response = rx.recv().await.unwrap();
|
||||||
|
assert!(response.is_ok());
|
||||||
|
assert_eq!(response.unwrap(), serde_json::json!({"item": 1}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pending_request_map_complete_subscribe() {
|
||||||
|
let mut map = PendingRequestMap::new();
|
||||||
|
let (tx, mut rx) = mpsc::channel(16);
|
||||||
|
map.insert_subscribe("sub-1", tx, None);
|
||||||
|
|
||||||
|
map.push_subscribe("sub-1", Ok(serde_json::json!({"item": 1})));
|
||||||
|
let completed = map.complete_subscribe("sub-1");
|
||||||
|
assert!(completed);
|
||||||
|
assert!(map.is_empty());
|
||||||
|
|
||||||
|
let _ = rx.recv().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pending_request_map_abort_call() {
|
||||||
|
let mut map = PendingRequestMap::new();
|
||||||
|
let (tx, _rx) = oneshot::channel();
|
||||||
|
let timeout = Instant::now() + Duration::from_secs(30);
|
||||||
|
map.insert_call("req-1", tx, timeout);
|
||||||
|
|
||||||
|
let aborted = map.abort("req-1");
|
||||||
|
assert!(aborted);
|
||||||
|
assert!(map.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pending_request_map_abort_unknown() {
|
||||||
|
let mut map = PendingRequestMap::new();
|
||||||
|
let aborted = map.abort("unknown");
|
||||||
|
assert!(!aborted);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pending_request_map_sweep_expired() {
|
||||||
|
let mut map = PendingRequestMap::new();
|
||||||
|
let (tx1, _rx1) = oneshot::channel();
|
||||||
|
let (tx2, _rx2) = oneshot::channel();
|
||||||
|
let past = Instant::now() - Duration::from_secs(1);
|
||||||
|
let future = Instant::now() + Duration::from_secs(30);
|
||||||
|
|
||||||
|
map.insert_call("expired-1", tx1, past);
|
||||||
|
map.insert_call("active-1", tx2, future);
|
||||||
|
|
||||||
|
let swept = map.sweep_expired(Instant::now());
|
||||||
|
assert_eq!(swept, 1);
|
||||||
|
assert!(!map.contains("expired-1"));
|
||||||
|
assert!(map.contains("active-1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pending_request_map_sweep_subscribe_with_timeout() {
|
||||||
|
let mut map = PendingRequestMap::new();
|
||||||
|
let (tx1, _rx1) = mpsc::channel(16);
|
||||||
|
let (tx2, _rx2) = mpsc::channel(16);
|
||||||
|
let past = Some(Instant::now() - Duration::from_secs(1));
|
||||||
|
let future = Some(Instant::now() + Duration::from_secs(30));
|
||||||
|
|
||||||
|
map.insert_subscribe("expired-sub", tx1, past);
|
||||||
|
map.insert_subscribe("active-sub", tx2, future);
|
||||||
|
|
||||||
|
let swept = map.sweep_expired(Instant::now());
|
||||||
|
assert_eq!(swept, 1);
|
||||||
|
assert!(!map.contains("expired-sub"));
|
||||||
|
assert!(map.contains("active-sub"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pending_request_map_subscribe_no_timeout_not_swept() {
|
||||||
|
let mut map = PendingRequestMap::new();
|
||||||
|
let (tx, _rx) = mpsc::channel(16);
|
||||||
|
map.insert_subscribe("sub-no-timeout", tx, None);
|
||||||
|
|
||||||
|
let swept = map.sweep_expired(Instant::now());
|
||||||
|
assert_eq!(swept, 0);
|
||||||
|
assert!(map.contains("sub-no-timeout"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pending_request_map_push_unknown_subscribe() {
|
||||||
|
let mut map = PendingRequestMap::new();
|
||||||
|
let pushed = map.push_subscribe("unknown", Ok(serde_json::json!(null)));
|
||||||
|
assert!(!pushed);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pending_request_map_call_error_response() {
|
||||||
|
let mut map = PendingRequestMap::new();
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let timeout = Instant::now() + Duration::from_secs(30);
|
||||||
|
map.insert_call("req-err", tx, timeout);
|
||||||
|
|
||||||
|
let result = map.resolve_call(
|
||||||
|
"req-err",
|
||||||
|
Err(CallError {
|
||||||
|
code: "TIMEOUT".to_string(),
|
||||||
|
message: "request timed out".to_string(),
|
||||||
|
retryable: true,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
assert!(result);
|
||||||
|
assert!(map.is_empty());
|
||||||
|
|
||||||
|
let response = rx.await.unwrap();
|
||||||
|
assert!(response.is_err());
|
||||||
|
let err = response.unwrap_err();
|
||||||
|
assert_eq!(err.code, "TIMEOUT");
|
||||||
|
assert!(err.retryable);
|
||||||
|
}
|
||||||
|
}
|
||||||
207
crates/alknet-core/src/call/services.rs
Normal file
207
crates/alknet-core/src/call/services.rs
Normal file
@@ -0,0 +1,207 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use serde_json::Value;
|
||||||
|
|
||||||
|
use crate::call::context::OperationContext;
|
||||||
|
use crate::call::response::ResponseEnvelope;
|
||||||
|
use crate::call::spec::{AccessControl, OperationSpec, OperationType};
|
||||||
|
|
||||||
|
pub fn services_list_spec() -> OperationSpec {
|
||||||
|
OperationSpec {
|
||||||
|
name: super::events::SERVICE_LIST.to_string(),
|
||||||
|
namespace: "services".to_string(),
|
||||||
|
op_type: OperationType::Query,
|
||||||
|
input_schema: serde_json::json!({
|
||||||
|
"type": "object",
|
||||||
|
"properties": {},
|
||||||
|
}),
|
||||||
|
output_schema: serde_json::json!({
|
||||||
|
"type": "array",
|
||||||
|
"items": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"name": { "type": "string" },
|
||||||
|
"namespace": { "type": "string" },
|
||||||
|
"op_type": { "type": "string" },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
access_control: AccessControl {
|
||||||
|
required_scopes: vec![],
|
||||||
|
required_scopes_any: None,
|
||||||
|
resource_type: None,
|
||||||
|
resource_action: None,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn services_schema_spec() -> OperationSpec {
|
||||||
|
OperationSpec {
|
||||||
|
name: super::events::SERVICE_SCHEMA.to_string(),
|
||||||
|
namespace: "services".to_string(),
|
||||||
|
op_type: OperationType::Query,
|
||||||
|
input_schema: serde_json::json!({
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"name": { "type": "string" },
|
||||||
|
},
|
||||||
|
"required": ["name"],
|
||||||
|
}),
|
||||||
|
output_schema: serde_json::json!({
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"name": { "type": "string" },
|
||||||
|
"namespace": { "type": "string" },
|
||||||
|
"op_type": { "type": "string" },
|
||||||
|
"input_schema": { "type": "object" },
|
||||||
|
"output_schema": { "type": "object" },
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
access_control: AccessControl {
|
||||||
|
required_scopes: vec![],
|
||||||
|
required_scopes_any: None,
|
||||||
|
resource_type: None,
|
||||||
|
resource_action: None,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register_default_operations(registry: &mut crate::call::OperationRegistry) {
|
||||||
|
registry.register(services_list_spec(), Arc::new(services_list_handler));
|
||||||
|
registry.register(services_schema_spec(), Arc::new(services_schema_handler));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn services_list_handler(_input: Value, ctx: OperationContext) -> ResponseEnvelope {
|
||||||
|
let registry = &ctx.env.registry_ref();
|
||||||
|
let specs = registry.list_operations();
|
||||||
|
let ops: Vec<Value> = specs
|
||||||
|
.iter()
|
||||||
|
.map(|spec| {
|
||||||
|
serde_json::json!({
|
||||||
|
"name": spec.name,
|
||||||
|
"namespace": spec.namespace,
|
||||||
|
"op_type": format!("{:?}", spec.op_type).to_lowercase(),
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
ResponseEnvelope::ok(&ctx.request_id, serde_json::json!({ "operations": ops }))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn services_schema_handler(input: Value, ctx: OperationContext) -> ResponseEnvelope {
|
||||||
|
let name = match input.get("name").and_then(|v| v.as_str()) {
|
||||||
|
Some(n) => n.to_string(),
|
||||||
|
None => {
|
||||||
|
return ResponseEnvelope::err(
|
||||||
|
&ctx.request_id,
|
||||||
|
"INVALID_INPUT",
|
||||||
|
"missing required field: name",
|
||||||
|
false,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let registry = &ctx.env.registry_ref();
|
||||||
|
match registry.lookup(&name) {
|
||||||
|
Some((spec, _)) => ResponseEnvelope::ok(
|
||||||
|
&ctx.request_id,
|
||||||
|
serde_json::json!({
|
||||||
|
"name": spec.name,
|
||||||
|
"namespace": spec.namespace,
|
||||||
|
"op_type": format!("{:?}", spec.op_type).to_lowercase(),
|
||||||
|
"input_schema": spec.input_schema,
|
||||||
|
"output_schema": spec.output_schema,
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
None => ResponseEnvelope::err(
|
||||||
|
&ctx.request_id,
|
||||||
|
"NOT_FOUND",
|
||||||
|
format!("operation not found: {name}"),
|
||||||
|
false,
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::call::env::OperationEnv;
|
||||||
|
|
||||||
|
fn make_env() -> OperationEnv {
|
||||||
|
let mut registry = crate::call::OperationRegistry::new();
|
||||||
|
registry.register(services_list_spec(), Arc::new(services_list_handler));
|
||||||
|
registry.register(services_schema_spec(), Arc::new(services_schema_handler));
|
||||||
|
OperationEnv::local(registry)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn services_list_returns_operations() {
|
||||||
|
let env = make_env();
|
||||||
|
let result = env.invoke("services", "list", serde_json::json!({}));
|
||||||
|
assert!(result.result.is_ok());
|
||||||
|
let value = result.result.unwrap();
|
||||||
|
let ops = value.get("operations").unwrap().as_array().unwrap();
|
||||||
|
assert_eq!(ops.len(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn services_schema_returns_spec() {
|
||||||
|
let env = make_env();
|
||||||
|
let result = env.invoke(
|
||||||
|
"services",
|
||||||
|
"schema",
|
||||||
|
serde_json::json!({"name": "/services/list"}),
|
||||||
|
);
|
||||||
|
assert!(result.result.is_ok());
|
||||||
|
let value = result.result.unwrap();
|
||||||
|
assert_eq!(value["name"], "/services/list");
|
||||||
|
assert_eq!(value["namespace"], "services");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn services_schema_missing_name() {
|
||||||
|
let env = make_env();
|
||||||
|
let result = env.invoke("services", "schema", serde_json::json!({}));
|
||||||
|
assert!(result.result.is_err());
|
||||||
|
let err = result.result.unwrap_err();
|
||||||
|
assert_eq!(err.code, "INVALID_INPUT");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn services_schema_not_found() {
|
||||||
|
let env = make_env();
|
||||||
|
let result = env.invoke(
|
||||||
|
"services",
|
||||||
|
"schema",
|
||||||
|
serde_json::json!({"name": "/nonexistent/op"}),
|
||||||
|
);
|
||||||
|
assert!(result.result.is_err());
|
||||||
|
let err = result.result.unwrap_err();
|
||||||
|
assert_eq!(err.code, "NOT_FOUND");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn services_list_spec_fields() {
|
||||||
|
let spec = services_list_spec();
|
||||||
|
assert_eq!(spec.name, "/services/list");
|
||||||
|
assert_eq!(spec.namespace, "services");
|
||||||
|
assert_eq!(spec.op_type, OperationType::Query);
|
||||||
|
assert!(!spec.access_control.has_restrictions());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn services_schema_spec_fields() {
|
||||||
|
let spec = services_schema_spec();
|
||||||
|
assert_eq!(spec.name, "/services/schema");
|
||||||
|
assert_eq!(spec.namespace, "services");
|
||||||
|
assert_eq!(spec.op_type, OperationType::Query);
|
||||||
|
assert!(!spec.access_control.has_restrictions());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn register_default_operations_adds_both() {
|
||||||
|
let mut registry = crate::call::OperationRegistry::new();
|
||||||
|
register_default_operations(&mut registry);
|
||||||
|
assert!(registry.lookup("/services/list").is_some());
|
||||||
|
assert!(registry.lookup("/services/schema").is_some());
|
||||||
|
assert_eq!(registry.list_operations().len(), 2);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -67,9 +67,16 @@ pub mod testutil;
|
|||||||
pub use auth::{AuthProtocol, AuthResult, AuthServiceImpl};
|
pub use auth::{AuthProtocol, AuthResult, AuthServiceImpl};
|
||||||
pub use auth::{AuthToken, ConfigIdentityProvider, Identity, IdentityProvider};
|
pub use auth::{AuthToken, ConfigIdentityProvider, Identity, IdentityProvider};
|
||||||
pub use call::{
|
pub use call::{
|
||||||
AccessControl, CallError, Handler, OperationContext, OperationEnv, OperationRegistry,
|
decode as decode_frame, decode_with_remainder as decode_frame_with_remainder,
|
||||||
OperationRegistryBuilder, OperationSpec, OperationType, ResponseEnvelope,
|
encode as encode_frame,
|
||||||
};
|
};
|
||||||
|
pub use call::{
|
||||||
|
register_default_operations, services_list_spec, services_schema_spec, AccessControl,
|
||||||
|
CallError, EventEnvelope, FrameDecodeError, Handler, OperationContext, OperationEnv,
|
||||||
|
OperationRegistry, OperationRegistryBuilder, OperationSpec, OperationType, PendingRequestMap,
|
||||||
|
ResponseEnvelope,
|
||||||
|
};
|
||||||
|
pub use call::{CALL_ABORTED, CALL_COMPLETED, CALL_ERROR, CALL_REQUESTED, CALL_RESPONDED};
|
||||||
pub use client::channel_manager::{ChannelManager, ForwardRequest};
|
pub use client::channel_manager::{ChannelManager, ForwardRequest};
|
||||||
pub use client::connect::{ClientSession, ConnectError, ConnectOptions, TransportMode};
|
pub use client::connect::{ClientSession, ConnectError, ConnectOptions, TransportMode};
|
||||||
pub use config::{
|
pub use config::{
|
||||||
|
|||||||
Reference in New Issue
Block a user