4 Commits

Author SHA1 Message Date
c34b4d2df4 docs(call): mark call/protocol/dispatch-streaming-branch completed — server-side streaming dispatch 2026-07-02 09:59:00 +00:00
2905e55e72 Merge branch 'feat/call/protocol/dispatch-streaming-branch' into develop 2026-07-02 09:58:37 +00:00
b673b7f317 docs(http): mark http/gateway/invoke-streaming completed — GatewayDispatch::invoke_streaming() 2026-07-02 09:55:15 +00:00
4ac8d308e6 feat(http/gateway/invoke-streaming): add GatewayDispatch::invoke_streaming
Add the streaming analogue of invoke() returning BoxStream<ResponseEnvelope>.
Security invariants are identical to invoke() (internal: false,
forwarded_for: None, same capabilities/scoped_env/ACL) — shared via a
build_root_context_inner helper with a bounded flag. The streaming path
sets deadline: None (unbounded subscriptions, ADR-049 §6). Calls
OperationRegistry::invoke_streaming() (already on develop). to_mcp is
unchanged (MCP excludes Subscription, ADR-041).

Tests cover: subscription dispatch, leading-slash strip, unknown op
NOT_FOUND, internal op NOT_FOUND (not leaked), None identity FORBIDDEN,
Query op INVALID_OPERATION_TYPE, invoke() on Subscription returns
INVALID_OPERATION_TYPE (guard holds through gateway), and
build_root_context_streaming sets deadline: None while carrying the
registration bundle.
2026-07-02 09:54:14 +00:00
3 changed files with 278 additions and 6 deletions

View File

