--- id: call/registry/invoke-streaming name: Implement OperationRegistry::invoke_streaming() returning ResponseStream status: completed depends_on: [call/registry/streaming-handler-handlerkind] scope: narrow risk: medium impact: component level: implementation --- ## Description Add `OperationRegistry::invoke_streaming()` — the streaming dispatch path that `Subscription` operations use. This is the counterpart to `invoke()`: same visibility + ACL checks, then dispatches to the `StreamingHandler` and returns the `ResponseStream`. Pre-handler errors (not-found, forbidden, `INVALID_OPERATION_TYPE` for a non-Subscription op) yield a single error `ResponseEnvelope` and end the stream. This task depends on `call/registry/streaming-handler-handlerkind` (which introduces `HandlerKind::Stream` and the `ResponseStream` alias). It adds only the `invoke_streaming()` method — no other changes. ### invoke_streaming() ```rust use futures::stream::{self, StreamExt}; impl OperationRegistry { /// Dispatch a Subscription operation. Returns a stream of /// ResponseEnvelopes. Pre-handler errors (not-found, forbidden, /// INVALID_OPERATION_TYPE for a non-Subscription op) yield a single /// error ResponseEnvelope and end the stream. pub fn invoke_streaming( &self, name: &str, input: Value, context: OperationContext, ) -> ResponseStream { let request_id = context.request_id.clone(); // 1. Look up registration let registration = match self.operations.get(name) { Some(r) => r, None => { return Box::pin(stream::once(async move { ResponseEnvelope::not_found(request_id, name) })); } }; // 2. Visibility check (same as invoke) if registration.spec.visibility == Visibility::Internal && !context.internal { return Box::pin(stream::once(async move { ResponseEnvelope::not_found(request_id, name) })); } // 3. ACL check (same as invoke) let acl = ®istration.spec.access_control; let identity = if context.internal { context.handler_identity.as_ref().and_then(|ca| ca.as_identity()) } else { context.identity.clone() }; if let AccessResult::Forbidden(message) = acl.check(identity.as_ref()) { return Box::pin(stream::once(async move { ResponseEnvelope::forbidden(request_id, message) })); } // 4. HandlerKind check — must be Stream for invoke_streaming let streaming_handler = match ®istration.handler { HandlerKind::Stream(h) => Arc::clone(h), HandlerKind::Once(_) => { return Box::pin(stream::once(async move { ResponseEnvelope::error( request_id, CallError::invalid_operation_type( "invoke_streaming() called on a Query/Mutation op; use invoke()" ), ) })); } }; // 5. Dispatch — the handler returns the stream streaming_handler(input, context) } } ``` The visibility + ACL checks are **identical** to `invoke()` — extract them into a private helper if it reduces duplication, but the spec requires the security axis to be provably identical between `invoke()` and `invoke_streaming()`. The two methods diverge only on the return shape (single envelope vs stream) and the handler-kind guard (Once vs Stream). ### Pre-handler errors as single-item streams A pre-handler error (not-found, forbidden, wrong kind) produces a `ResponseStream` that yields exactly one error `ResponseEnvelope` and then ends. This matches the single-response path's behavior, just on a stream — the caller (`Dispatcher::handle_stream` streaming branch, `GatewayDispatch::invoke_streaming`) drains the stream and writes frames; a one-item error stream produces one `call.error` frame and closes. Use `futures::stream::once(async move { ... })` to build these single-item streams. The error envelope carries the `request_id` from the context. ### What this task does NOT do - **No `OperationEnv::invoke_streaming()`.** Composition stays request/response-only (ADR-049). `OperationEnv::invoke()` errors on `Subscription` (handled in `streaming-handler-handlerkind` via the `HandlerKind::Stream` match in `LocalOperationEnv` → `registry.invoke()` and `OverlayOperationEnv` direct match). No streaming variant is added to the trait. - **No dispatch-loop wiring.** `Dispatcher::handle_stream` streaming branch is `call/protocol/dispatch-streaming-branch`. - **No gateway wiring.** `GatewayDispatch::invoke_streaming` is `http/gateway/invoke-streaming`. ## Acceptance Criteria - [ ] `OperationRegistry::invoke_streaming()` method exists - [ ] Returns `ResponseStream` (`Pin + Send>>`) - [ ] Not-found op → single-item stream with `NOT_FOUND` error envelope, then ends - [ ] Internal op from external call → single-item stream with `NOT_FOUND`, then ends - [ ] ACL denied → single-item stream with `FORBIDDEN`, then ends - [ ] `HandlerKind::Once` op (Query/Mutation) → single-item stream with `INVALID_OPERATION_TYPE`, then ends - [ ] `HandlerKind::Stream` op (Subscription) → dispatches the `StreamingHandler`, returns its stream - [ ] Visibility + ACL checks identical to `invoke()` (same authority switch: internal → handler_identity, external → identity) - [ ] Unit test: `invoke_streaming()` on a registered `Subscription` op yields the handler's stream items - [ ] Unit test: `invoke_streaming()` on unknown op yields one `NOT_FOUND` then ends - [ ] Unit test: `invoke_streaming()` on a `Query` op yields one `INVALID_OPERATION_TYPE` then ends - [ ] Unit test: `invoke_streaming()` on Internal op from external context yields one `NOT_FOUND` then ends - [ ] Unit test: `invoke_streaming()` ACL denied yields one `FORBIDDEN` then ends - [ ] Unit test: `invoke_streaming()` internal call uses handler_identity for ACL - [ ] `cargo test -p alknet-call` succeeds - [ ] `cargo clippy -p alknet-call --all-targets` succeeds with no warnings - [ ] `cargo fmt --check -p alknet-call` passes ## References - docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 §3 (invoke_streaming), §4 (invoke errors on Subscription), §5 (OperationEnv request/response-only) - docs/architecture/crates/call/operation-registry.md — §OperationRegistry (invoke_streaming signature, pre-handler errors as single-item streams) ## Notes > The visibility + ACL checks MUST be identical to `invoke()` — the spec calls > this out explicitly: "invoke_streaming() performs the same visibility + ACL > checks as invoke()". Extract a shared helper if it helps, but the security > axis must be provably identical. Pre-handler errors become single-item streams > (one error envelope, then end) — this matches the single-response path's > behavior, just on a stream. Do NOT add `OperationEnv::invoke_streaming()` — > composition is request/response-only by design (ADR-049 §5); stream > composition is a handler-level concern. The `futures` crate's `stream::once` > and `StreamExt` are the tools for building single-item streams. ## Summary > Added OperationRegistry::invoke_streaming() in crates/alknet-call/src/registry/registration.rs — the streaming dispatch path for Subscription operations. Same visibility + ACL checks as invoke() (provably identical security axis), then dispatches the StreamingHandler and returns its ResponseStream. Pre-handler errors (not-found, forbidden, INVALID_OPERATION_TYPE for non-Subscription ops) yield a single error ResponseEnvelope via stream::once, then end. Added 6 unit tests covering all paths (subscription dispatch, unknown op, query op cross-kind error, internal op from external, ACL denied, internal call using handler_identity).