174 lines
9.1 KiB
Markdown
174 lines
9.1 KiB
Markdown
---
|
|
id: call/protocol/dispatch-streaming-branch
|
|
name: Wire Dispatcher::handle_stream streaming branch (Subscription → invoke_streaming → write each → call.completed)
|
|
status: completed
|
|
depends_on: [call/registry/invoke-streaming]
|
|
scope: narrow
|
|
risk: medium
|
|
impact: component
|
|
level: implementation
|
|
---
|
|
|
|
## Description
|
|
|
|
Wire the server-side streaming dispatch branch in
|
|
`Dispatcher::handle_stream` / `dispatch_requested`. When a `call.requested`
|
|
arrives for a `Subscription` op, the dispatcher must call
|
|
`OperationRegistry::invoke_streaming()` and pump the resulting
|
|
`ResponseStream` to the wire: each `Ok(value)` → `call.responded` frame,
|
|
`Err` → `call.error` frame (terminal), natural stream end → `call.completed`
|
|
frame. This is the server-side path that makes `Subscription` operations work
|
|
end-to-end — without it, a `StreamingHandler`-registered op had no server-side
|
|
dispatch path.
|
|
|
|
This task depends on `call/registry/invoke-streaming` (which provides
|
|
`invoke_streaming()`). It adds the `op_type` branch to the dispatch path and
|
|
the stream-to-wire pump.
|
|
|
|
### The branch
|
|
|
|
`dispatch_requested` currently unconditionally calls `registry.invoke()` and
|
|
returns one `ResponseEnvelope`. It needs to know the `op_type` to branch. Two
|
|
options:
|
|
|
|
1. **Look up the registration to get `op_type` before dispatching.** The
|
|
`build_root_context` already looks up the registration; expose `op_type`
|
|
from it. Then branch: `Subscription` → streaming path, `Query`/`Mutation` →
|
|
existing `invoke()` path.
|
|
2. **Return an enum from `dispatch_requested`** (`DispatchResult::Once(ResponseEnvelope)`
|
|
| `DispatchResult::Stream(ResponseStream)`) and let `handle_stream` match on
|
|
it for the wire-writing loop.
|
|
|
|
Pick the cleaner option. Option 1 keeps `dispatch_requested` returning a
|
|
`ResponseEnvelope` for the Once path but needs a separate streaming entry point
|
|
(e.g., `dispatch_requested_streaming` returning `ResponseStream`). Option 2
|
|
unifies the dispatch entry but changes the return type. The spec frames it as
|
|
"branches on `op_type`" in `handle_stream`, suggesting the branch lives in the
|
|
dispatch layer. Document the choice.
|
|
|
|
### Streaming dispatch path
|
|
|
|
For a `Subscription` op:
|
|
|
|
```rust
|
|
// In dispatch_requested (or a new dispatch_requested_streaming):
|
|
let context = self.build_root_context(...);
|
|
// deadline: None for subscriptions (unbounded — ADR-049 §6, call-protocol Timeouts)
|
|
// The build_root_context sets a 30s deadline; for the streaming path, set
|
|
// deadline to None AFTER construction (or pass a flag). The spec says
|
|
// "deadline: None for subscriptions (unbounded)".
|
|
let stream = self.registry.invoke_streaming(&operation_name, input, context);
|
|
stream // ResponseStream — pumped by handle_stream
|
|
```
|
|
|
|
### handle_stream streaming pump
|
|
|
|
In `handle_stream`, after dispatching, if the result is a stream:
|
|
|
|
```rust
|
|
// Read the ResponseStream, write each envelope as an EventEnvelope frame
|
|
let mut stream = tokio_stream_into_response_stream(...); // or use StreamExt
|
|
while let Some(envelope) = stream.next().await {
|
|
let event: EventEnvelope = envelope.into();
|
|
if let Err(err) = writer.write_frame(&event).await {
|
|
warn!(error = %err, "failed to write streaming frame; closing stream");
|
|
break;
|
|
}
|
|
// If the envelope was an error (Err result), the stream ends after it
|
|
// (the StreamingHandler's contract: Err is terminal). The stream's own
|
|
// end (None) triggers call.completed below.
|
|
}
|
|
// Natural stream end → write call.completed
|
|
let completed = EventEnvelope::completed(&request_id);
|
|
if let Err(err) = writer.write_frame(&completed).await {
|
|
warn!(error = %err, "failed to write call.completed");
|
|
}
|
|
```
|
|
|
|
The `ResponseEnvelope → EventEnvelope` conversion (`into()`) already exists and
|
|
produces `call.responded` for `Ok` and `call.error` for `Err`. The
|
|
`call.completed` frame is written once when the stream ends naturally (not on
|
|
error — an `Err` envelope is terminal, the stream ends after it, and we do NOT
|
|
write `call.completed` after a `call.error`; the stream's `None` after an error
|
|
is not a "natural end"). Track whether the last envelope was an error to decide
|
|
whether to write `call.completed`. Alternatively, the `StreamingHandler`'s
|
|
contract is: `Err` ends the stream (the handler's stream yields the error then
|
|
`None`), so after the loop, only write `call.completed` if the stream did not
|
|
end on an error. Simplest correct approach: write `call.completed` only on
|
|
natural end (the stream returned `None` without the last item being an `Err`).
|
|
Track a `last_was_error` flag.
|
|
|
|
### deadline: None for subscriptions
|
|
|
|
`build_root_context` sets `deadline: Some(now + 30s)`. For the streaming path,
|
|
the spec says `deadline: None` (unbounded — subscriptions are long-running). Set
|
|
`context.deadline = None` after `build_root_context` for the streaming branch,
|
|
or add a parameter to `build_root_context`. The deadline bounds the
|
|
request/response call tree; a subscription has no such bound. Document this.
|
|
|
|
### Abort cascade (ADR-016)
|
|
|
|
If `call.aborted` arrives for a streaming request ID, the stream is dropped
|
|
(Rust `Drop` releases the handler's resources). The existing `handle_abort`
|
|
path already removes the pending entry and cascades. For the streaming branch,
|
|
the stream future being dropped (when the `handle_stream` task is cancelled or
|
|
the `call.aborted` is processed) releases the handler's resources via `Drop`.
|
|
No new abort code is needed — the existing `handle_abort` + the stream's `Drop`
|
|
handle it. Verify the streaming pump task is cancellable (it's a `tokio::spawn`
|
|
task; aborting the connection cancels it).
|
|
|
|
### What this task does NOT do
|
|
|
|
- **No client-side changes.** The client `CallConnection::subscribe()` already
|
|
works (it reads `call.responded` events until `call.completed`). This task is
|
|
server-side only.
|
|
- **No gateway changes.** `GatewayDispatch::invoke_streaming` is
|
|
`http/gateway/invoke-streaming`.
|
|
|
|
## Acceptance Criteria
|
|
|
|
- [ ] `dispatch_requested` (or a new `dispatch_requested_streaming`) branches on
|
|
`op_type`: `Subscription` → `invoke_streaming()`, `Query`/`Mutation` →
|
|
`invoke()` (existing)
|
|
- [ ] `handle_stream` pumps the `ResponseStream` for the streaming branch:
|
|
each `ResponseEnvelope` → `EventEnvelope` frame
|
|
- [ ] Natural stream end → `call.completed` frame written
|
|
- [ ] `Err` envelope → `call.error` frame written, stream ends after it (no
|
|
`call.completed` after an error)
|
|
- [ ] `deadline: None` for the streaming branch (unbounded subscriptions)
|
|
- [ ] Abort: `call.aborted` for a streaming request drops the stream (Drop
|
|
releases resources; existing `handle_abort` handles the pending entry)
|
|
- [ ] Existing `Query`/`Mutation` dispatch path unchanged (one
|
|
`call.responded`/`call.error` frame, no `call.completed`)
|
|
- [ ] Unit test: `Subscription` op dispatch → multiple `call.responded` frames
|
|
+ `call.completed` on stream end
|
|
- [ ] Unit test: `Subscription` op handler yields `Err` → one `call.error`
|
|
frame, no `call.completed` after
|
|
- [ ] Unit test: `Query` op dispatch unchanged (one frame, no `call.completed`)
|
|
- [ ] Unit test: `call.aborted` for streaming request → stream dropped
|
|
- [ ] `cargo test -p alknet-call` succeeds
|
|
- [ ] `cargo clippy -p alknet-call --all-targets` succeeds with no warnings
|
|
- [ ] `cargo fmt --check -p alknet-call` passes
|
|
|
|
## References
|
|
|
|
- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 §6 (server-side dispatch branches on op_type)
|
|
- docs/architecture/crates/call/call-protocol.md — §CallAdapter Stream Handling (streaming branch: invoke_streaming → write each → call.completed; deadline: None; abort cascade)
|
|
- docs/architecture/decisions/016-abort-cascade-for-nested-calls.md — ADR-016 (stream drop on abort)
|
|
|
|
## Notes
|
|
|
|
> The streaming pump is a straightforward `while let Some(envelope) = stream.next().await`
|
|
> loop — not a complex abstraction. The tricky part is the `call.completed`
|
|
> semantics: write it on natural stream end, NOT after an `Err` (which is
|
|
> terminal). Track whether the last envelope was an error. `deadline: None` for
|
|
> subscriptions is a spec requirement — the 30s request/response deadline does
|
|
> not bound a long-running subscription. The abort cascade needs no new code:
|
|
> dropping the stream future (via task cancellation or `handle_abort`) releases
|
|
> the handler's resources through Rust's `Drop`. Pick the dispatch-entry shape
|
|
> (separate streaming method vs unified enum return) and document it — the spec
|
|
> frames it as a branch in `handle_stream`, so the branch should be visible there.
|
|
|
|
## Summary
|
|
|
|
> Added DispatchResult enum (Once(ResponseEnvelope) | Stream(ResponseStream)) and Dispatcher::dispatch() branching on op_type (looked up via registry.registration). handle_stream matches on DispatchResult — the branch is visible there (spec framing). Streaming pump writes each ResponseEnvelope → EventEnvelope frame; call.completed on natural end only when !last_was_error (Err is terminal, no call.completed after). deadline: None for streaming branch. Abort via Drop (no new code). Existing Query/Mutation path unchanged. Added 7 unit tests (dispatch branch, deadline clearing, pump frames, error terminal, query unchanged, unknown op, abort drops stream). 306 tests pass. |