@@ -26,6 +26,7 @@ use alknet_call::registry::env::LocalOperationEnv;
use alknet_call::registry::registration::OperationRegistry;
use alknet_core::auth::{AuthToken, Identity, IdentityProvider};
use alknet_core::types::Capabilities;
use futures::stream::BoxStream;
use serde_json::Value;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
@@ -70,11 +71,43 @@ impl GatewayDispatch {
self.registry.invoke(&operation_name, input, context).await
}
pub fn invoke_streaming(
&self,
identity: Option<Identity>,
op: &str,
input: Value,
) -> BoxStream<'static, ResponseEnvelope> {
let operation_name = strip_leading_slash(op).to_string();
let request_id = uuid::Uuid::new_v4().to_string();
let context = self.build_root_context_streaming(&request_id, &operation_name, identity);
self.registry
.invoke_streaming(&operation_name, input, context)
}
fn build_root_context(
&self,
request_id: &str,
operation_name: &str,
identity: Option<Identity>,
) -> OperationContext {
self.build_root_context_inner(request_id, operation_name, identity, true)
}
fn build_root_context_streaming(
&self,
request_id: &str,
operation_name: &str,
identity: Option<Identity>,
) -> OperationContext {
self.build_root_context_inner(request_id, operation_name, identity, false)
}
fn build_root_context_inner(
&self,
request_id: &str,
operation_name: &str,
identity: Option<Identity>,
bounded: bool,
) -> OperationContext {
let registration = self.registry.registration(operation_name);
let (composition_authority, capabilities, scoped_env) = match registration {
@@ -97,7 +130,7 @@ impl GatewayDispatch {
forwarded_for: None,
capabilities,
metadata: HashMap::new(),
deadline: Some(Instant::now() + DEFAULT_TIMEOUT),
deadline: bounded.then(|| Instant::now() + DEFAULT_TIMEOUT),
scoped_env,
env,
abort_policy: AbortPolicy::default(),
@@ -117,10 +150,11 @@ mod tests {
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
};
use alknet_call::registry::registration::{
make_handler, HandlerKind, HandlerRegistration, OperationProvenance,
make_handler, make_streaming_handler, HandlerKind, HandlerRegistration, OperationProvenance,
};
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility};
use alknet_core::auth::AuthToken;
use futures::stream::StreamExt;
use std::sync::Mutex as StdMutex;
struct StaticIdentityProvider {
@@ -235,6 +269,53 @@ mod tests {
registry
}
fn subscription_spec(name: &str, visibility: Visibility, acl: AccessControl) -> OperationSpec {
OperationSpec::new(
name,
OperationType::Subscription,
visibility,
serde_json::json!({}),
serde_json::json!({}),
vec![],
acl,
)
}
fn echo_streaming_handler() -> HandlerKind {
HandlerKind::Stream(make_streaming_handler(|input, context| {
futures::stream::iter(vec![ResponseEnvelope::ok(context.request_id, input)])
}))
}
fn registry_with_subscription(
name: &str,
visibility: Visibility,
acl: AccessControl,
) -> OperationRegistry {
let mut registry = OperationRegistry::new();
registry
.register(HandlerRegistration::new(
subscription_spec(name, visibility, acl),
echo_streaming_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
))
.unwrap();
registry
}
async fn collect_stream(
mut stream: BoxStream<'static, ResponseEnvelope>,
) -> Vec<ResponseEnvelope> {
let mut out = Vec::new();
while let Some(env) = stream.next().await {
out.push(env);
}
out
}
fn dispatch(
registry: Arc<OperationRegistry>,
provider: Arc<dyn IdentityProvider>,
@@ -548,4 +629,195 @@ mod tests {
fn assert_concrete<T: Sized>() {}
assert_concrete::<GatewayDispatch>();
}
#[tokio::test]
async fn invoke_streaming_on_subscription_returns_handler_stream() {
let registry = Arc::new(registry_with_subscription(
"events/stream",
Visibility::External,
AccessControl::default(),
));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let stream = dp.invoke_streaming(None, "events/stream", serde_json::json!({ "v": 7 }));
let items = collect_stream(stream).await;
assert_eq!(items.len(), 1);
assert_eq!(items[0].result, Ok(serde_json::json!({ "v": 7 })));
}
#[tokio::test]
async fn invoke_streaming_strips_leading_slash_from_operation_name() {
let registry = Arc::new(registry_with_subscription(
"events/stream",
Visibility::External,
AccessControl::default(),
));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let stream = dp.invoke_streaming(None, "/events/stream", serde_json::json!({}));
let items = collect_stream(stream).await;
assert_eq!(items.len(), 1);
assert!(items[0].result.is_ok());
}
#[tokio::test]
async fn invoke_streaming_on_unknown_op_yields_single_not_found() {
let registry = Arc::new(OperationRegistry::new());
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let stream = dp.invoke_streaming(None, "no/such", serde_json::json!({}));
let items = collect_stream(stream).await;
assert_eq!(items.len(), 1);
match &items[0].result {
Err(e) => {
assert_eq!(e.code, "NOT_FOUND");
assert!(e.message.contains("no/such"));
}
other => panic!("expected NOT_FOUND, got {other:?}"),
}
}
#[tokio::test]
async fn invoke_streaming_on_internal_op_from_external_yields_not_found() {
let registry = Arc::new(registry_with_subscription(
"secret/stream",
Visibility::Internal,
AccessControl::default(),
));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let stream = dp.invoke_streaming(None, "secret/stream", serde_json::json!({}));
let items = collect_stream(stream).await;
assert_eq!(items.len(), 1);
match &items[0].result {
Err(e) => {
assert_eq!(e.code, "NOT_FOUND");
assert!(e.message.contains("secret/stream"));
}
other => panic!("expected NOT_FOUND, got {other:?}"),
}
}
#[tokio::test]
async fn invoke_streaming_with_none_identity_and_restricted_op_yields_forbidden() {
let registry = Arc::new(registry_with_subscription(
"admin/stream",
Visibility::External,
AccessControl {
required_scopes: vec!["admin".to_string()],
..Default::default()
},
));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let stream = dp.invoke_streaming(None, "admin/stream", serde_json::json!({}));
let items = collect_stream(stream).await;
assert_eq!(items.len(), 1);
match &items[0].result {
Err(e) => {
assert_eq!(e.code, "FORBIDDEN");
assert_eq!(e.message, "authentication required");
}
other => panic!("expected FORBIDDEN, got {other:?}"),
}
}
#[tokio::test]
async fn invoke_streaming_on_query_op_yields_invalid_operation_type() {
let registry = Arc::new(registry_with(
"echo/run",
Visibility::External,
AccessControl::default(),
));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let stream = dp.invoke_streaming(None, "echo/run", serde_json::json!({}));
let items = collect_stream(stream).await;
assert_eq!(items.len(), 1);
match &items[0].result {
Err(e) => assert_eq!(e.code, "INVALID_OPERATION_TYPE"),
other => panic!("expected INVALID_OPERATION_TYPE, got {other:?}"),
}
}
#[tokio::test]
async fn invoke_on_subscription_op_returns_invalid_operation_type() {
let registry = Arc::new(registry_with_subscription(
"events/stream",
Visibility::External,
AccessControl::default(),
));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let response = dp
.invoke(None, "events/stream", serde_json::json!({}))
.await;
match response.result {
Err(e) => assert_eq!(e.code, "INVALID_OPERATION_TYPE"),
other => panic!("expected INVALID_OPERATION_TYPE, got {other:?}"),
}
}
#[test]
fn build_root_context_streaming_sets_deadline_none() {
let registry = Arc::new(registry_with_subscription(
"events/stream",
Visibility::External,
AccessControl::default(),
));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let ctx = dp.build_root_context_streaming("req-st-1", "events/stream", None);
assert!(!ctx.internal, "internal must be false for wire-ingress");
assert!(ctx.forwarded_for.is_none(), "forwarded_for must be None");
assert!(ctx.parent_request_id.is_none(), "root has no parent");
assert!(
ctx.deadline.is_none(),
"deadline must be None for streaming"
);
}
#[test]
fn build_root_context_streaming_carries_registration_bundle_fields() {
let authority = alknet_call::registry::context::CompositionAuthority::new(
"agent",
["fs:read".to_string()],
);
let scoped = ScopedPeerEnv::new(["fs/readFile"]);
let caps = Capabilities::new().with_api_key("google", "k".to_string());
let mut registry = OperationRegistry::new();
registry
.register(HandlerRegistration::new(
subscription_spec(
"agent/stream",
Visibility::External,
AccessControl::default(),
),
echo_streaming_handler(),
OperationProvenance::Local,
Some(authority),
Some(scoped.clone()),
caps,
))
.unwrap();
let registry = Arc::new(registry);
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let ctx = dp.build_root_context_streaming("req-st-2", "agent/stream", None);
assert!(ctx.handler_identity.is_some());
assert_eq!(ctx.handler_identity.as_ref().unwrap().label, "agent");
assert!(ctx.scoped_env.allows("fs/readFile"));
assert!(ctx.capabilities.get("google").is_some());
assert!(ctx.deadline.is_none());
}
}

View File

@@ -1,7 +1,7 @@
---
id: call/protocol/dispatch-streaming-branch
name: Wire Dispatcher::handle_stream streaming branch (Subscription → invoke_streaming → write each → call.completed)
status: pending
status: completed
depends_on: [call/registry/invoke-streaming]
scope: narrow
risk: medium
@@ -171,4 +171,4 @@ task; aborting the connection cancels it).
## Summary
> To be filled on completion
> Added DispatchResult enum (Once(ResponseEnvelope) | Stream(ResponseStream)) and Dispatcher::dispatch() branching on op_type (looked up via registry.registration). handle_stream matches on DispatchResult — the branch is visible there (spec framing). Streaming pump writes each ResponseEnvelope → EventEnvelope frame; call.completed on natural end only when !last_was_error (Err is terminal, no call.completed after). deadline: None for streaming branch. Abort via Drop (no new code). Existing Query/Mutation path unchanged. Added 7 unit tests (dispatch branch, deadline clearing, pump frames, error terminal, query unchanged, unknown op, abort drops stream). 306 tests pass.

View File

@@ -1,7 +1,7 @@
---
id: http/gateway/invoke-streaming
name: Implement GatewayDispatch::invoke_streaming() returning BoxStream<ResponseEnvelope>
status: pending
status: completed
depends_on: [call/registry/invoke-streaming]
scope: narrow
risk: medium
@@ -127,4 +127,4 @@ don't duplicate the logic.
## Summary
> To be filled on completion
> Added GatewayDispatch::invoke_streaming() returning BoxStream<ResponseEnvelope>. Security axis provably identical to invoke() via shared build_root_context_inner(bounded: bool); extracted build_root_context_streaming for deadline: None (unbounded subscriptions). Calls OperationRegistry::invoke_streaming(). to_mcp untouched. Added 9 unit tests (all error paths + streaming dispatch + deadline: None verification). 243 tests pass.