Files
alknet/docs/architecture/decisions/049-streaming-handler-for-subscriptions.md
glm-5.2 7ecc11610a docs(arch): ADR-049 — streaming handler for subscription operations
The call protocol spec describes streaming (call.responded*N +
call.completed, PendingRequestMap::Subscribe, CallConnection::subscribe),
but the server-side Handler type returned a single ResponseEnvelope —
a Subscription op had no way to produce a stream. The TS predecessor
(@alkdev/operations) had separate OperationHandler / SubscriptionHandler
types; the Rust port collapsed them, losing the streaming path. This
restores it end-to-end: StreamingHandler type, HandlerKind on
HandlerRegistration validated against op_type, invoke_streaming() on
OperationRegistry, server-side dispatch branches on op_type, new
INVALID_OPERATION_TYPE protocol code for wrong-dispatch-path misuse,
GatewayDispatch::invoke_streaming() for /subscribe SSE, from_call stream
forwarding via CallConnection::subscribe(), from_openapi SSE forwarding.
OperationEnv::invoke() stays request/response-only (stream composition is
handler-level, not protocol-level). Amends ADR-023's protocol-code list
(five → six). Tracks the stream-operators library as OQ-41 (feature
extension, not an unmade decision).
2026-07-02 07:43:01 +00:00

337 lines
15 KiB
Markdown

