Compare commits
6 Commits
feat/call/
...
c77024cdf5
| Author | SHA1 | Date | |
|---|---|---|---|
| c77024cdf5 | |||
| 9e4d17b1c5 | |||
| c34b4d2df4 | |||
| 2905e55e72 | |||
| b673b7f317 | |||
| 4ac8d308e6 |
@@ -26,6 +26,7 @@ use alknet_call::registry::env::LocalOperationEnv;
|
|||||||
use alknet_call::registry::registration::OperationRegistry;
|
use alknet_call::registry::registration::OperationRegistry;
|
||||||
use alknet_core::auth::{AuthToken, Identity, IdentityProvider};
|
use alknet_core::auth::{AuthToken, Identity, IdentityProvider};
|
||||||
use alknet_core::types::Capabilities;
|
use alknet_core::types::Capabilities;
|
||||||
|
use futures::stream::BoxStream;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
@@ -70,11 +71,43 @@ impl GatewayDispatch {
|
|||||||
self.registry.invoke(&operation_name, input, context).await
|
self.registry.invoke(&operation_name, input, context).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn invoke_streaming(
|
||||||
|
&self,
|
||||||
|
identity: Option<Identity>,
|
||||||
|
op: &str,
|
||||||
|
input: Value,
|
||||||
|
) -> BoxStream<'static, ResponseEnvelope> {
|
||||||
|
let operation_name = strip_leading_slash(op).to_string();
|
||||||
|
let request_id = uuid::Uuid::new_v4().to_string();
|
||||||
|
let context = self.build_root_context_streaming(&request_id, &operation_name, identity);
|
||||||
|
self.registry
|
||||||
|
.invoke_streaming(&operation_name, input, context)
|
||||||
|
}
|
||||||
|
|
||||||
fn build_root_context(
|
fn build_root_context(
|
||||||
&self,
|
&self,
|
||||||
request_id: &str,
|
request_id: &str,
|
||||||
operation_name: &str,
|
operation_name: &str,
|
||||||
identity: Option<Identity>,
|
identity: Option<Identity>,
|
||||||
|
) -> OperationContext {
|
||||||
|
self.build_root_context_inner(request_id, operation_name, identity, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_root_context_streaming(
|
||||||
|
&self,
|
||||||
|
request_id: &str,
|
||||||
|
operation_name: &str,
|
||||||
|
identity: Option<Identity>,
|
||||||
|
) -> OperationContext {
|
||||||
|
self.build_root_context_inner(request_id, operation_name, identity, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_root_context_inner(
|
||||||
|
&self,
|
||||||
|
request_id: &str,
|
||||||
|
operation_name: &str,
|
||||||
|
identity: Option<Identity>,
|
||||||
|
bounded: bool,
|
||||||
) -> OperationContext {
|
) -> OperationContext {
|
||||||
let registration = self.registry.registration(operation_name);
|
let registration = self.registry.registration(operation_name);
|
||||||
let (composition_authority, capabilities, scoped_env) = match registration {
|
let (composition_authority, capabilities, scoped_env) = match registration {
|
||||||
@@ -97,7 +130,7 @@ impl GatewayDispatch {
|
|||||||
forwarded_for: None,
|
forwarded_for: None,
|
||||||
capabilities,
|
capabilities,
|
||||||
metadata: HashMap::new(),
|
metadata: HashMap::new(),
|
||||||
deadline: Some(Instant::now() + DEFAULT_TIMEOUT),
|
deadline: bounded.then(|| Instant::now() + DEFAULT_TIMEOUT),
|
||||||
scoped_env,
|
scoped_env,
|
||||||
env,
|
env,
|
||||||
abort_policy: AbortPolicy::default(),
|
abort_policy: AbortPolicy::default(),
|
||||||
@@ -117,10 +150,11 @@ mod tests {
|
|||||||
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
|
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
|
||||||
};
|
};
|
||||||
use alknet_call::registry::registration::{
|
use alknet_call::registry::registration::{
|
||||||
make_handler, HandlerKind, HandlerRegistration, OperationProvenance,
|
make_handler, make_streaming_handler, HandlerKind, HandlerRegistration, OperationProvenance,
|
||||||
};
|
};
|
||||||
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility};
|
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility};
|
||||||
use alknet_core::auth::AuthToken;
|
use alknet_core::auth::AuthToken;
|
||||||
|
use futures::stream::StreamExt;
|
||||||
use std::sync::Mutex as StdMutex;
|
use std::sync::Mutex as StdMutex;
|
||||||
|
|
||||||
struct StaticIdentityProvider {
|
struct StaticIdentityProvider {
|
||||||
@@ -235,6 +269,53 @@ mod tests {
|
|||||||
registry
|
registry
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn subscription_spec(name: &str, visibility: Visibility, acl: AccessControl) -> OperationSpec {
|
||||||
|
OperationSpec::new(
|
||||||
|
name,
|
||||||
|
OperationType::Subscription,
|
||||||
|
visibility,
|
||||||
|
serde_json::json!({}),
|
||||||
|
serde_json::json!({}),
|
||||||
|
vec![],
|
||||||
|
acl,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn echo_streaming_handler() -> HandlerKind {
|
||||||
|
HandlerKind::Stream(make_streaming_handler(|input, context| {
|
||||||
|
futures::stream::iter(vec![ResponseEnvelope::ok(context.request_id, input)])
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn registry_with_subscription(
|
||||||
|
name: &str,
|
||||||
|
visibility: Visibility,
|
||||||
|
acl: AccessControl,
|
||||||
|
) -> OperationRegistry {
|
||||||
|
let mut registry = OperationRegistry::new();
|
||||||
|
registry
|
||||||
|
.register(HandlerRegistration::new(
|
||||||
|
subscription_spec(name, visibility, acl),
|
||||||
|
echo_streaming_handler(),
|
||||||
|
OperationProvenance::Local,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Capabilities::new(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
registry
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn collect_stream(
|
||||||
|
mut stream: BoxStream<'static, ResponseEnvelope>,
|
||||||
|
) -> Vec<ResponseEnvelope> {
|
||||||
|
let mut out = Vec::new();
|
||||||
|
while let Some(env) = stream.next().await {
|
||||||
|
out.push(env);
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
fn dispatch(
|
fn dispatch(
|
||||||
registry: Arc<OperationRegistry>,
|
registry: Arc<OperationRegistry>,
|
||||||
provider: Arc<dyn IdentityProvider>,
|
provider: Arc<dyn IdentityProvider>,
|
||||||
@@ -548,4 +629,195 @@ mod tests {
|
|||||||
fn assert_concrete<T: Sized>() {}
|
fn assert_concrete<T: Sized>() {}
|
||||||
assert_concrete::<GatewayDispatch>();
|
assert_concrete::<GatewayDispatch>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_streaming_on_subscription_returns_handler_stream() {
|
||||||
|
let registry = Arc::new(registry_with_subscription(
|
||||||
|
"events/stream",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let stream = dp.invoke_streaming(None, "events/stream", serde_json::json!({ "v": 7 }));
|
||||||
|
let items = collect_stream(stream).await;
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
assert_eq!(items[0].result, Ok(serde_json::json!({ "v": 7 })));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_streaming_strips_leading_slash_from_operation_name() {
|
||||||
|
let registry = Arc::new(registry_with_subscription(
|
||||||
|
"events/stream",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let stream = dp.invoke_streaming(None, "/events/stream", serde_json::json!({}));
|
||||||
|
let items = collect_stream(stream).await;
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
assert!(items[0].result.is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_streaming_on_unknown_op_yields_single_not_found() {
|
||||||
|
let registry = Arc::new(OperationRegistry::new());
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let stream = dp.invoke_streaming(None, "no/such", serde_json::json!({}));
|
||||||
|
let items = collect_stream(stream).await;
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
match &items[0].result {
|
||||||
|
Err(e) => {
|
||||||
|
assert_eq!(e.code, "NOT_FOUND");
|
||||||
|
assert!(e.message.contains("no/such"));
|
||||||
|
}
|
||||||
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_streaming_on_internal_op_from_external_yields_not_found() {
|
||||||
|
let registry = Arc::new(registry_with_subscription(
|
||||||
|
"secret/stream",
|
||||||
|
Visibility::Internal,
|
||||||
|
AccessControl::default(),
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let stream = dp.invoke_streaming(None, "secret/stream", serde_json::json!({}));
|
||||||
|
let items = collect_stream(stream).await;
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
match &items[0].result {
|
||||||
|
Err(e) => {
|
||||||
|
assert_eq!(e.code, "NOT_FOUND");
|
||||||
|
assert!(e.message.contains("secret/stream"));
|
||||||
|
}
|
||||||
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_streaming_with_none_identity_and_restricted_op_yields_forbidden() {
|
||||||
|
let registry = Arc::new(registry_with_subscription(
|
||||||
|
"admin/stream",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl {
|
||||||
|
required_scopes: vec!["admin".to_string()],
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let stream = dp.invoke_streaming(None, "admin/stream", serde_json::json!({}));
|
||||||
|
let items = collect_stream(stream).await;
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
match &items[0].result {
|
||||||
|
Err(e) => {
|
||||||
|
assert_eq!(e.code, "FORBIDDEN");
|
||||||
|
assert_eq!(e.message, "authentication required");
|
||||||
|
}
|
||||||
|
other => panic!("expected FORBIDDEN, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_streaming_on_query_op_yields_invalid_operation_type() {
|
||||||
|
let registry = Arc::new(registry_with(
|
||||||
|
"echo/run",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let stream = dp.invoke_streaming(None, "echo/run", serde_json::json!({}));
|
||||||
|
let items = collect_stream(stream).await;
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
match &items[0].result {
|
||||||
|
Err(e) => assert_eq!(e.code, "INVALID_OPERATION_TYPE"),
|
||||||
|
other => panic!("expected INVALID_OPERATION_TYPE, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_on_subscription_op_returns_invalid_operation_type() {
|
||||||
|
let registry = Arc::new(registry_with_subscription(
|
||||||
|
"events/stream",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let response = dp
|
||||||
|
.invoke(None, "events/stream", serde_json::json!({}))
|
||||||
|
.await;
|
||||||
|
match response.result {
|
||||||
|
Err(e) => assert_eq!(e.code, "INVALID_OPERATION_TYPE"),
|
||||||
|
other => panic!("expected INVALID_OPERATION_TYPE, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_root_context_streaming_sets_deadline_none() {
|
||||||
|
let registry = Arc::new(registry_with_subscription(
|
||||||
|
"events/stream",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let ctx = dp.build_root_context_streaming("req-st-1", "events/stream", None);
|
||||||
|
assert!(!ctx.internal, "internal must be false for wire-ingress");
|
||||||
|
assert!(ctx.forwarded_for.is_none(), "forwarded_for must be None");
|
||||||
|
assert!(ctx.parent_request_id.is_none(), "root has no parent");
|
||||||
|
assert!(
|
||||||
|
ctx.deadline.is_none(),
|
||||||
|
"deadline must be None for streaming"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_root_context_streaming_carries_registration_bundle_fields() {
|
||||||
|
let authority = alknet_call::registry::context::CompositionAuthority::new(
|
||||||
|
"agent",
|
||||||
|
["fs:read".to_string()],
|
||||||
|
);
|
||||||
|
let scoped = ScopedPeerEnv::new(["fs/readFile"]);
|
||||||
|
let caps = Capabilities::new().with_api_key("google", "k".to_string());
|
||||||
|
|
||||||
|
let mut registry = OperationRegistry::new();
|
||||||
|
registry
|
||||||
|
.register(HandlerRegistration::new(
|
||||||
|
subscription_spec(
|
||||||
|
"agent/stream",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
),
|
||||||
|
echo_streaming_handler(),
|
||||||
|
OperationProvenance::Local,
|
||||||
|
Some(authority),
|
||||||
|
Some(scoped.clone()),
|
||||||
|
caps,
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
let registry = Arc::new(registry);
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let ctx = dp.build_root_context_streaming("req-st-2", "agent/stream", None);
|
||||||
|
assert!(ctx.handler_identity.is_some());
|
||||||
|
assert_eq!(ctx.handler_identity.as_ref().unwrap().label, "agent");
|
||||||
|
assert!(ctx.scoped_env.allows("fs/readFile"));
|
||||||
|
assert!(ctx.capabilities.get("google").is_some());
|
||||||
|
assert!(ctx.deadline.is_none());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,7 +17,8 @@ use axum::response::sse::Event;
|
|||||||
use axum::response::{IntoResponse, Json, Response, Sse};
|
use axum::response::{IntoResponse, Json, Response, Sse};
|
||||||
use axum::routing::{get, post};
|
use axum::routing::{get, post};
|
||||||
use axum::Router;
|
use axum::Router;
|
||||||
use futures::stream::{self, BoxStream, Stream};
|
use futures::stream::{self, BoxStream};
|
||||||
|
use futures::StreamExt;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
|
|
||||||
@@ -163,18 +164,29 @@ pub(crate) async fn subscribe_handler(
|
|||||||
subscribe_stream_internal_error(request.operation)
|
subscribe_stream_internal_error(request.operation)
|
||||||
} else {
|
} else {
|
||||||
let dispatch = state.dispatch();
|
let dispatch = state.dispatch();
|
||||||
let envelope = dispatch
|
let envelope_stream =
|
||||||
.invoke(identity, &request.operation, request.input)
|
dispatch.invoke_streaming(identity, &request.operation, request.input);
|
||||||
.await;
|
subscribe_stream_from_envelope_stream(envelope_stream)
|
||||||
subscribe_stream_from_envelope(envelope)
|
|
||||||
};
|
};
|
||||||
Sse::new(stream)
|
Sse::new(stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type SubscribeStream = BoxStream<'static, Result<Event, Infallible>>;
|
pub type SubscribeStream = BoxStream<'static, Result<Event, Infallible>>;
|
||||||
|
|
||||||
fn subscribe_stream_from_envelope(envelope: ResponseEnvelope) -> SubscribeStream {
|
fn subscribe_stream_from_envelope_stream(
|
||||||
Box::pin(envelope_to_sse_stream(envelope))
|
stream: BoxStream<'static, ResponseEnvelope>,
|
||||||
|
) -> SubscribeStream {
|
||||||
|
Box::pin(stream.map(|envelope| match envelope.result {
|
||||||
|
Ok(output) => {
|
||||||
|
let data = serde_json::to_string(&output).unwrap_or_else(|_| "null".to_string());
|
||||||
|
Ok(Event::default().data(data))
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
|
||||||
|
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "null".to_string());
|
||||||
|
Ok(Event::default().event("error").data(data))
|
||||||
|
}
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn subscribe_stream_internal_error(operation: String) -> SubscribeStream {
|
fn subscribe_stream_internal_error(operation: String) -> SubscribeStream {
|
||||||
@@ -263,24 +275,6 @@ fn is_internal_op(registry: &OperationRegistry, operation: &str) -> bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn envelope_to_sse_stream(
|
|
||||||
envelope: ResponseEnvelope,
|
|
||||||
) -> impl Stream<Item = Result<Event, Infallible>> {
|
|
||||||
stream::once(async move {
|
|
||||||
match envelope.result {
|
|
||||||
Ok(output) => {
|
|
||||||
let data = serde_json::to_string(&output).unwrap_or_else(|_| "null".to_string());
|
|
||||||
Ok(Event::default().data(data))
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
|
|
||||||
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "null".to_string());
|
|
||||||
Ok(Event::default().event("error").data(data))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn error_event(operation: &str) -> Result<Event, Infallible> {
|
fn error_event(operation: &str) -> Result<Event, Infallible> {
|
||||||
let error = CallError::not_found(operation);
|
let error = CallError::not_found(operation);
|
||||||
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
|
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
|
||||||
@@ -295,7 +289,7 @@ mod tests {
|
|||||||
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
|
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
|
||||||
};
|
};
|
||||||
use alknet_call::registry::registration::{
|
use alknet_call::registry::registration::{
|
||||||
make_handler, HandlerKind, HandlerRegistration, OperationProvenance,
|
make_handler, make_streaming_handler, HandlerKind, HandlerRegistration, OperationProvenance,
|
||||||
};
|
};
|
||||||
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType};
|
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType};
|
||||||
use alknet_core::auth::{AuthToken, Identity};
|
use alknet_core::auth::{AuthToken, Identity};
|
||||||
@@ -425,6 +419,73 @@ mod tests {
|
|||||||
Arc::new(registry)
|
Arc::new(registry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn subscription_spec(name: &str, visibility: Visibility, acl: AccessControl) -> OperationSpec {
|
||||||
|
OperationSpec::new(
|
||||||
|
name,
|
||||||
|
OperationType::Subscription,
|
||||||
|
visibility,
|
||||||
|
json!({}),
|
||||||
|
json!({}),
|
||||||
|
vec![],
|
||||||
|
acl,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn multi_event_streaming_handler(
|
||||||
|
outputs: Vec<Value>,
|
||||||
|
) -> alknet_call::registry::registration::StreamingHandler {
|
||||||
|
make_streaming_handler(move |_input, ctx| {
|
||||||
|
let request_id = ctx.request_id.clone();
|
||||||
|
let outputs = outputs.clone();
|
||||||
|
futures::stream::iter(
|
||||||
|
outputs
|
||||||
|
.into_iter()
|
||||||
|
.map(move |o| ResponseEnvelope::ok(request_id.clone(), o)),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn error_streaming_handler(error: CallError) -> HandlerKind {
|
||||||
|
HandlerKind::Stream(make_streaming_handler(move |_input, ctx| {
|
||||||
|
let request_id = ctx.request_id.clone();
|
||||||
|
let error = error.clone();
|
||||||
|
futures::stream::iter(vec![ResponseEnvelope::error(request_id, error)])
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn registry_with_subscription_stream(
|
||||||
|
name: &str,
|
||||||
|
outputs: Vec<Value>,
|
||||||
|
) -> Arc<OperationRegistry> {
|
||||||
|
let mut registry = OperationRegistry::new();
|
||||||
|
registry
|
||||||
|
.register(HandlerRegistration::new(
|
||||||
|
subscription_spec(name, Visibility::External, AccessControl::default()),
|
||||||
|
HandlerKind::Stream(multi_event_streaming_handler(outputs)),
|
||||||
|
OperationProvenance::Local,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Capabilities::new(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
Arc::new(registry)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn registry_with_subscription_error(name: &str, error: CallError) -> Arc<OperationRegistry> {
|
||||||
|
let mut registry = OperationRegistry::new();
|
||||||
|
registry
|
||||||
|
.register(HandlerRegistration::new(
|
||||||
|
subscription_spec(name, Visibility::External, AccessControl::default()),
|
||||||
|
error_streaming_handler(error),
|
||||||
|
OperationProvenance::Local,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Capabilities::new(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
Arc::new(registry)
|
||||||
|
}
|
||||||
|
|
||||||
fn registry_with_discovery_and_ops(
|
fn registry_with_discovery_and_ops(
|
||||||
inner_ops: Vec<HandlerRegistration>,
|
inner_ops: Vec<HandlerRegistration>,
|
||||||
) -> Arc<OperationRegistry> {
|
) -> Arc<OperationRegistry> {
|
||||||
@@ -771,15 +832,20 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn subscribe_streams_sse_data_event_until_completed() {
|
async fn subscribe_on_subscription_streams_multiple_data_frames() {
|
||||||
let router = build_router(registry_with_echo(), unused_provider());
|
let router = build_router(
|
||||||
|
registry_with_subscription_stream(
|
||||||
|
"events/stream",
|
||||||
|
vec![json!({ "n": 1 }), json!({ "n": 2 }), json!({ "n": 3 })],
|
||||||
|
),
|
||||||
|
unused_provider(),
|
||||||
|
);
|
||||||
let req = Request::builder()
|
let req = Request::builder()
|
||||||
.method("POST")
|
.method("POST")
|
||||||
.uri("/subscribe")
|
.uri("/subscribe")
|
||||||
.header("content-type", "application/json")
|
.header("content-type", "application/json")
|
||||||
.body(Body::from(
|
.body(Body::from(
|
||||||
serde_json::to_vec(&json!({ "operation": "echo/run", "input": { "v": 9 } }))
|
serde_json::to_vec(&json!({ "operation": "events/stream", "input": {} })).unwrap(),
|
||||||
.unwrap(),
|
|
||||||
))
|
))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let resp = router.oneshot(req).await.unwrap();
|
let resp = router.oneshot(req).await.unwrap();
|
||||||
@@ -797,10 +863,73 @@ mod tests {
|
|||||||
);
|
);
|
||||||
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
||||||
let body = String::from_utf8_lossy(&bytes);
|
let body = String::from_utf8_lossy(&bytes);
|
||||||
assert!(body.contains("data:"), "expected a data frame, got: {body}");
|
let data_frames = body.matches("data:").count();
|
||||||
|
assert_eq!(data_frames, 3, "expected 3 data frames, got: {body}");
|
||||||
|
assert!(body.contains("\"n\":1"), "expected n=1, got: {body}");
|
||||||
|
assert!(body.contains("\"n\":2"), "expected n=2, got: {body}");
|
||||||
|
assert!(body.contains("\"n\":3"), "expected n=3, got: {body}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn subscribe_on_subscription_that_yields_error_emits_error_event_then_closes() {
|
||||||
|
let router = build_router(
|
||||||
|
registry_with_subscription_error("events/fail", CallError::internal("handler blew up")),
|
||||||
|
unused_provider(),
|
||||||
|
);
|
||||||
|
let req = Request::builder()
|
||||||
|
.method("POST")
|
||||||
|
.uri("/subscribe")
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.body(Body::from(
|
||||||
|
serde_json::to_vec(&json!({ "operation": "events/fail", "input": {} })).unwrap(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
let resp = router.oneshot(req).await.unwrap();
|
||||||
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
|
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
||||||
|
let body = String::from_utf8_lossy(&bytes);
|
||||||
assert!(
|
assert!(
|
||||||
body.contains("\"v\":9"),
|
body.contains("event:error") || body.contains("event: error"),
|
||||||
"expected output payload, got: {body}"
|
"expected error event, got: {body}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
body.contains("INTERNAL"),
|
||||||
|
"expected INTERNAL code, got: {body}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
body.contains("handler blew up"),
|
||||||
|
"expected error message, got: {body}"
|
||||||
|
);
|
||||||
|
let data_frames = body.matches("data:").count();
|
||||||
|
assert_eq!(
|
||||||
|
data_frames, 1,
|
||||||
|
"expected exactly one data frame (the error payload), got: {body}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn subscribe_response_content_type_is_text_event_stream() {
|
||||||
|
let router = build_router(
|
||||||
|
registry_with_subscription_stream("events/stream", vec![json!({ "ok": true })]),
|
||||||
|
unused_provider(),
|
||||||
|
);
|
||||||
|
let req = Request::builder()
|
||||||
|
.method("POST")
|
||||||
|
.uri("/subscribe")
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.body(Body::from(
|
||||||
|
serde_json::to_vec(&json!({ "operation": "events/stream", "input": {} })).unwrap(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
let resp = router.oneshot(req).await.unwrap();
|
||||||
|
let ctype = resp
|
||||||
|
.headers()
|
||||||
|
.get(axum::http::header::CONTENT_TYPE)
|
||||||
|
.map(|v| v.to_str().unwrap().to_string());
|
||||||
|
assert_eq!(
|
||||||
|
ctype.as_deref(),
|
||||||
|
Some("text/event-stream"),
|
||||||
|
"expected text/event-stream, got {ctype:?}"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -829,6 +958,59 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn subscribe_unknown_op_emits_not_found_error_event() {
|
||||||
|
let router = build_router(
|
||||||
|
registry_with_subscription_stream("events/stream", vec![json!({})]),
|
||||||
|
unused_provider(),
|
||||||
|
);
|
||||||
|
let req = Request::builder()
|
||||||
|
.method("POST")
|
||||||
|
.uri("/subscribe")
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.body(Body::from(
|
||||||
|
serde_json::to_vec(&json!({ "operation": "no/such", "input": {} })).unwrap(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
let resp = router.oneshot(req).await.unwrap();
|
||||||
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
|
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
||||||
|
let body = String::from_utf8_lossy(&bytes);
|
||||||
|
assert!(
|
||||||
|
body.contains("event:error") || body.contains("event: error"),
|
||||||
|
"expected error event, got: {body}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
body.contains("NOT_FOUND"),
|
||||||
|
"expected NOT_FOUND, got: {body}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn subscribe_on_query_op_emits_invalid_operation_type_error_event() {
|
||||||
|
let router = build_router(registry_with_echo(), unused_provider());
|
||||||
|
let req = Request::builder()
|
||||||
|
.method("POST")
|
||||||
|
.uri("/subscribe")
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.body(Body::from(
|
||||||
|
serde_json::to_vec(&json!({ "operation": "echo/run", "input": {} })).unwrap(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
let resp = router.oneshot(req).await.unwrap();
|
||||||
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
|
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
||||||
|
let body = String::from_utf8_lossy(&bytes);
|
||||||
|
assert!(
|
||||||
|
body.contains("event:error") || body.contains("event: error"),
|
||||||
|
"expected error event, got: {body}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
body.contains("INVALID_OPERATION_TYPE"),
|
||||||
|
"expected INVALID_OPERATION_TYPE, got: {body}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn is_internal_op_returns_false_for_unknown() {
|
fn is_internal_op_returns_false_for_unknown() {
|
||||||
let registry = OperationRegistry::new();
|
let registry = OperationRegistry::new();
|
||||||
|
|||||||
@@ -779,10 +779,11 @@ mod tests {
|
|||||||
let out = handle_inbound_envelope(&dp, &conn, request)
|
let out = handle_inbound_envelope(&dp, &conn, request)
|
||||||
.await
|
.await
|
||||||
.expect("response");
|
.expect("response");
|
||||||
assert_eq!(out.r#type, EVENT_ERROR);
|
assert_eq!(out.r#type, EVENT_RESPONDED);
|
||||||
|
assert_eq!(out.id, "sub-0");
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
out.payload.get("code"),
|
out.payload.get("output"),
|
||||||
Some(&serde_json::json!("INVALID_OPERATION_TYPE"))
|
Some(&serde_json::json!({ "n": 1 }))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1077,10 +1078,10 @@ mod tests {
|
|||||||
MockMsg::Binary(bytes) => {
|
MockMsg::Binary(bytes) => {
|
||||||
let env: EventEnvelope = serde_json::from_slice(&bytes).unwrap();
|
let env: EventEnvelope = serde_json::from_slice(&bytes).unwrap();
|
||||||
assert_eq!(env.id, "sub-ws-0");
|
assert_eq!(env.id, "sub-ws-0");
|
||||||
assert_eq!(env.r#type, EVENT_ERROR);
|
assert_eq!(env.r#type, EVENT_RESPONDED);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
env.payload.get("code"),
|
env.payload.get("output"),
|
||||||
Some(&serde_json::json!("INVALID_OPERATION_TYPE"))
|
Some(&serde_json::json!({ "n": 1 }))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
other => panic!("expected binary, got {other:?}"),
|
other => panic!("expected binary, got {other:?}"),
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
---
|
---
|
||||||
id: call/protocol/dispatch-streaming-branch
|
id: call/protocol/dispatch-streaming-branch
|
||||||
name: Wire Dispatcher::handle_stream streaming branch (Subscription → invoke_streaming → write each → call.completed)
|
name: Wire Dispatcher::handle_stream streaming branch (Subscription → invoke_streaming → write each → call.completed)
|
||||||
status: pending
|
status: completed
|
||||||
depends_on: [call/registry/invoke-streaming]
|
depends_on: [call/registry/invoke-streaming]
|
||||||
scope: narrow
|
scope: narrow
|
||||||
risk: medium
|
risk: medium
|
||||||
@@ -171,4 +171,4 @@ task; aborting the connection cancels it).
|
|||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
> To be filled on completion
|
> Added DispatchResult enum (Once(ResponseEnvelope) | Stream(ResponseStream)) and Dispatcher::dispatch() branching on op_type (looked up via registry.registration). handle_stream matches on DispatchResult — the branch is visible there (spec framing). Streaming pump writes each ResponseEnvelope → EventEnvelope frame; call.completed on natural end only when !last_was_error (Err is terminal, no call.completed after). deadline: None for streaming branch. Abort via Drop (no new code). Existing Query/Mutation path unchanged. Added 7 unit tests (dispatch branch, deadline clearing, pump frames, error terminal, query unchanged, unknown op, abort drops stream). 306 tests pass.
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
---
|
---
|
||||||
id: http/gateway/invoke-streaming
|
id: http/gateway/invoke-streaming
|
||||||
name: Implement GatewayDispatch::invoke_streaming() returning BoxStream<ResponseEnvelope>
|
name: Implement GatewayDispatch::invoke_streaming() returning BoxStream<ResponseEnvelope>
|
||||||
status: pending
|
status: completed
|
||||||
depends_on: [call/registry/invoke-streaming]
|
depends_on: [call/registry/invoke-streaming]
|
||||||
scope: narrow
|
scope: narrow
|
||||||
risk: medium
|
risk: medium
|
||||||
@@ -127,4 +127,4 @@ don't duplicate the logic.
|
|||||||
|
|
||||||
## Summary
|
## Summary
|
||||||
|
|
||||||
> To be filled on completion
|
> Added GatewayDispatch::invoke_streaming() returning BoxStream<ResponseEnvelope>. Security axis provably identical to invoke() via shared build_root_context_inner(bounded: bool); extracted build_root_context_streaming for deadline: None (unbounded subscriptions). Calls OperationRegistry::invoke_streaming(). to_mcp untouched. Added 9 unit tests (all error paths + streaming dispatch + deadline: None verification). 243 tests pass.
|
||||||
Reference in New Issue
Block a user