diff --git a/.gitignore b/.gitignore index 2c45399..ed2d55c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ target/ -node_modules/ \ No newline at end of file +node_modules/ +.worktrees/ \ No newline at end of file diff --git a/tasks/call/client/from-call-streaming-forwarding.md b/tasks/call/client/from-call-streaming-forwarding.md new file mode 100644 index 0000000..c8eddc4 --- /dev/null +++ b/tasks/call/client/from-call-streaming-forwarding.md @@ -0,0 +1,172 @@ +--- +id: call/client/from-call-streaming-forwarding +name: Implement from_call streaming forwarding handler (Subscription → CallConnection::subscribe → StreamingHandler) +status: pending +depends_on: [call/registry/streaming-handler-handlerkind] +scope: narrow +risk: medium +impact: component +level: implementation +--- + +## Description + +Branch `from_call`'s forwarding handler construction on `op_type` so that a +`Subscription` op discovered via `services/list` + `services/schema` registers a +`StreamingHandler` (`HandlerKind::Stream`) that calls +`CallConnection::subscribe()` and forwards the remote stream end-to-end. +`Query`/`Mutation` ops keep the existing `make_forwarding_handler()` (single +`call_with_payload()`, `HandlerKind::Once`). This closes the gap where a +`from_call`-imported `Subscription` truncated to the first value. + +This task depends on `call/registry/streaming-handler-handlerkind` (which +introduces `HandlerKind::Stream` and `make_streaming_handler`). The +`CallConnection::subscribe()` client-side path already works (it returns +`impl Stream`); this task wires it into the forwarding +handler. + +### The branch in build_bundles + +`build_bundles` currently constructs one `make_forwarding_handler()` per +discovered op and wraps in `HandlerKind::Once`. Branch on +`op_summary.op_type` (parsed from `services/schema`): + +- `Query` / `Mutation` → `make_forwarding_handler()` (existing), wrap in + `HandlerKind::Once` +- `Subscription` → `make_streaming_forwarding_handler()` (new), wrap in + `HandlerKind::Stream` + +The `op_type` is already parsed in `rebuild_spec_for` (it reads `schema.op_type` +and produces `OperationType::Subscription`). The `OpSummary` needs to carry the +`op_type` (or the spec's `op_type` is read from the constructed `spec`). Read +`spec.op_type` after `rebuild_spec_for` to decide the handler kind. + +### make_streaming_forwarding_handler + +```rust +fn make_streaming_forwarding_handler( + connection: Arc, + remote_name: String, + credentials_auth_token: Option, +) -> StreamingHandler { + use crate::registry::registration::make_streaming_handler; + make_streaming_handler(move |input, context| { + let connection = Arc::clone(&connection); + let remote_name = remote_name.clone(); + let auth_token = credentials_auth_token.clone(); + // The streaming forwarding handler calls subscribe() and forwards the + // remote stream. forwarded_for is populated from context.identity + // (ADR-032), same as the request/response forwarding handler. + async move { + // Build the payload (same as build_forwarded_payload, but for subscribe) + let payload = build_forwarded_payload(&remote_name, input, &context, auth_token.as_deref()); + // CallConnection::subscribe takes (operation_id, input); for the + // forwarded payload path, we need a subscribe_with_payload variant, + // OR we call subscribe(remote_name, input) and let it build the + // payload. The forwarded_for + auth_token need to be in the payload, + // so a subscribe_with_payload variant is needed (mirrors + // call_with_payload). Check if CallConnection::subscribe can accept + // a full payload — if not, add subscribe_with_payload(). + let stream = connection.subscribe_with_payload(payload).await; + // Map the impl Stream to BoxStream + Box::pin(stream) as ResponseStream + } + }) +} +``` + +**Coordinate with `CallConnection::subscribe`**: the existing +`subscribe(operation_id, input)` builds the payload internally and does NOT +populate `forwarded_for` or `auth_token`. The forwarding handler needs those +fields (ADR-032). Two options: + +1. Add `CallConnection::subscribe_with_payload(payload: Value)` (mirrors + `call_with_payload`) that takes a caller-constructed payload. The forwarding + handler builds the payload with `build_forwarded_payload` and calls + `subscribe_with_payload`. +2. Extend `subscribe()` to accept optional `forwarded_for` / `auth_token`. + +Option 1 mirrors the existing `call` / `call_with_payload` split and is cleaner. +Add `subscribe_with_payload()` alongside `subscribe()`. + +### forwarded_for on the streaming payload + +The streaming forwarding handler populates `forwarded_for` from +`context.identity` exactly as the request/response forwarding handler does +(ADR-032 §3) — reuse `build_forwarded_payload()`. The `auth_token` (hub's own +call-protocol token) is also populated identically. No new payload-construction +code; reuse the existing `build_forwarded_payload`. + +### Abort cascade (ADR-016 §6) + +The streaming forwarding handler's `parent_request_id` participates in the +abort cascade: if the parent is aborted, the cascade reaches this handler, +which sends `call.aborted` to the remote node; the remote node cascades to its +own descendants. Cross-node abort is transparent. The `subscribe_with_payload` +path registers the request in `PendingRequestMap` (the existing `subscribe()` +does this); abort handling is already wired. Verify the streaming forwarding +handler's stream is dropped on parent abort (the `SubscriptionStream`'s `Drop` +or the pending entry's removal handles it). + +### What this task does NOT do + +- **No `OperationEnv::invoke_streaming()`.** Composition is request/response-only. +- **No server-side dispatch changes.** The server-side streaming branch is + `call/protocol/dispatch-streaming-branch`. +- **No gateway changes.** The gateway streaming path is `http/gateway/invoke-streaming`. + +## Acceptance Criteria + +- [ ] `build_bundles` branches on `spec.op_type`: `Subscription` → streaming + forwarding handler (`HandlerKind::Stream`), `Query`/`Mutation` → existing + `HandlerKind::Once` +- [ ] `make_streaming_forwarding_handler()` constructs a `StreamingHandler` +- [ ] Streaming forwarding handler calls `CallConnection::subscribe_with_payload()` + (or `subscribe()` with the forwarded payload) and forwards the remote stream +- [ ] `CallConnection::subscribe_with_payload(payload)` exists (mirrors + `call_with_payload`) OR `subscribe()` accepts the forwarded payload +- [ ] `forwarded_for` populated from `context.identity` (ADR-032) on the + streaming payload (reuse `build_forwarded_payload`) +- [ ] `auth_token` populated when present (reuse `build_forwarded_payload`) +- [ ] Remote stream forwarded end-to-end: each `call.responded` → stream item, + `call.completed` → stream end, `call.aborted` → stream dropped +- [ ] No truncation, no first-value fallback +- [ ] `composition_authority: None`, `scoped_env: None` for FromCall streaming + leaves (same as Query/Mutation FromCall leaves) +- [ ] Unit test: `build_bundles` with a `Subscription` op produces a + `HandlerKind::Stream` registration +- [ ] Unit test: `build_bundles` with a `Query` op produces `HandlerKind::Once` + (unchanged) +- [ ] Unit test: `make_streaming_forwarding_handler` produces a `StreamingHandler` + that calls `subscribe_with_payload` (verify via payload capture, mirroring + the existing `forwarding_handler_populates_forwarded_for` test) +- [ ] Integration test: subscription forwarding streams remote events (if + feasible with mock connection; otherwise document the integration test as + deferred to a connection-level integration test) +- [ ] `cargo test -p alknet-call` succeeds +- [ ] `cargo clippy -p alknet-call --all-targets` succeeds with no warnings +- [ ] `cargo fmt --check -p alknet-call` passes + +## References + +- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 §8 (from_call stream forwarding) +- docs/architecture/crates/call/client-and-adapters.md — §from_call (handler branched on op_type; Subscription → StreamingHandler via subscribe()) +- docs/architecture/decisions/017-call-protocol-client-and-adapter-contract.md — ADR-017 §3 (from_call flow), §6 (cross-node abort) +- docs/architecture/decisions/032-forwarded-for-identity.md — ADR-032 §3 (forwarded_for population) + +## Notes + +> The client-side `CallConnection::subscribe()` already works — this task wires +> it into the forwarding handler. The main new piece is +> `subscribe_with_payload()` (mirroring `call_with_payload`) so the forwarding +> handler can populate `forwarded_for` + `auth_token`. Reuse +> `build_forwarded_payload` — no new payload-construction code. The abort +> cascade is already wired via `PendingRequestMap`; verify the stream drops on +> parent abort. The `OpSummary` / `build_bundles` needs the `op_type` to +> branch — read it from the constructed `spec.op_type` (already parsed by +> `rebuild_spec_for`). Cross-node abort is transparent via +> `parent_request_id` (ADR-016 §6). + +## Summary + +> To be filled on completion \ No newline at end of file diff --git a/tasks/call/protocol/dispatch-streaming-branch.md b/tasks/call/protocol/dispatch-streaming-branch.md new file mode 100644 index 0000000..d6a37de --- /dev/null +++ b/tasks/call/protocol/dispatch-streaming-branch.md @@ -0,0 +1,174 @@ +--- +id: call/protocol/dispatch-streaming-branch +name: Wire Dispatcher::handle_stream streaming branch (Subscription → invoke_streaming → write each → call.completed) +status: pending +depends_on: [call/registry/invoke-streaming] +scope: narrow +risk: medium +impact: component +level: implementation +--- + +## Description + +Wire the server-side streaming dispatch branch in +`Dispatcher::handle_stream` / `dispatch_requested`. When a `call.requested` +arrives for a `Subscription` op, the dispatcher must call +`OperationRegistry::invoke_streaming()` and pump the resulting +`ResponseStream` to the wire: each `Ok(value)` → `call.responded` frame, +`Err` → `call.error` frame (terminal), natural stream end → `call.completed` +frame. This is the server-side path that makes `Subscription` operations work +end-to-end — without it, a `StreamingHandler`-registered op had no server-side +dispatch path. + +This task depends on `call/registry/invoke-streaming` (which provides +`invoke_streaming()`). It adds the `op_type` branch to the dispatch path and +the stream-to-wire pump. + +### The branch + +`dispatch_requested` currently unconditionally calls `registry.invoke()` and +returns one `ResponseEnvelope`. It needs to know the `op_type` to branch. Two +options: + +1. **Look up the registration to get `op_type` before dispatching.** The + `build_root_context` already looks up the registration; expose `op_type` + from it. Then branch: `Subscription` → streaming path, `Query`/`Mutation` → + existing `invoke()` path. +2. **Return an enum from `dispatch_requested`** (`DispatchResult::Once(ResponseEnvelope)` + | `DispatchResult::Stream(ResponseStream)`) and let `handle_stream` match on + it for the wire-writing loop. + +Pick the cleaner option. Option 1 keeps `dispatch_requested` returning a +`ResponseEnvelope` for the Once path but needs a separate streaming entry point +(e.g., `dispatch_requested_streaming` returning `ResponseStream`). Option 2 +unifies the dispatch entry but changes the return type. The spec frames it as +"branches on `op_type`" in `handle_stream`, suggesting the branch lives in the +dispatch layer. Document the choice. + +### Streaming dispatch path + +For a `Subscription` op: + +```rust +// In dispatch_requested (or a new dispatch_requested_streaming): +let context = self.build_root_context(...); +// deadline: None for subscriptions (unbounded — ADR-049 §6, call-protocol Timeouts) +// The build_root_context sets a 30s deadline; for the streaming path, set +// deadline to None AFTER construction (or pass a flag). The spec says +// "deadline: None for subscriptions (unbounded)". +let stream = self.registry.invoke_streaming(&operation_name, input, context); +stream // ResponseStream — pumped by handle_stream +``` + +### handle_stream streaming pump + +In `handle_stream`, after dispatching, if the result is a stream: + +```rust +// Read the ResponseStream, write each envelope as an EventEnvelope frame +let mut stream = tokio_stream_into_response_stream(...); // or use StreamExt +while let Some(envelope) = stream.next().await { + let event: EventEnvelope = envelope.into(); + if let Err(err) = writer.write_frame(&event).await { + warn!(error = %err, "failed to write streaming frame; closing stream"); + break; + } + // If the envelope was an error (Err result), the stream ends after it + // (the StreamingHandler's contract: Err is terminal). The stream's own + // end (None) triggers call.completed below. +} +// Natural stream end → write call.completed +let completed = EventEnvelope::completed(&request_id); +if let Err(err) = writer.write_frame(&completed).await { + warn!(error = %err, "failed to write call.completed"); +} +``` + +The `ResponseEnvelope → EventEnvelope` conversion (`into()`) already exists and +produces `call.responded` for `Ok` and `call.error` for `Err`. The +`call.completed` frame is written once when the stream ends naturally (not on +error — an `Err` envelope is terminal, the stream ends after it, and we do NOT +write `call.completed` after a `call.error`; the stream's `None` after an error +is not a "natural end"). Track whether the last envelope was an error to decide +whether to write `call.completed`. Alternatively, the `StreamingHandler`'s +contract is: `Err` ends the stream (the handler's stream yields the error then +`None`), so after the loop, only write `call.completed` if the stream did not +end on an error. Simplest correct approach: write `call.completed` only on +natural end (the stream returned `None` without the last item being an `Err`). +Track a `last_was_error` flag. + +### deadline: None for subscriptions + +`build_root_context` sets `deadline: Some(now + 30s)`. For the streaming path, +the spec says `deadline: None` (unbounded — subscriptions are long-running). Set +`context.deadline = None` after `build_root_context` for the streaming branch, +or add a parameter to `build_root_context`. The deadline bounds the +request/response call tree; a subscription has no such bound. Document this. + +### Abort cascade (ADR-016) + +If `call.aborted` arrives for a streaming request ID, the stream is dropped +(Rust `Drop` releases the handler's resources). The existing `handle_abort` +path already removes the pending entry and cascades. For the streaming branch, +the stream future being dropped (when the `handle_stream` task is cancelled or +the `call.aborted` is processed) releases the handler's resources via `Drop`. +No new abort code is needed — the existing `handle_abort` + the stream's `Drop` +handle it. Verify the streaming pump task is cancellable (it's a `tokio::spawn` +task; aborting the connection cancels it). + +### What this task does NOT do + +- **No client-side changes.** The client `CallConnection::subscribe()` already + works (it reads `call.responded` events until `call.completed`). This task is + server-side only. +- **No gateway changes.** `GatewayDispatch::invoke_streaming` is + `http/gateway/invoke-streaming`. + +## Acceptance Criteria + +- [ ] `dispatch_requested` (or a new `dispatch_requested_streaming`) branches on + `op_type`: `Subscription` → `invoke_streaming()`, `Query`/`Mutation` → + `invoke()` (existing) +- [ ] `handle_stream` pumps the `ResponseStream` for the streaming branch: + each `ResponseEnvelope` → `EventEnvelope` frame +- [ ] Natural stream end → `call.completed` frame written +- [ ] `Err` envelope → `call.error` frame written, stream ends after it (no + `call.completed` after an error) +- [ ] `deadline: None` for the streaming branch (unbounded subscriptions) +- [ ] Abort: `call.aborted` for a streaming request drops the stream (Drop + releases resources; existing `handle_abort` handles the pending entry) +- [ ] Existing `Query`/`Mutation` dispatch path unchanged (one + `call.responded`/`call.error` frame, no `call.completed`) +- [ ] Unit test: `Subscription` op dispatch → multiple `call.responded` frames + + `call.completed` on stream end +- [ ] Unit test: `Subscription` op handler yields `Err` → one `call.error` + frame, no `call.completed` after +- [ ] Unit test: `Query` op dispatch unchanged (one frame, no `call.completed`) +- [ ] Unit test: `call.aborted` for streaming request → stream dropped +- [ ] `cargo test -p alknet-call` succeeds +- [ ] `cargo clippy -p alknet-call --all-targets` succeeds with no warnings +- [ ] `cargo fmt --check -p alknet-call` passes + +## References + +- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 §6 (server-side dispatch branches on op_type) +- docs/architecture/crates/call/call-protocol.md — §CallAdapter Stream Handling (streaming branch: invoke_streaming → write each → call.completed; deadline: None; abort cascade) +- docs/architecture/decisions/016-abort-cascade-for-nested-calls.md — ADR-016 (stream drop on abort) + +## Notes + +> The streaming pump is a straightforward `while let Some(envelope) = stream.next().await` +> loop — not a complex abstraction. The tricky part is the `call.completed` +> semantics: write it on natural stream end, NOT after an `Err` (which is +> terminal). Track whether the last envelope was an error. `deadline: None` for +> subscriptions is a spec requirement — the 30s request/response deadline does +> not bound a long-running subscription. The abort cascade needs no new code: +> dropping the stream future (via task cancellation or `handle_abort`) releases +> the handler's resources through Rust's `Drop`. Pick the dispatch-entry shape +> (separate streaming method vs unified enum return) and document it — the spec +> frames it as a branch in `handle_stream`, so the branch should be visible there. + +## Summary + +> To be filled on completion \ No newline at end of file diff --git a/tasks/call/registry/invoke-streaming.md b/tasks/call/registry/invoke-streaming.md new file mode 100644 index 0000000..9393a9d --- /dev/null +++ b/tasks/call/registry/invoke-streaming.md @@ -0,0 +1,170 @@ +--- +id: call/registry/invoke-streaming +name: Implement OperationRegistry::invoke_streaming() returning ResponseStream +status: pending +depends_on: [call/registry/streaming-handler-handlerkind] +scope: narrow +risk: medium +impact: component +level: implementation +--- + +## Description + +Add `OperationRegistry::invoke_streaming()` — the streaming dispatch path that +`Subscription` operations use. This is the counterpart to `invoke()`: same +visibility + ACL checks, then dispatches to the `StreamingHandler` and returns +the `ResponseStream`. Pre-handler errors (not-found, forbidden, +`INVALID_OPERATION_TYPE` for a non-Subscription op) yield a single error +`ResponseEnvelope` and end the stream. + +This task depends on `call/registry/streaming-handler-handlerkind` (which +introduces `HandlerKind::Stream` and the `ResponseStream` alias). It adds only +the `invoke_streaming()` method — no other changes. + +### invoke_streaming() + +```rust +use futures::stream::{self, StreamExt}; + +impl OperationRegistry { + /// Dispatch a Subscription operation. Returns a stream of + /// ResponseEnvelopes. Pre-handler errors (not-found, forbidden, + /// INVALID_OPERATION_TYPE for a non-Subscription op) yield a single + /// error ResponseEnvelope and end the stream. + pub fn invoke_streaming( + &self, + name: &str, + input: Value, + context: OperationContext, + ) -> ResponseStream { + let request_id = context.request_id.clone(); + + // 1. Look up registration + let registration = match self.operations.get(name) { + Some(r) => r, + None => { + return Box::pin(stream::once(async move { + ResponseEnvelope::not_found(request_id, name) + })); + } + }; + + // 2. Visibility check (same as invoke) + if registration.spec.visibility == Visibility::Internal && !context.internal { + return Box::pin(stream::once(async move { + ResponseEnvelope::not_found(request_id, name) + })); + } + + // 3. ACL check (same as invoke) + let acl = ®istration.spec.access_control; + let identity = if context.internal { + context.handler_identity.as_ref().and_then(|ca| ca.as_identity()) + } else { + context.identity.clone() + }; + if let AccessResult::Forbidden(message) = acl.check(identity.as_ref()) { + return Box::pin(stream::once(async move { + ResponseEnvelope::forbidden(request_id, message) + })); + } + + // 4. HandlerKind check — must be Stream for invoke_streaming + let streaming_handler = match ®istration.handler { + HandlerKind::Stream(h) => Arc::clone(h), + HandlerKind::Once(_) => { + return Box::pin(stream::once(async move { + ResponseEnvelope::error( + request_id, + CallError::invalid_operation_type( + "invoke_streaming() called on a Query/Mutation op; use invoke()" + ), + ) + })); + } + }; + + // 5. Dispatch — the handler returns the stream + streaming_handler(input, context) + } +} +``` + +The visibility + ACL checks are **identical** to `invoke()` — extract them into +a private helper if it reduces duplication, but the spec requires the security +axis to be provably identical between `invoke()` and `invoke_streaming()`. The +two methods diverge only on the return shape (single envelope vs stream) and +the handler-kind guard (Once vs Stream). + +### Pre-handler errors as single-item streams + +A pre-handler error (not-found, forbidden, wrong kind) produces a +`ResponseStream` that yields exactly one error `ResponseEnvelope` and then ends. +This matches the single-response path's behavior, just on a stream — the caller +(`Dispatcher::handle_stream` streaming branch, `GatewayDispatch::invoke_streaming`) +drains the stream and writes frames; a one-item error stream produces one +`call.error` frame and closes. + +Use `futures::stream::once(async move { ... })` to build these single-item +streams. The error envelope carries the `request_id` from the context. + +### What this task does NOT do + +- **No `OperationEnv::invoke_streaming()`.** Composition stays + request/response-only (ADR-049). `OperationEnv::invoke()` errors on + `Subscription` (handled in `streaming-handler-handlerkind` via the + `HandlerKind::Stream` match in `LocalOperationEnv` → `registry.invoke()` and + `OverlayOperationEnv` direct match). No streaming variant is added to the + trait. +- **No dispatch-loop wiring.** `Dispatcher::handle_stream` streaming branch is + `call/protocol/dispatch-streaming-branch`. +- **No gateway wiring.** `GatewayDispatch::invoke_streaming` is + `http/gateway/invoke-streaming`. + +## Acceptance Criteria + +- [ ] `OperationRegistry::invoke_streaming()` method exists +- [ ] Returns `ResponseStream` (`Pin + Send>>`) +- [ ] Not-found op → single-item stream with `NOT_FOUND` error envelope, then ends +- [ ] Internal op from external call → single-item stream with `NOT_FOUND`, then ends +- [ ] ACL denied → single-item stream with `FORBIDDEN`, then ends +- [ ] `HandlerKind::Once` op (Query/Mutation) → single-item stream with + `INVALID_OPERATION_TYPE`, then ends +- [ ] `HandlerKind::Stream` op (Subscription) → dispatches the `StreamingHandler`, + returns its stream +- [ ] Visibility + ACL checks identical to `invoke()` (same authority switch: + internal → handler_identity, external → identity) +- [ ] Unit test: `invoke_streaming()` on a registered `Subscription` op yields + the handler's stream items +- [ ] Unit test: `invoke_streaming()` on unknown op yields one `NOT_FOUND` then ends +- [ ] Unit test: `invoke_streaming()` on a `Query` op yields one + `INVALID_OPERATION_TYPE` then ends +- [ ] Unit test: `invoke_streaming()` on Internal op from external context yields + one `NOT_FOUND` then ends +- [ ] Unit test: `invoke_streaming()` ACL denied yields one `FORBIDDEN` then ends +- [ ] Unit test: `invoke_streaming()` internal call uses handler_identity for ACL +- [ ] `cargo test -p alknet-call` succeeds +- [ ] `cargo clippy -p alknet-call --all-targets` succeeds with no warnings +- [ ] `cargo fmt --check -p alknet-call` passes + +## References + +- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 §3 (invoke_streaming), §4 (invoke errors on Subscription), §5 (OperationEnv request/response-only) +- docs/architecture/crates/call/operation-registry.md — §OperationRegistry (invoke_streaming signature, pre-handler errors as single-item streams) + +## Notes + +> The visibility + ACL checks MUST be identical to `invoke()` — the spec calls +> this out explicitly: "invoke_streaming() performs the same visibility + ACL +> checks as invoke()". Extract a shared helper if it helps, but the security +> axis must be provably identical. Pre-handler errors become single-item streams +> (one error envelope, then end) — this matches the single-response path's +> behavior, just on a stream. Do NOT add `OperationEnv::invoke_streaming()` — +> composition is request/response-only by design (ADR-049 §5); stream +> composition is a handler-level concern. The `futures` crate's `stream::once` +> and `StreamExt` are the tools for building single-item streams. + +## Summary + +> To be filled on completion \ No newline at end of file diff --git a/tasks/call/registry/streaming-handler-handlerkind.md b/tasks/call/registry/streaming-handler-handlerkind.md new file mode 100644 index 0000000..550008a --- /dev/null +++ b/tasks/call/registry/streaming-handler-handlerkind.md @@ -0,0 +1,256 @@ +--- +id: call/registry/streaming-handler-handlerkind +name: Introduce StreamingHandler, HandlerKind, ResponseStream types and migrate HandlerRegistration to HandlerKind +status: pending +depends_on: [] +scope: broad +risk: medium +impact: component +level: implementation +--- + +## Description + +ADR-049 restores the streaming handler path that the Rust port dropped when it +collapsed the TS `OperationHandler` / `SubscriptionHandler` union into a single +`Handler`. This task introduces the new types (`StreamingHandler`, `HandlerKind`, +`ResponseStream`, `make_streaming_handler`), adds the `INVALID_OPERATION_TYPE` +protocol error code, changes `HandlerRegistration.handler` from `Handler` to +`HandlerKind`, updates the builder to absorb the wrapping, adds registration-time +validation, updates `invoke()` to error on `Stream`, updates the overlay env to +match on `HandlerKind`, and migrates **every existing construction site** to wrap +in `HandlerKind::Once`. + +This is the foundational breaking change — all downstream streaming tasks depend +on it. It is broad in surface area (touches `registration.rs`, `wire.rs`, +`connection.rs`, and every test/adapter that constructs a `HandlerRegistration`) +but each individual change is mechanical. The goal: after this task, the codebase +compiles with two handler kinds, `Query`/`Mutation` ops work exactly as before +(wrapped in `HandlerKind::Once`), and `Subscription` ops are rejected by `invoke()` +with `INVALID_OPERATION_TYPE` (the streaming dispatch path `invoke_streaming()` +is added in `call/registry/invoke-streaming`). + +### New types (registration.rs) + +```rust +use futures::stream::Stream; + +/// Streaming handler — Subscription operations. Returns a stream of +/// ResponseEnvelopes: each Ok(value) → call.responded, an Err → call.error +/// (terminal — stream ends), natural stream end → call.completed. +pub type StreamingHandler = Arc< + dyn Fn(Value, OperationContext) + -> Pin + Send>> + + Send + Sync, +>; + +/// Type alias for the boxed stream shape used by `invoke_streaming()` and +/// `StreamingHandler` return values. `futures::stream::BoxStream<'static, T>` +/// = `Pin + Send>>` — the concrete library is a +/// two-way-door implementation detail (ADR-049); the alias exists so the two +/// spellings refer to the same type. +pub type ResponseStream = Pin + Send>>; + +/// Which dispatch path a handler uses — locked by ADR-049. Validated against +/// `spec.op_type` at registration: Query/Mutation → Once; Subscription → Stream. +/// Mismatch is a startup error. +pub enum HandlerKind { + Once(Handler), + Stream(StreamingHandler), +} +``` + +`make_streaming_handler()` helper (analogue of `make_handler()`): + +```rust +pub fn make_streaming_handler(f: S) -> StreamingHandler +where + S: Fn(Value, OperationContext) -> St + Send + Sync + 'static, + St: Stream + Send + 'static, +{ + Arc::new(move |input, context| Box::pin(f(input, context))) +} +``` + +### INVALID_OPERATION_TYPE error code (wire.rs) + +Add the sixth protocol-level error code to `CallError`: + +```rust +pub fn invalid_operation_type(message: impl Into) -> Self { + Self::new("INVALID_OPERATION_TYPE", message, false) +} +``` + +`retryable: false`, `details: None`. This is a permanent client-side programming +error (wrong dispatch path for the operation's type), not a transient failure. +Clients should treat unknown codes as `INTERNAL` with `retryable: false` (the +existing rule); `INVALID_OPERATION_TYPE` is distinct from `INVALID_INPUT` (schema +mismatch) and `INTERNAL` (handler failure). + +### HandlerRegistration.handler → HandlerKind + +```rust +pub struct HandlerRegistration { + pub spec: OperationSpec, + pub handler: HandlerKind, // was: Handler + pub provenance: OperationProvenance, + pub composition_authority: Option, + pub scoped_env: Option, + pub capabilities: Capabilities, +} +``` + +`HandlerRegistration::new()` takes `handler: HandlerKind` (callers wrap in +`HandlerKind::Once(...)` or `HandlerKind::Stream(...)`). + +### Builder absorbs HandlerKind wrapping + +The builder inspects `spec.op_type` and wraps automatically — `.with_local()` +and `.with_leaf()` / `.with_leaf_provenance()` take the raw `Handler` (for +Query/Mutation) and wrap it in `HandlerKind::Once`. For Subscription ops, add a +parallel method pair (`.with_local_streaming()` / `.with_leaf_streaming()`) that +takes a `StreamingHandler` and wraps in `HandlerKind::Stream`. The builder +validates `handler` kind matches `spec.op_type` and reports mismatch as a +startup error. + +The two-method-pair approach is preferred over a typed enum input because it +keeps the common case (Query/Mutation, `Handler`) on the existing signatures +and makes the streaming case explicit at the call site. Document this choice. + +### register() validation + +`OperationRegistry::register()` validates that `handler` is the right +`HandlerKind` for `spec.op_type`: + +- `Query` / `Mutation` → `HandlerKind::Once` +- `Subscription` → `HandlerKind::Stream` + +Mismatch is a startup error. Change `register()` to return `Result<(), String>` +(preferred — startup errors should be explicit, not panics) with a clear message +(`"handler kind mismatch: {op_type} requires HandlerKind::{Once|Stream}"`). +Update all `register()` call sites to handle the Result (the builder's `store()` +and tests). Alternatively panic with a clear message — but `Result` is cleaner +for a startup error and matches the `AdapterError` pattern used elsewhere. + +### invoke() errors on Stream + +`OperationRegistry::invoke()` matches on `registration.handler`: + +- `HandlerKind::Once(handler)` → existing dispatch path (unchanged) +- `HandlerKind::Stream(_)` → return `ResponseEnvelope::error(request_id, + CallError::invalid_operation_type("invoke() called on a Subscription op; + use invoke_streaming()"))` + +This is the guard that prevents a streaming op from being silently truncated +through the request/response path. The `invoke_streaming()` method itself is +added in `call/registry/invoke-streaming`. + +### OverlayOperationEnv (connection.rs) + +`OverlayOperationEnv::invoke_with_policy` dispatches directly (it does NOT call +`registry.invoke()` — it reads the handler from the overlay and calls it). After +the type change, `registration.handler` is `HandlerKind`, so the env must match: + +- `HandlerKind::Once(handler)` → `handler(input, context).await` (existing path) +- `HandlerKind::Stream(_)` → return `ResponseEnvelope::error(parent.request_id, + CallError::invalid_operation_type("OperationEnv::invoke() called on a + Subscription op; composition is request/response-only"))` + +`LocalOperationEnv` calls `self.registry.invoke()` which already errors on +`Stream` — no change needed there. `PeerCompositeEnv` delegates to +session/connection/base envs — no change needed there either. + +### Migration of existing construction sites + +Every site that constructs `HandlerRegistration::new(spec, handler, ...)` must +wrap `handler` in `HandlerKind::Once(handler)`. This is mechanical. Sites +include (non-exhaustive — find them all with a grep for `HandlerRegistration::new`): + +- `crates/alknet-call/src/registry/registration.rs` (tests) +- `crates/alknet-call/src/registry/env.rs` (tests) +- `crates/alknet-call/src/registry/discovery.rs` (`services_list_handler`, + `services_schema_handler` construction — these are `Query` ops) +- `crates/alknet-call/src/protocol/dispatch.rs` (tests) +- `crates/alknet-call/src/protocol/connection.rs` (tests, + `imported_registration` helper) +- `crates/alknet-call/src/client/from_call.rs` (`build_bundles`, + `make_forwarding_handler`, tests) +- `crates/alknet-http/src/gateway/dispatch.rs` (tests) +- `crates/alknet-http/src/server/gateway_routes.rs` (tests) +- `crates/alknet-http/src/adapters/from_openapi.rs` (`build_registration`) +- `crates/alknet-http/src/adapters/from_mcp/mod.rs` (`build_registration`) + +The builder sites (`with_local`, `with_leaf`, `with_leaf_provenance`, `with`) +are updated by the builder-absorbs-wrapping change above — their callers pass +raw `Handler` and the builder wraps. Direct `HandlerRegistration::new()` calls +need the explicit `HandlerKind::Once(...)` wrap. + +## Acceptance Criteria + +- [ ] `StreamingHandler` type alias in `registration.rs` +- [ ] `ResponseStream` type alias (`Pin + Send>>`) +- [ ] `HandlerKind` enum with `Once(Handler)` and `Stream(StreamingHandler)` variants +- [ ] `make_streaming_handler()` helper compiles and works +- [ ] `CallError::invalid_operation_type()` constructor in `wire.rs` +- [ ] `HandlerRegistration.handler` field is `HandlerKind` (not `Handler`) +- [ ] `HandlerRegistration::new()` takes `HandlerKind` +- [ ] Builder `with_local` / `with_leaf` / `with_leaf_provenance` wrap `Handler` in + `HandlerKind::Once` for Query/Mutation +- [ ] Builder `with_local_streaming` / `with_leaf_streaming` wrap `StreamingHandler` + in `HandlerKind::Stream` for Subscription +- [ ] Builder validates `handler` kind matches `spec.op_type` — mismatch is a + startup error +- [ ] `OperationRegistry::register()` validates `HandlerKind` matches `op_type` + (returns `Result<(), String>` or panics with clear message) +- [ ] `OperationRegistry::invoke()` dispatches `HandlerKind::Once` (existing path) +- [ ] `OperationRegistry::invoke()` returns `INVALID_OPERATION_TYPE` for + `HandlerKind::Stream` +- [ ] `OverlayOperationEnv::invoke_with_policy` matches on `HandlerKind`: + `Once` → dispatch, `Stream` → `INVALID_OPERATION_TYPE` +- [ ] `LocalOperationEnv` propagates `INVALID_OPERATION_TYPE` via `registry.invoke()` + (no code change needed — verify) +- [ ] All existing `HandlerRegistration::new()` call sites wrap in + `HandlerKind::Once(...)` +- [ ] All existing builder call sites compile (builder absorbs wrapping) +- [ ] Unit test: `invoke()` on a `Subscription` op (registered with + `HandlerKind::Stream`) returns `INVALID_OPERATION_TYPE` +- [ ] Unit test: `invoke()` on a `Query` op (registered with `HandlerKind::Once`) + dispatches normally +- [ ] Unit test: `register()` rejects `HandlerKind::Once` for a `Subscription` spec +- [ ] Unit test: `register()` rejects `HandlerKind::Stream` for a `Query` spec +- [ ] Unit test: `OverlayOperationEnv::invoke()` on a `Stream`-kind overlay op + returns `INVALID_OPERATION_TYPE` +- [ ] Unit test: `make_streaming_handler` produces a working `StreamingHandler` +- [ ] `cargo test -p alknet-call` succeeds +- [ ] `cargo test -p alknet-http` succeeds +- [ ] `cargo clippy -p alknet-call --all-targets` succeeds with no warnings +- [ ] `cargo clippy -p alknet-http --all-targets` succeeds with no warnings +- [ ] `cargo fmt --check -p alknet-call -p alknet-http` passes + +## References + +- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 (the decision) +- docs/architecture/crates/call/operation-registry.md — §Handler (StreamingHandler, HandlerKind, ResponseStream, make_streaming_handler), §OperationRegistry (register validation, invoke errors on Stream), §HandlerRegistration +- docs/architecture/crates/call/call-protocol.md — §call.error Payload (INVALID_OPERATION_TYPE protocol code) +- docs/architecture/decisions/023-operation-error-schemas.md — ADR-023 (amended: six protocol codes) + +## Notes + +> This is the foundational breaking change. The `HandlerRegistration.handler` +> type flip from `Handler` to `HandlerKind` ripples to every construction site, +> but each change is mechanical (`Handler` → `HandlerKind::Once(handler)`). The +> builder absorbs the wrapping for the common case. The load-bearing parts are: +> (1) `register()` validation catches kind/op_type mismatch at startup, (2) +> `invoke()` errors on `Stream` (the guard that prevents silent truncation), (3) +> `OverlayOperationEnv` matches on `HandlerKind` (it dispatches directly, not via +> `registry.invoke()`). `LocalOperationEnv` needs no change — it delegates to +> `registry.invoke()` which handles it. Do NOT add `invoke_streaming()` in this +> task — that's `call/registry/invoke-streaming`. The `futures` crate is already +> a dependency of `alknet-call`. The two-method-pair builder API +> (`with_local`/`with_local_streaming`) is preferred over a typed enum input — +> it keeps the common case on existing signatures and makes streaming explicit. + +## Summary + +> To be filled on completion \ No newline at end of file diff --git a/tasks/http/adapters/from-openapi-sse-streaming.md b/tasks/http/adapters/from-openapi-sse-streaming.md new file mode 100644 index 0000000..a30b2ce --- /dev/null +++ b/tasks/http/adapters/from-openapi-sse-streaming.md @@ -0,0 +1,243 @@ +--- +id: http/adapters/from-openapi-sse-streaming +name: Implement from_openapi Subscription forwarding as StreamingHandler (SSE response → BoxStream) +status: pending +depends_on: [call/registry/streaming-handler-handlerkind] +scope: narrow +risk: medium +impact: component +level: implementation +--- + +## Description + +Branch `from_openapi`'s forwarding handler construction on `op_type` so that a +`Subscription` op (detected via `text/event-stream` response content type) +registers a `StreamingHandler` (`HandlerKind::Stream`) that streams the SSE +response chunks as `ResponseEnvelope::ok()` items. `Query`/`Mutation` ops keep +the existing `Handler` (`HandlerKind::Once`) that returns a single +`ResponseEnvelope`. This closes the gap where a `from_openapi`-imported +`Subscription` returned only the last SSE event. + +This task depends on `call/registry/streaming-handler-handlerkind` (which +introduces `HandlerKind::Stream` and `make_streaming_handler`). The existing +`from_openapi` code already detects `Subscription` (`detect_op_type` checks for +`text/event-stream`) and has an SSE parser (`parse_sse_frames`); this task +rewires the subscription path from "collect all events, return last" to "stream +events as they arrive". + +### The branch in build_registration + +`build_registration` currently always builds a `Handler` (via `make_handler`) and +wraps in `HandlerKind::Once` (after `streaming-handler-handlerkind`). Branch on +`op_type`: + +- `Query` / `Mutation` → existing `make_handler` + `forward()` (single response), + `HandlerKind::Once` +- `Subscription` → new `make_streaming_handler` + `forward_stream()` (SSE + streaming), `HandlerKind::Stream` + +The `op_type` is already computed by `detect_op_type` and available in +`build_registration`. The `HandlerRegistration::new()` call at the end wraps in +the right `HandlerKind` based on `op_type`. + +### forward_stream() — the streaming forward function + +```rust +async fn forward_stream( + http_client: &Arc, + base_url: &str, + path_template: &str, + method: &str, + auth_scheme: &Option, + default_headers: &HashMap, + namespace: &str, + error_status_codes: &[(u16, String)], + input: Value, + context: OperationContext, +) -> ResponseStream { + let request_id = context.request_id.clone(); + + // 1. Build the request (same as forward()) + let (http_method, url, body, headers) = match build_request(...) { + Ok(parts) => parts, + Err(err) => { + return Box::pin(stream::once(async move { + ResponseEnvelope::error(request_id, err) + })); + } + }; + + // 2. Send with Accept: text/event-stream + let request_builder = http_client.client() + .request(http_method, url.as_str()) + .headers(headers) + .header(ACCEPT, "text/event-stream"); + let request_builder = match body.as_ref() { + Some(b) => request_builder.body(serde_json::to_string(b).unwrap_or("null".to_string())), + None => request_builder, + }; + + let response: reqwest::Response = match request_builder.send().await { + Ok(r) => r, + Err(err) => { + return Box::pin(stream::once(async move { + ResponseEnvelope::error(request_id, CallError::internal(format!("HTTP request failed: {err}"))) + })); + } + }; + + let status = response.status(); + if !status.is_success() { + // Non-2xx → single error envelope, stream ends + let code = error_status_codes.iter() + .find(|(s, _)| *s == status.as_u16()) + .map(|(_, c)| c.clone()) + .unwrap_or_else(|| format!("HTTP_{}", status.as_u16())); + let message = format!("HTTP {}: {}", status.as_u16(), status.canonical_reason().unwrap_or("")); + return Box::pin(stream::once(async move { + ResponseEnvelope::error(request_id, CallError::new(code, message, false)) + })); + } + + // 3. Stream the SSE chunks → ResponseEnvelope::ok() per data: frame + let request_id_stream = request_id.clone(); + let sse_stream = response.bytes_stream() + .scan(String::new(), move |buffer, chunk_result| { + // Parse SSE frames from the chunk, emit each as a ResponseEnvelope::ok() + // This is the streaming analogue of stream_subscription() + let request_id = request_id_stream.clone(); + async move { + match chunk_result { + Ok(chunk) => { + buffer.push_str(&String::from_utf8_lossy(&chunk)); + let (events, remaining) = parse_sse_frames(buffer); + *buffer = remaining; + // Emit each event as a ResponseEnvelope::ok() + let envelopes: Vec = events.into_iter() + .map(|e| { + let parsed = if e.data.trim().is_empty() { + Value::Null + } else { + serde_json::from_str(&e.data).unwrap_or(Value::String(e.data.clone())) + }; + ResponseEnvelope::ok(&request_id, parsed) + }) + .collect(); + Some((envelopes,)) // yield the batch + } + Err(err) => { + let error = CallError::internal(format!("SSE stream error: {err}")); + Some(vec![ResponseEnvelope::error(request_id, error)]) + } + } + } + }) + .flat_map(|envelopes| stream::iter(envelopes)); + + Box::pin(sse_stream) +} +``` + +The exact combinator shape (`scan` + `flat_map`, or a custom `Stream` impl, or +`unfold`) is an implementation detail — the contract is: each SSE `data:` frame +becomes a `ResponseEnvelope::ok()`; an HTTP error (non-2xx) becomes a single +`ResponseEnvelope::error()` and ends the stream; SSE stream end ends the +`ResponseStream` (→ `call.completed` on the wire). Reuse the existing +`parse_sse_frames` parser — it already handles multi-event buffers, partial +trailing lines, comments, multi-line data, BOM. + +### Remove stream_subscription() (the collect-all placeholder) + +The existing `stream_subscription()` collects all SSE events and returns the +last one as a single `ResponseEnvelope`. This is the placeholder that +truncates. Remove it (or repurpose its SSE-parsing logic into the streaming +`forward_stream`). The `parse_sse_frames` function stays (it's reused by +`forward_stream`); only the collect-all `stream_subscription` wrapper goes. + +### build_registration wiring + +```rust +let handler = if op_type == OperationType::Subscription { + // Streaming handler — HandlerKind::Stream + let stream_handler = make_streaming_handler(move |input, context| { + // clone captured vars + async move { + forward_stream(&http_client, &base_url, &path_template, &method_upper, + &auth_scheme, &default_headers, &namespace, &error_status_codes, + input, context).await + } + }); + HandlerKind::Stream(stream_handler) +} else { + // Request/response handler — HandlerKind::Once (existing) + let once_handler = make_handler(move |input, context| { + // clone captured vars + async move { + forward(&http_client, &base_url, &path_template, &method_upper, + &auth_scheme, &default_headers, &namespace, &error_status_codes, + op_type, input, context).await + } + }); + HandlerKind::Once(once_handler) +}; + +HandlerRegistration::new(spec, handler, OperationProvenance::FromOpenAPI, None, None, capabilities) +``` + +### What this task does NOT do + +- **No `from_mcp` changes.** `from_mcp` handlers are always `HandlerKind::Once` + (MCP tools are request/response — ADR-041; ADR-049 confirms this is unchanged). +- **No gateway changes.** The gateway `/subscribe` SSE path is + `http/server/subscribe-sse-streaming`. +- **No `OperationRegistry` changes.** `invoke_streaming()` is provided by + `call/registry/invoke-streaming`. + +## Acceptance Criteria + +- [ ] `build_registration` branches on `op_type`: `Subscription` → + `HandlerKind::Stream` (streaming forward), `Query`/`Mutation` → + `HandlerKind::Once` (existing forward) +- [ ] `forward_stream()` streams SSE chunks as `ResponseEnvelope::ok()` items +- [ ] Each SSE `data:` frame → one `ResponseEnvelope::ok()` +- [ ] HTTP error (non-2xx) → single `ResponseEnvelope::error()`, stream ends +- [ ] SSE stream end → `ResponseStream` ends (→ `call.completed` on wire) +- [ ] `parse_sse_frames` reused (multi-event, partial trailing, comments, + multi-line data, BOM — all handled) +- [ ] `stream_subscription()` (collect-all placeholder) removed or repurposed +- [ ] `Query`/`Mutation` forwarding unchanged (existing `forward()` path) +- [ ] `Accept: text/event-stream` header sent for Subscription requests +- [ ] Unit test: `Subscription` op registration is `HandlerKind::Stream` +- [ ] Unit test: `Query` op registration is `HandlerKind::Once` (unchanged) +- [ ] Integration test: `Subscription` forwarding streams multiple + `ResponseEnvelope::ok()` items from an SSE server (one per `data:` frame) +- [ ] Integration test: `Subscription` forwarding on HTTP error → one + `ResponseEnvelope::error()`, stream ends +- [ ] Integration test: `Query` forwarding unchanged (single response) +- [ ] `cargo test -p alknet-http` succeeds +- [ ] `cargo clippy -p alknet-http --all-targets` succeeds with no warnings +- [ ] `cargo fmt --check -p alknet-http` passes + +## References + +- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 §9 (from_openapi SSE forwarding) +- docs/architecture/crates/http/http-adapters.md — §Forwarding handler (Subscription → HandlerKind::Stream, SSE → BoxStream) +- docs/architecture/crates/http/http-mcp.md — from_mcp handlers always HandlerKind::Once (unchanged) + +## Notes + +> The existing `stream_subscription()` is the placeholder that truncates — it +> collects all SSE events and returns the last. Replace it with `forward_stream()` +> that yields each SSE event as a stream item. Reuse `parse_sse_frames` (it's +> already correct for multi-event buffers, partial lines, comments, BOM). The +> combinator shape (`scan` + `flat_map`, `unfold`, or custom `Stream`) is an +> implementation detail — the contract is one `ResponseEnvelope::ok()` per +> `data:` frame, error on HTTP failure, end on SSE close. `from_mcp` is +> unchanged — MCP tools are request/response (ADR-041), always +> `HandlerKind::Once`. The `futures` crate's `StreamExt::scan` / `flat_map` / +> `unfold` are the likely tools. + +## Summary + +> To be filled on completion \ No newline at end of file diff --git a/tasks/http/gateway/invoke-streaming.md b/tasks/http/gateway/invoke-streaming.md new file mode 100644 index 0000000..7d9600f --- /dev/null +++ b/tasks/http/gateway/invoke-streaming.md @@ -0,0 +1,130 @@ +--- +id: http/gateway/invoke-streaming +name: Implement GatewayDispatch::invoke_streaming() returning BoxStream +status: pending +depends_on: [call/registry/invoke-streaming] +scope: narrow +risk: medium +impact: component +level: implementation +--- + +## Description + +Add `GatewayDispatch::invoke_streaming()` — the streaming analogue of +`invoke()`, returning a `BoxStream`. The security invariants +are identical to `invoke()`: `internal: false`, `forwarded_for: None`, same +capabilities, same `scoped_env`, same ACL check before dispatch. The two methods +diverge only on the return shape (stream vs single envelope). The HTTP +`/subscribe` handler calls this and pipes the stream to SSE. + +This task depends on `call/registry/invoke-streaming` (which provides +`OperationRegistry::invoke_streaming()`). It adds the `GatewayDispatch` method +only — the `/subscribe` SSE wiring is `http/server/subscribe-sse-streaming`. + +### invoke_streaming() + +```rust +use futures::stream::BoxStream; + +impl GatewayDispatch { + /// Invoke a Subscription operation as a wire-ingress caller. The streaming + /// analogue of `invoke()`. Security invariants identical to `invoke()`: + /// `internal: false`, `forwarded_for: None`, same capabilities, same + /// scoped_env, same ACL. Diverges only on return shape (stream vs single + /// envelope). Returns a `BoxStream`; the `/subscribe` + /// handler pipes it to SSE. + pub async fn invoke_streaming( + &self, + identity: Option, + op: &str, + input: Value, + ) -> BoxStream<'static, ResponseEnvelope> { + let operation_name = strip_leading_slash(op).to_string(); + let request_id = uuid::Uuid::new_v4().to_string(); + let context = self.build_root_context(&request_id, &operation_name, identity); + // The registry's invoke_streaming returns ResponseStream + // (Pin + Send>>), which IS + // BoxStream<'static, ResponseEnvelope>. Box::pin / BoxStream are the + // same type — the alias just spells it out. + self.registry.invoke_streaming(&operation_name, input, context) + } +} +``` + +`build_root_context` is reused unchanged — it constructs the root +`OperationContext` with `internal: false`, `forwarded_for: None`, fresh +`request_id`, deadline, registration bundle's `composition_authority` / +`capabilities` / `scoped_env`. The security axis is provably identical between +`invoke()` and `invoke_streaming()` because they share `build_root_context`. + +### deadline: None for streaming + +The spec says `deadline: None` for subscriptions (unbounded). `build_root_context` +sets `deadline: Some(now + 30s)`. For the streaming path, set +`context.deadline = None` after `build_root_context`, OR add a streaming flag to +`build_root_context`. Coordinate with `call/protocol/dispatch-streaming-branch` +which has the same concern — extract a shared approach (e.g., a +`build_root_context_streaming` variant or a `deadline: None` override). The +gateway and the call dispatcher both need `deadline: None` for subscriptions; +don't duplicate the logic. + +### What this task does NOT do + +- **No `/subscribe` SSE wiring.** That's `http/server/subscribe-sse-streaming`. +- **No `OperationRegistry` changes.** `invoke_streaming()` is provided by + `call/registry/invoke-streaming`. +- **No `to_mcp` changes.** MCP excludes `Subscription` (ADR-041); `to_mcp` never + calls `invoke_streaming()`. + +## Acceptance Criteria + +- [ ] `GatewayDispatch::invoke_streaming()` method exists +- [ ] Returns `BoxStream<'static, ResponseEnvelope>` (or `ResponseStream` — + same type) +- [ ] Builds root `OperationContext` via `build_root_context` (same as `invoke()`) +- [ ] Root context has `internal: false`, `forwarded_for: None`, fresh + `request_id` +- [ ] `handler_identity`, `capabilities`, `scoped_env` from registration bundle + (same as `invoke()`) +- [ ] `deadline: None` for the streaming path (unbounded subscriptions) +- [ ] Calls `OperationRegistry::invoke_streaming()` +- [ ] Security invariants identical to `invoke()` (shared `build_root_context`) +- [ ] Unit test: `invoke_streaming()` on a registered `Subscription` op returns + the handler's stream +- [ ] Unit test: `invoke_streaming()` on unknown op returns a stream yielding + one `NOT_FOUND` +- [ ] Unit test: `invoke_streaming()` on Internal op from external returns + stream yielding one `NOT_FOUND` (not leaked) +- [ ] Unit test: `invoke_streaming()` with `None` identity + restricted op + returns stream yielding one `FORBIDDEN` +- [ ] Unit test: `invoke_streaming()` on a `Query` op returns stream yielding + one `INVALID_OPERATION_TYPE` +- [ ] Unit test: `invoke()` (existing) on a `Subscription` op returns + `INVALID_OPERATION_TYPE` (verifies the guard from + `streaming-handler-handlerkind` holds through the gateway) +- [ ] `cargo test -p alknet-http` succeeds +- [ ] `cargo clippy -p alknet-http --all-targets` succeeds with no warnings +- [ ] `cargo fmt --check -p alknet-http` passes + +## References + +- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 §7 (GatewayDispatch::invoke_streaming) +- docs/architecture/crates/http/http-server.md — §Streaming projection (invoke_streaming security invariants identical to invoke) +- docs/architecture/crates/call/operation-registry.md — §OperationRegistry (invoke_streaming) + +## Notes + +> The security axis MUST be provably identical between `invoke()` and +> `invoke_streaming()` — they share `build_root_context`. The two methods +> diverge only on the return shape. `deadline: None` for subscriptions is a +> shared concern with `call/protocol/dispatch-streaming-branch` — extract a +> shared approach (a streaming-variant of `build_root_context` or a +> `deadline: None` override) rather than duplicating. `to_mcp` never calls +> `invoke_streaming()` (MCP excludes `Subscription` — ADR-041); do not add +> streaming to the MCP gateway. The `futures` crate is already a dependency of +> `alknet-http`. + +## Summary + +> To be filled on completion \ No newline at end of file diff --git a/tasks/http/server/subscribe-sse-streaming.md b/tasks/http/server/subscribe-sse-streaming.md new file mode 100644 index 0000000..d03d526 --- /dev/null +++ b/tasks/http/server/subscribe-sse-streaming.md @@ -0,0 +1,156 @@ +--- +id: http/server/subscribe-sse-streaming +name: Wire /subscribe handler to GatewayDispatch::invoke_streaming() and pipe BoxStream to SSE +status: pending +depends_on: [http/gateway/invoke-streaming] +scope: narrow +risk: medium +impact: component +level: implementation +--- + +## Description + +Replace the `/subscribe` handler's one-event placeholder +(`subscribe_stream_from_envelope`, which calls `GatewayDispatch::invoke()` and +wraps the single `ResponseEnvelope` in a one-event SSE stream) with the real +streaming path: call `GatewayDispatch::invoke_streaming()` and pipe the +`BoxStream` to SSE. Each `Ok(value)` → SSE `data:` frame; +`Err` → SSE error event + close (terminal); natural stream end → close (normal +end, corresponds to `call.completed` on the wire). On `call.aborted` or HTTP +client disconnect, drop the stream (Drop releases handler resources, abort +cascade runs per ADR-016). + +This task depends on `http/gateway/invoke-streaming` (which provides +`GatewayDispatch::invoke_streaming()`). It rewrites `subscribe_handler` and +removes the placeholder helpers. + +### subscribe_handler rewrite + +```rust +pub(crate) async fn subscribe_handler( + State(state): State, + ResolvedIdentity(identity): ResolvedIdentity, + Json(request): Json, +) -> Sse { + let stream = if is_internal_op(&state.registry, &request.operation) { + // Internal ops return NOT_FOUND (don't leak existence) — single error event + subscribe_stream_internal_error(request.operation) + } else { + let dispatch = state.dispatch(); + let envelope_stream = dispatch + .invoke_streaming(identity, &request.operation, request.input) + .await; + // Pipe the BoxStream to SSE frames + subscribe_stream_from_envelope_stream(envelope_stream) + }; + Sse::new(stream) +} +``` + +### subscribe_stream_from_envelope_stream + +Map each `ResponseEnvelope` in the `BoxStream` to an SSE `Event`: + +```rust +fn subscribe_stream_from_envelope_stream( + stream: BoxStream<'static, ResponseEnvelope>, +) -> SubscribeStream { + Box::pin(stream.map(|envelope| { + match envelope.result { + Ok(output) => { + let data = serde_json::to_string(&output) + .unwrap_or_else(|_| "null".to_string()); + Ok(Event::default().data(data)) + } + Err(error) => { + let payload = serde_json::to_value(&error).unwrap_or(Value::Null); + let data = serde_json::to_string(&payload) + .unwrap_or_else(|_| "null".to_string()); + Ok(Event::default().event("error").data(data)) + } + } + })) +} +``` + +The `Err` case produces an SSE error event — the stream ends after it (the +`StreamingHandler`'s contract: `Err` is terminal). The natural stream end +(stream yields `None`) closes the SSE stream (axum's `Sse` wrapper handles the +close when the underlying stream ends). + +### Remove the placeholder + +Delete `subscribe_stream_from_envelope` (the one-event placeholder) and +`envelope_to_sse_stream` (the single-envelope-to-stream helper). The new +`subscribe_stream_from_envelope_stream` replaces them. Keep +`subscribe_stream_internal_error` (Internal ops still return a single +`NOT_FOUND` error event — they don't reach `invoke_streaming()`). + +### Client disconnect / abort + +axum's `Sse` response detects when the HTTP client disconnects (the response +writer closes) and drops the stream future. `Drop` releases the handler's +resources, and the abort cascade runs per ADR-016. No explicit disconnect +handling is needed — Rust's `Drop` + axum's response-drop handle it. Verify the +stream is dropped (not leaked) on disconnect. + +### What this task does NOT do + +- **No `GatewayDispatch` changes.** `invoke_streaming()` is provided by + `http/gateway/invoke-streaming`. +- **No `to_mcp` changes.** MCP has no `/subscribe` equivalent (ADR-041). +- **No `from_openapi` changes.** `from_openapi` SSE forwarding is + `http/adapters/from-openapi-sse-streaming`. + +## Acceptance Criteria + +- [ ] `subscribe_handler` calls `GatewayDispatch::invoke_streaming()` (not + `invoke()`) +- [ ] `subscribe_stream_from_envelope_stream` maps `BoxStream` + to SSE `Event`s +- [ ] `Ok(value)` → SSE `data:` frame with output serialized as JSON +- [ ] `Err` → SSE error event (`event: error`) with `CallError` serialized, then + stream ends (terminal) +- [ ] Natural stream end → SSE stream closes (normal end) +- [ ] Internal op → single `NOT_FOUND` error event (unchanged — + `subscribe_stream_internal_error` kept) +- [ ] Client disconnect → stream dropped (Drop releases resources; abort cascade) +- [ ] Placeholder helpers (`subscribe_stream_from_envelope`, + `envelope_to_sse_stream`) removed +- [ ] `SubscribeStream` type alias still `BoxStream<'static, Result>` +- [ ] Unit test: `/subscribe` on a `Subscription` op streams multiple `data:` + frames (one per `call.responded`) +- [ ] Unit test: `/subscribe` on a `Subscription` op that yields `Err` → one + `event:error` frame, then stream closes +- [ ] Unit test: `/subscribe` on Internal op → `event:error` with `NOT_FOUND` + (unchanged) +- [ ] Unit test: `/subscribe` on unknown op → `event:error` with `NOT_FOUND` +- [ ] Unit test: `/subscribe` on `Query` op → `event:error` with + `INVALID_OPERATION_TYPE` (the guard holds through the gateway) +- [ ] Unit test: response `Content-Type` is `text/event-stream` +- [ ] `cargo test -p alknet-http` succeeds +- [ ] `cargo clippy -p alknet-http --all-targets` succeeds with no warnings +- [ ] `cargo fmt --check -p alknet-http` passes + +## References + +- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 §7 (HTTP /subscribe pipes BoxStream to SSE) +- docs/architecture/crates/http/http-server.md — §Streaming projection (SSE — the gateway's /subscribe) +- docs/architecture/decisions/016-abort-cascade-for-nested-calls.md — ADR-016 (stream drop on disconnect/abort) + +## Notes + +> This replaces the one-event placeholder with the real streaming path. The +> `Err` envelope is terminal — the SSE stream ends after the error event (no +> `data:` frame after an `event:error`). Natural stream end closes the SSE +> stream (axum handles the close when the underlying stream ends). Client +> disconnect drops the stream future via Rust's `Drop` — no explicit handling +> needed. Keep `subscribe_stream_internal_error` (Internal ops return +> `NOT_FOUND` without reaching `invoke_streaming()` — they don't leak +> existence). The `futures::StreamExt::map` combinator is the tool for mapping +> the envelope stream to SSE events. + +## Summary + +> To be filled on completion \ No newline at end of file diff --git a/tasks/review-streaming-impl.md b/tasks/review-streaming-impl.md new file mode 100644 index 0000000..724471f --- /dev/null +++ b/tasks/review-streaming-impl.md @@ -0,0 +1,210 @@ +--- +id: review-streaming-impl +name: Review ADR-049 streaming handler implementation for spec conformance and end-to-end correctness +status: pending +depends_on: [call/protocol/dispatch-streaming-branch, call/client/from-call-streaming-forwarding, http/gateway/invoke-streaming, http/server/subscribe-sse-streaming, http/adapters/from-openapi-sse-streaming] +scope: broad +risk: low +impact: phase +level: review +--- + +## Description + +Review the ADR-049 streaming handler implementation across `alknet-call` and +`alknet-http` for spec conformance, end-to-end correctness, and pattern +consistency. This is the quality checkpoint after the streaming handler work — +the most significant cross-cutting change since the initial call/http +implementation. All five implementation tasks must be complete before this +review. + +### Review Checklist + +1. **Type surface conformance** (operation-registry.md §Handler): + - `StreamingHandler` type alias matches spec (`Arc Pin + Send>> + Send + Sync>`) + - `ResponseStream` alias = `Pin + Send>>` (= `futures::stream::BoxStream<'static, ResponseEnvelope>`) + - `HandlerKind` enum with `Once(Handler)` and `Stream(StreamingHandler)` variants + - `make_streaming_handler()` helper (analogue of `make_handler()`) + - `HandlerRegistration.handler` is `HandlerKind` (not `Handler`) + - `INVALID_OPERATION_TYPE` is the sixth protocol-level code in `CallError` + (`retryable: false`, `details: None`) + +2. **Registry conformance** (operation-registry.md §OperationRegistry): + - `register()` validates `HandlerKind` matches `spec.op_type` (Once for + Query/Mutation, Stream for Subscription) — mismatch is a startup error + - `invoke()` dispatches `HandlerKind::Once` (existing path); returns + `INVALID_OPERATION_TYPE` for `HandlerKind::Stream` + - `invoke_streaming()` dispatches `HandlerKind::Stream`; returns + `INVALID_OPERATION_TYPE` for `HandlerKind::Once` + - `invoke_streaming()` pre-handler errors (not-found, forbidden, + INVALID_OPERATION_TYPE) yield a single error `ResponseEnvelope` and end + the stream + - `invoke_streaming()` visibility + ACL checks identical to `invoke()` + (same authority switch: internal → handler_identity, external → identity) + - `OperationEnv` is request/response-only — no `invoke_streaming()` on the + trait; `invoke()` on a `Subscription` returns `INVALID_OPERATION_TYPE` + (via `LocalOperationEnv` → `registry.invoke()` and `OverlayOperationEnv` + direct match) + +3. **Builder conformance** (operation-registry.md §OperationRegistryBuilder): + - `with_local` / `with_leaf` / `with_leaf_provenance` wrap `Handler` in + `HandlerKind::Once` for Query/Mutation + - `with_local_streaming` / `with_leaf_streaming` wrap `StreamingHandler` in + `HandlerKind::Stream` for Subscription + - Builder validates `handler` kind matches `spec.op_type` + +4. **Call-protocol dispatch conformance** (call-protocol.md §CallAdapter Stream Handling): + - `Dispatcher::handle_stream` branches on `op_type`: `Subscription` → + `invoke_streaming()` → pump stream; `Query`/`Mutation` → `invoke()` (existing) + - Streaming pump: each `ResponseEnvelope` → `EventEnvelope` frame + - Natural stream end → `call.completed` frame + - `Err` envelope → `call.error` frame, stream ends after it (NO + `call.completed` after an error) + - `deadline: None` for subscriptions (unbounded) + - Abort: `call.aborted` drops the stream (Drop releases resources; ADR-016) + +5. **from_call streaming forwarding** (client-and-adapters.md §from_call): + - `build_bundles` branches on `op_type`: `Subscription` → + `make_streaming_forwarding_handler` (`HandlerKind::Stream`), + `Query`/`Mutation` → `make_forwarding_handler` (`HandlerKind::Once`) + - Streaming forwarding handler calls `CallConnection::subscribe_with_payload()` + (or `subscribe()` with forwarded payload) + - `forwarded_for` populated from `context.identity` (ADR-032) + - Remote stream forwarded end-to-end (no truncation, no first-value fallback) + - `composition_authority: None`, `scoped_env: None` for FromCall streaming leaves + +6. **Gateway dispatch conformance** (http-server.md §Streaming projection): + - `GatewayDispatch::invoke_streaming()` exists, returns + `BoxStream` + - Security invariants identical to `invoke()` (shared `build_root_context`): + `internal: false`, `forwarded_for: None`, same capabilities, same scoped_env, + same ACL + - `deadline: None` for the streaming path + - `to_mcp` does NOT call `invoke_streaming()` (MCP excludes Subscription) + +7. **/subscribe SSE conformance** (http-server.md §Streaming projection): + - `subscribe_handler` calls `GatewayDispatch::invoke_streaming()` (not `invoke()`) + - Each `Ok(value)` → SSE `data:` frame + - `Err` → SSE error event (`event: error`), stream ends after (terminal) + - Natural stream end → SSE stream closes + - Internal op → single `NOT_FOUND` error event (no leak) + - Client disconnect → stream dropped (Drop; abort cascade) + - Placeholder helpers removed (`subscribe_stream_from_envelope`, + `envelope_to_sse_stream`) + +8. **from_openapi SSE conformance** (http-adapters.md §Forwarding handler): + - `build_registration` branches on `op_type`: `Subscription` → + `HandlerKind::Stream` (streaming forward), `Query`/`Mutation` → + `HandlerKind::Once` (existing) + - `forward_stream()` streams SSE chunks as `ResponseEnvelope::ok()` items + - HTTP error → single `ResponseEnvelope::error()`, stream ends + - SSE stream end → `ResponseStream` ends + - `parse_sse_frames` reused (multi-event, partial, comments, BOM) + - `stream_subscription()` collect-all placeholder removed + - `from_mcp` unchanged (always `HandlerKind::Once`) + +9. **ADR conformance**: + - ADR-049: all 9 decisions implemented (StreamingHandler, HandlerKind, + invoke_streaming, invoke errors on Subscription, OperationEnv + request/response-only, server-side dispatch branch, + GatewayDispatch::invoke_streaming, from_call stream forwarding, + from_openapi SSE forwarding) + - ADR-023 amended: six protocol codes (INVALID_OPERATION_TYPE added) + - ADR-016: abort cascade on streaming (stream drop) + - ADR-032: forwarded_for on streaming forwarding handlers + +10. **End-to-end correctness**: + - A `Subscription` op registered with a `StreamingHandler` streams + `call.responded` events through: server dispatch → wire → HTTP `/subscribe` + SSE (or `from_call` forwarding → remote stream, or `from_openapi` SSE + forwarding) + - `invoke()` on a `Subscription` returns `INVALID_OPERATION_TYPE` (not silent + truncation) + - `invoke_streaming()` on a `Query`/`Mutation` returns + `INVALID_OPERATION_TYPE` + - `OperationEnv::invoke()` on a `Subscription` returns + `INVALID_OPERATION_TYPE` (composition is request/response-only) + +11. **Pattern consistency**: + - `invoke()` and `invoke_streaming()` share visibility + ACL logic (security + axis provably identical) + - `GatewayDispatch::invoke()` and `invoke_streaming()` share + `build_root_context` (security axis provably identical) + - `HandlerKind` makes the "one or the other, matching op_type" invariant + type-level (not two `Option`s validated at runtime) + - Existing `Query`/`Mutation` handlers unchanged (wrapped in + `HandlerKind::Once`, dispatch path identical) + +12. **Test coverage**: + - Unit tests for `HandlerKind` validation at `register()` (both mismatch + directions) + - Unit tests for `invoke()` / `invoke_streaming()` cross-kind errors + (`INVALID_OPERATION_TYPE` both directions) + - Unit tests for `invoke_streaming()` pre-handler errors (not-found, + forbidden, internal-from-external) + - Unit tests for `invoke_streaming()` ACL authority switch (internal → + handler_identity) + - Unit test for `make_streaming_handler` + - Unit/integration tests for server-side streaming dispatch (multiple + `call.responded` + `call.completed`; `Err` → `call.error`, no + `call.completed` after) + - Unit/integration tests for `from_call` streaming forwarding + - Unit/integration tests for `from_openapi` SSE streaming forwarding + - Unit tests for `/subscribe` SSE (multiple `data:` frames; `event:error`; + `INVALID_OPERATION_TYPE` for `Query` op via `/subscribe`) + - Unit tests for `GatewayDispatch::invoke_streaming()` (all error paths) + +## Acceptance Criteria + +- [ ] All type surface matches operation-registry.md §Handler +- [ ] All registry methods match operation-registry.md §OperationRegistry +- [ ] Builder wraps HandlerKind correctly per op_type +- [ ] Call-protocol dispatch branches on op_type correctly +- [ ] from_call streaming forwarding works end-to-end +- [ ] GatewayDispatch::invoke_streaming security invariants identical to invoke +- [ ] /subscribe SSE pipes BoxStream correctly +- [ ] from_openapi SSE streaming works (no truncation) +- [ ] from_mcp unchanged (always HandlerKind::Once) +- [ ] ADR-049 all 9 decisions implemented +- [ ] ADR-023 amended (six protocol codes) +- [ ] invoke() / invoke_streaming() / OperationEnv::invoke() cross-kind errors + all return INVALID_OPERATION_TYPE +- [ ] Existing Query/Mutation handlers unchanged +- [ ] Test coverage adequate for all streaming functionality +- [ ] `cargo fmt --check -p alknet-call -p alknet-http` passes +- [ ] `cargo clippy -p alknet-call -p alknet-http --all-targets` passes with no warnings +- [ ] All tests pass (`cargo test -p alknet-call -p alknet-http`) + +## References + +- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 +- docs/architecture/crates/call/operation-registry.md — Handler, OperationRegistry, HandlerRegistration +- docs/architecture/crates/call/call-protocol.md — CallAdapter Stream Handling, call.error Payload +- docs/architecture/crates/call/client-and-adapters.md — from_call streaming forwarding +- docs/architecture/crates/http/http-server.md — Streaming projection (SSE) +- docs/architecture/crates/http/http-adapters.md — Forwarding handler (from_openapi) +- docs/architecture/crates/http/http-mcp.md — from_mcp always HandlerKind::Once +- docs/architecture/decisions/023-operation-error-schemas.md — ADR-023 (amended: six codes) +- docs/architecture/decisions/016-abort-cascade-for-nested-calls.md — ADR-016 (stream drop) +- docs/architecture/decisions/032-forwarded-for-identity.md — ADR-032 (forwarded_for) + +## Notes + +> This is the quality checkpoint for the ADR-049 streaming handler work — the +> most significant cross-cutting change since the initial call/http +> implementation. The review should verify the end-to-end streaming path works: +> a `Subscription` op registered with a `StreamingHandler` streams +> `call.responded` events through every projection (server dispatch → wire, +> HTTP `/subscribe` SSE, `from_call` forwarding, `from_openapi` SSE forwarding). +> The load-bearing invariants: (1) `invoke()` / `invoke_streaming()` / +> `OperationEnv::invoke()` cross-kind errors all return +> `INVALID_OPERATION_TYPE` (no silent truncation), (2) the security axis is +> provably identical between `invoke()` and `invoke_streaming()` (shared +> `build_root_context` + shared visibility/ACL logic), (3) `HandlerKind` makes +> the kind/op_type invariant type-level, (4) existing `Query`/`Mutation` +> handlers are unchanged. If deviations are found, document and fix before +> considering the streaming handler work complete. + +## Summary + +> To be filled on completion \ No newline at end of file