# ADR-049: Streaming Handler for Subscription Operations
## Status
Accepted
## Context
The call protocol defines `Subscription` as a first-class operation type
(ADR-012 lists `subscribe` as one of four top-level protocol operations;
`OperationSpec.op_type` includes `Subscription`). The wire protocol supports
streaming: five event types (`call.requested`, `call.responded`,
`call.completed`, `call.aborted`, `call.error`), `PendingRequestMap::Subscribe`
with an mpsc channel, `CallConnection::subscribe()` returning
`impl Stream<Item = ResponseEnvelope>`, and a full streaming-subscribe example
in `call-protocol.md`. The **client side** works — a client can subscribe to a
remote stream and consume `call.responded` events until `call.completed`.
The **server side does not.** The `Handler` type in `alknet-call` is:
```rust
pub type Handler = Arc<
dyn Fn(Value, OperationContext) -> Pin<Box<dyn Future<Output = ResponseEnvelope> + Send>>
+ Send + Sync,
>;
```
It returns a single `ResponseEnvelope`. `OperationRegistry::invoke()` returns
one `ResponseEnvelope` and closes. `Dispatcher::handle_stream` calls
`dispatch_requested``registry.invoke()` → writes one `EventEnvelope` frame →
loops. A `Subscription` operation that should produce a *stream* of
`call.responded` events followed by `call.completed` has no way to express that
through this handler signature.
This is a **spec gap that should not have shipped.** The TypeScript predecessor
(`@alkdev/operations`, from which the Rust port was derived) had two distinct
handler types:
```typescript
type OperationHandler<I, O, C> = (input: I, context: C) => Promise<O> | O;
type SubscriptionHandler<I, O, C> = (input: I, context: C) => AsyncGenerator<O, void, unknown>;
```
The TS registry (`registry.ts:21`) stored them as a union, validated at
registration that `SUBSCRIPTION` ops get an `AsyncGeneratorFunction`, and the
server-side dispatch (`call.ts:341-349`, `buildCallHandler`) branched on
`op_type`: `SUBSCRIPTION` → iterate the async generator, `respond()` for each,
then `complete()`; else → `execute()`, single `respond()`. The Rust port
collapsed the union into a single `Handler` returning one `ResponseEnvelope`,
losing the streaming path. The fix is to restore it.
The downstream consequences of the gap:
- **`/subscribe` HTTP endpoint** (`GatewayDispatch::invoke()`
`subscribe_handler`) wraps a single `ResponseEnvelope` in a one-event SSE
stream. A real `Subscription` operation (e.g., `agent/chat` streaming LLM
tokens) cannot stream through it.
- **`from_call` forwarding** for a `Subscription` op calls
`CallConnection::call_with_payload()` (single response), not
`CallConnection::subscribe()` (stream). A `from_call`-imported subscription
truncates to the first value.
- **`from_openapi` forwarding** for a `text/event-stream` response returns one
`ResponseEnvelope` instead of streaming the SSE chunks.
All three are symptoms of the same root cause: the `Handler` type cannot
produce a stream.
## Decision
### 1. `StreamingHandler` type (alongside `Handler`)
Add a streaming handler type that returns a stream of `ResponseEnvelope`s,
mirroring the TS `SubscriptionHandler` / `OperationHandler` split:
```rust
pub type StreamingHandler = Arc<
dyn Fn(Value, OperationContext)
-> Pin<Box<dyn Stream<Item = ResponseEnvelope> + Send>>
+ Send + Sync,
>;
```
Each `Ok(value)` in the stream becomes a `call.responded` event. An `Err`
becomes a `call.error` event (terminal — the stream ends). Natural stream end
becomes `call.completed`. The dispatch path converts each `ResponseEnvelope` to
`EventEnvelope` exactly as it does today for the single-response case — no new
wire-format concept is introduced.
A `make_streaming_handler()` helper (analogue of `make_handler()`) wraps an
async generator / stream-producing closure into a `StreamingHandler`.
### 2. `HandlerKind` enum on `HandlerRegistration`
```rust
pub enum HandlerKind {
Once(Handler),
Stream(StreamingHandler),
}
pub struct HandlerRegistration {
pub spec: OperationSpec,
pub handler: HandlerKind, // validated against spec.op_type at registration
pub provenance: OperationProvenance,
pub composition_authority: Option<CompositionAuthority>,
pub scoped_env: Option<ScopedPeerEnv>,
pub capabilities: Capabilities,
}
```
Registration validates: `Query` / `Mutation``HandlerKind::Once`;
`Subscription``HandlerKind::Stream`. Mismatch is a startup error (same as
the TS `validateSubscriptionHandler`). The enum makes the "one or the other,
matching `op_type`" invariant type-level rather than two `Option`s validated at
runtime.
### 3. `OperationRegistry::invoke_streaming()`
```rust
impl OperationRegistry {
/// Dispatch a Subscription operation. Returns a stream of
/// ResponseEnvelopes. Errors (not-found, forbidden, invalid operation
/// type) yield a single ResponseEnvelope::error and end the stream.
pub fn invoke_streaming(
&self,
name: &str,
input: Value,
context: OperationContext,
) -> BoxStream<ResponseEnvelope>;
}
```
`invoke_streaming()` performs the same visibility + ACL checks as `invoke()`,
then dispatches to the `StreamingHandler`. Pre-handler errors (not-found,
forbidden) produce a single error `ResponseEnvelope` and end the stream
(matching the single-response path's behavior, just on a stream).
### 4. `OperationRegistry::invoke()` errors on `Subscription`
`invoke()` is the request/response dispatch path. Calling it on a
`Subscription` op is a type mismatch — a streaming operation dispatched through
the request/response path. It returns a `ResponseEnvelope` carrying
`CallError { code: "INVALID_OPERATION_TYPE", ... }` (a new protocol-level
code):
```
INVALID_OPERATION_TYPE
```
(`retryable: false`, `details: None`). This is the wire-format addition: a
sixth protocol-level error code. It signals "you called the wrong dispatch
method for this operation's type" — distinct from `INVALID_INPUT` (schema
mismatch) and `INTERNAL` (handler failure). Clients should treat unknown codes
as `INTERNAL` with `retryable: false` (the existing rule); `INVALID_OPERATION_
TYPE` is a permanent client-side programming error, not a transient failure.
### 5. `OperationEnv::invoke()` errors on `Subscription`
`OperationEnv::invoke()` (composition) stays request/response-only. It returns
a single `ResponseEnvelope`. Calling it on a `Subscription` op produces the
same `INVALID_OPERATION_TYPE` error — composition cannot truncate a stream to
its first value. This is a clean architectural boundary, not a deferral:
- **`OperationEnv` composition** is "call a child operation, get a result"
(the `OperationHandler` model). It is request/response by construction.
- **Stream composition** (filter, map, combine, window, dedupe) is a
handler-level concern. A handler that produces a stream transforms it with
stream operators at the handler level, not through `OperationEnv`. The
`@alkdev/pubsub` `operators.ts` is the prior art for this model: 13 operators
(`filter`, `map`, `take`, `batch`, `dedupe`, `window`, `chain`, `join`, etc.)
that operate on `AsyncIterable<T>`, distinct from the request/response
composition. In Rust, the analogues operate on `BoxStream<T>`.
- No `invoke_streaming()` is added to `OperationEnv`. The protocol composition
surface is request/response; stream manipulation is handler-internal.
### 6. Server-side dispatch branches on `op_type`
`Dispatcher::handle_stream` / `dispatch_requested` gains a branch on
`op_type`:
- `Subscription``registry.invoke_streaming()` → for each `ResponseEnvelope`
in the stream, write `EventEnvelope` to the wire → write `call.completed` on
stream end.
- `Query` / `Mutation``registry.invoke()` → write one `EventEnvelope`
(existing path, unchanged).
The streaming branch sets `deadline: None` for subscriptions (unbounded —
already specced in `call-protocol.md` Timeouts) and wires abort cascade
(ADR-016): if `call.aborted` arrives for a streaming request, the stream is
dropped (Rust `Drop` releases the handler's resources).
### 7. `GatewayDispatch::invoke_streaming()` (alknet-http)
The shared dispatch spine gains a streaming variant:
```rust
impl GatewayDispatch {
pub async fn invoke_streaming(
&self,
identity: Option<Identity>,
op: &str,
input: Value,
) -> BoxStream<ResponseEnvelope>;
}
```
`invoke_streaming()` builds the root `OperationContext` identically to
`invoke()` (same security invariants: `internal: false`, `forwarded_for:
None`, same capabilities, same `scoped_env`), then calls
`registry.invoke_streaming()`. The two gateways (`to_openapi`, `to_mcp`)
diverge only on wire-framing; the security axis is provably identical between
`invoke()` and `invoke_streaming()`.
The HTTP `/subscribe` handler calls `invoke_streaming()` and pipes the
`BoxStream<ResponseEnvelope>` to SSE: each `Ok(value)` → SSE `data:` frame,
`Err` → SSE error event + close, stream end → close. This replaces the current
one-event `subscribe_stream_from_envelope` with the real streaming path.
### 8. `from_call` stream forwarding
The `from_call` forwarding handler construction branches on `op_type` during
discovery:
- `Query` / `Mutation` → existing `make_forwarding_handler()` (calls
`CallConnection::call_with_payload()`, returns single `ResponseEnvelope`),
registered as `HandlerKind::Once`.
- `Subscription` → new `make_streaming_forwarding_handler()` (calls
`CallConnection::subscribe()`, returns `impl Stream<Item =
ResponseEnvelope>`, maps to `BoxStream<ResponseEnvelope>`), registered as
`HandlerKind::Stream`.
A `from_call`-imported `Subscription` op forwards the remote stream end-to-end:
the client-side `CallConnection::subscribe()` (already working) feeds a
`StreamingHandler` that produces the stream. No truncation, no first-value
fallback.
### 9. `from_openapi` SSE forwarding
The `from_openapi` forwarding handler construction branches on `op_type`
(determined by `detectOperationType``text/event-stream` response →
`Subscription`):
- `Query` / `Mutation` → existing forwarding handler (single HTTP request →
single `ResponseEnvelope`), `HandlerKind::Once`.
- `Subscription` → streaming forwarding handler (HTTP request → SSE response
stream → parse SSE chunks → `BoxStream<ResponseEnvelope>`), `HandlerKind::
Stream`.
The SSE parsing reuses the TS `parseSSEFrames` pattern: each SSE `data:` frame
becomes a `ResponseEnvelope::ok()`, SSE stream end becomes stream end (→
`call.completed`).
## Consequences
**Positive:**
- `Subscription` operations work end-to-end: server-side handler →
server-side dispatch → wire → HTTP `/subscribe` SSE → `from_call`
forwarding → `from_openapi` SSE forwarding. No truncation, no broken paths.
- The `Handler` / `StreamingHandler` split mirrors the TS prior art exactly,
making the Rust port faithful to its source.
- `HandlerKind` makes the "one or the other, matching `op_type`" invariant
type-level (a `Once` variant for `Query`/`Mutation`, a `Stream` variant for
`Subscription`) rather than a runtime check on two `Option`s.
- Existing handlers (echo, discovery, from_openapi Query/Mutation, from_mcp,
from_call Query/Mutation) are unchanged — they return a single
`ResponseEnvelope` and register as `HandlerKind::Once`. The streaming path
is additive to the existing handler surface.
- `OperationEnv` composition stays request/response, preserving the
composition model's simplicity. Stream composition is a handler-level
concern, cleanly separated.
- The new `INVALID_OPERATION_TYPE` protocol code catches dispatch-path misuse
(calling `invoke()` on a `Subscription`) at the protocol level instead of
silently producing wrong behavior.
**Negative:**
- `HandlerRegistration.handler` changes type from `Handler` to `HandlerKind`.
Existing code constructing `HandlerRegistration` bundles must wrap in
`HandlerKind::Once(...)`. This is a mechanical change across handler
construction sites (the builder's `.with_local()` / `.with_leaf()` /
`.with()` methods absorb the wrapping internally, so most assembly-layer
code is unaffected; direct `HandlerRegistration::new()` calls need the
wrap).
- A new protocol-level error code (`INVALID_OPERATION_TYPE`) is a wire-format
addition. Existing clients that treat unknown codes as `INTERNAL` with
`retryable: false` (the existing rule) handle it correctly — they just
don't distinguish it from `INTERNAL` until updated. The code is distinct
from all existing codes and from operation-level domain codes (no
`HTTP_` prefix, no collision with the five existing protocol codes).
- The `Dispatcher::handle_stream` streaming branch adds a stream-to-wire
pump (read stream → write frames → write `call.completed`). This is new
code in the hot dispatch path, but it is a straightforward `while let
Some(envelope) = stream.next().await` loop, not a complex abstraction.
## Door type
**One-way.** The `Handler` / `StreamingHandler` / `HandlerKind` API surface
is what handlers are written against across crates (`alknet-call`,
`alknet-http`, downstream consumers). Changing it after handlers exist is a
rewrite. The `INVALID_OPERATION_TYPE` wire code is also one-way — once
emitted, clients may handle it, and removing it would break those handlers.
The `HandlerKind` enum shape (`Once(Handler) | Stream(StreamingHandler)`) is
the one-way commitment: two handler variants, validated against `op_type`.
The concrete `BoxStream` library choice (`futures::stream::BoxStream` vs a
custom type) is a two-way-door implementation detail within the one-way
decision.
## References
- ADR-012: Call Protocol Stream Model (defines `subscribe` as a top-level
protocol operation; the streaming path this ADR implements)
- ADR-017: Call Protocol Client and Adapter Contract (the adapter contract
this ADR extends with the `StreamingHandler` variant)
- ADR-022: Handler Registration, Provenance, and Composition Authority
(`HandlerRegistration` gains `HandlerKind`)
- ADR-015: Privilege Model and Authority Context (visibility/ACL checks run
identically in `invoke_streaming()` as in `invoke()`)
- ADR-016: Abort Cascade for Nested Calls (stream drop on abort; cascade
through streaming handlers)
- ADR-023: Operation Error Schemas (`INVALID_OPERATION_TYPE` is a
protocol-level code, distinct from operation-level domain codes)
- ADR-029: Peer-Graph Routing Model (`from_call` forwarding handlers gain the
streaming variant)
- ADR-009: One-Way Door Decision Framework (the `Handler` / `StreamingHandler`
split is a one-way door — handler API surface)
- `@alkdev/operations/src/types.ts:62-78` — TS prior art
(`OperationHandler` / `SubscriptionHandler` split)
- `@alkdev/operations/src/registry.ts:65-75` — TS prior art
(`validateSubscriptionHandler` — runtime validation against `op_type`)
- `@alkdev/operations/src/call.ts:341-349` — TS prior art (`buildCallHandler`
branches on `op_type`: `SUBSCRIPTION` → iterate + complete; else → execute)
- `@alkdev/pubsub/src/operators.ts` — stream operators prior art (filter,
map, batch, dedupe, window, chain, join — handler-level stream
composition, distinct from `OperationEnv` request/response composition)
- Spec documents amended: `operation-registry.md`, `call-protocol.md`,
`http-server.md`, `http-adapters.md`, `http-mcp.md`, `client-and-adapters.md`