Decompose the ADR-049 streaming handler work into 8 dependency-ordered tasks:
- call/registry/streaming-handler-handlerkind (foundation: StreamingHandler, HandlerKind, ResponseStream, INVALID_OPERATION_TYPE, migrate all sites)
- call/registry/invoke-streaming (OperationRegistry::invoke_streaming)
- call/protocol/dispatch-streaming-branch (server-side op_type branch)
- call/client/from-call-streaming-forwarding (Subscription → subscribe())
- http/gateway/invoke-streaming (GatewayDispatch::invoke_streaming)
- http/server/subscribe-sse-streaming (/subscribe pipes BoxStream to SSE)
- http/adapters/from-openapi-sse-streaming (SSE → StreamingHandler)
- review-streaming-impl (phase review checkpoint)
Validated with taskgraph: 86 tasks, no cycles.
Also ignore .worktrees/ so agents' worktree workspaces don't leak into git status.
| id | name | status | depends_on | scope | risk | impact | level |
|---|---|---|---|---|---|---|---|
| call/protocol/dispatch-streaming-branch | Wire Dispatcher::handle_stream streaming branch (Subscription → invoke_streaming → write each → call.completed) | pending | | narrow | medium | component | implementation |
Description
Wire the server-side streaming dispatch branch in
Dispatcher::handle_stream / dispatch_requested. When a call.requested
arrives for a Subscription op, the dispatcher must call
OperationRegistry::invoke_streaming() and pump the resulting
ResponseStream to the wire: each Ok(value) → call.responded frame,
Err → call.error frame (terminal), natural stream end → call.completed
frame. This is the server-side path that makes Subscription operations work
end-to-end: without it, a StreamingHandler-registered op has no server-side
dispatch path.
This task depends on call/registry/invoke-streaming (which provides
invoke_streaming()). It adds the op_type branch to the dispatch path and
the stream-to-wire pump.
The branch
dispatch_requested currently unconditionally calls registry.invoke() and
returns one ResponseEnvelope. It needs to know the op_type to branch. Two
options:
- Look up the registration to get op_type before dispatching. build_root_context already looks up the registration; expose op_type from it. Then branch: Subscription → streaming path, Query/Mutation → existing invoke() path.
- Return an enum from dispatch_requested (DispatchResult::Once(ResponseEnvelope) | DispatchResult::Stream(ResponseStream)) and let handle_stream match on it for the wire-writing loop.
Pick the cleaner option. Option 1 keeps dispatch_requested returning a
ResponseEnvelope for the Once path but needs a separate streaming entry point
(e.g., dispatch_requested_streaming returning ResponseStream). Option 2
unifies the dispatch entry but changes the return type. The spec frames it as
"branches on op_type" in handle_stream, suggesting the branch lives in the
dispatch layer. Document the choice.
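As an illustration of Option 2, here is a minimal, std-only sketch of a unified dispatch result. Everything in it is a hypothetical stand-in: OpType, ResponseEnvelope, ResponseStream (a boxed iterator rather than an async stream) and frames_for are simplified for the example and are not the real alknet-call definitions.

```rust
// Hypothetical stand-ins for the real alknet-call types (assumptions).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OpType {
    Query,
    Mutation,
    Subscription,
}

#[derive(Debug, PartialEq)]
pub struct ResponseEnvelope(pub Result<String, String>);

// Stand-in for ResponseStream: a boxed iterator instead of an async stream.
pub type ResponseStream = Box<dyn Iterator<Item = ResponseEnvelope>>;

// Option 2: one dispatch entry point, two result shapes.
pub enum DispatchResult {
    Once(ResponseEnvelope),
    Stream(ResponseStream),
}

/// Branch on op_type: subscriptions stream, queries and mutations respond once.
pub fn dispatch_requested(op_type: OpType, input: &str) -> DispatchResult {
    match op_type {
        OpType::Subscription => {
            // Stand-in for registry.invoke_streaming(...): three fake items.
            let items: Vec<ResponseEnvelope> = (1..=3)
                .map(|i| ResponseEnvelope(Ok(format!("{input}#{i}"))))
                .collect();
            DispatchResult::Stream(Box::new(items.into_iter()))
        }
        OpType::Query | OpType::Mutation => {
            // Stand-in for registry.invoke(...): echo the input once.
            DispatchResult::Once(ResponseEnvelope(Ok(input.to_string())))
        }
    }
}

/// Number of frames handle_stream would write for a dispatch result.
pub fn frames_for(result: DispatchResult) -> usize {
    match result {
        // One call.responded or call.error frame, no call.completed.
        DispatchResult::Once(_) => 1,
        // One frame per item plus a trailing call.completed
        // (this sketch ignores the Err-is-terminal case).
        DispatchResult::Stream(stream) => stream.count() + 1,
    }
}
```

With this shape the branch stays visible in handle_stream as a single match on the result, at the cost of changing the return type of dispatch_requested.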
Streaming dispatch path
For a Subscription op:
    // In dispatch_requested (or a new dispatch_requested_streaming):
    let mut context = self.build_root_context(...);
    // build_root_context sets a 30s deadline. Subscriptions are unbounded
    // (ADR-049 §6, call-protocol Timeouts), so clear the deadline after
    // construction, or pass a flag to build_root_context instead.
    context.deadline = None;
    let stream = self.registry.invoke_streaming(&operation_name, input, context);
    stream // ResponseStream, pumped by handle_stream
handle_stream streaming pump
In handle_stream, after dispatching, if the result is a stream:
    // Read the ResponseStream and write each envelope as an EventEnvelope frame.
    let mut stream = tokio_stream_into_response_stream(...); // or use StreamExt
    let mut last_was_error = false;
    while let Some(envelope) = stream.next().await {
        // Assumption: ResponseEnvelope exposes its outcome as a `result` field;
        // substitute the real accessor.
        last_was_error = envelope.result.is_err();
        let event: EventEnvelope = envelope.into();
        if let Err(err) = writer.write_frame(&event).await {
            warn!(error = %err, "failed to write streaming frame; closing stream");
            break;
        }
        // Err is terminal (StreamingHandler contract): stop after call.error.
        if last_was_error {
            break;
        }
    }
    // Natural stream end only: no call.completed after a call.error.
    if !last_was_error {
        let completed = EventEnvelope::completed(&request_id);
        if let Err(err) = writer.write_frame(&completed).await {
            warn!(error = %err, "failed to write call.completed");
        }
    }
The ResponseEnvelope → EventEnvelope conversion (into()) already exists and
produces call.responded for Ok and call.error for Err. Write call.completed
exactly once, and only when the stream ends naturally, i.e. it returns None
without the last item being an Err. An Err envelope is terminal: per the
StreamingHandler contract the handler's stream yields the error and then None,
so the None that follows an error is not a "natural end" and must not produce a
call.completed after the call.error. Track a last_was_error flag in the pump
loop to make that decision.
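The call.completed rule can be checked in isolation with a small synchronous model. The sketch below is an assumption-laden simplification: Frame and pump are hypothetical names, the stream is a plain iterator of Result values rather than a ResponseStream, and frames are collected into a Vec instead of being written to the wire.

```rust
/// Hypothetical wire frames, named after the protocol events.
#[derive(Debug, PartialEq)]
pub enum Frame {
    Responded(String), // call.responded
    Error(String),     // call.error (terminal)
    Completed,         // call.completed (natural end only)
}

/// Pump a stream of results into frames, tracking whether the last item
/// was an error so that call.completed is only emitted on natural end.
pub fn pump<I>(stream: I) -> Vec<Frame>
where
    I: IntoIterator<Item = Result<String, String>>,
{
    let mut frames = Vec::new();
    let mut last_was_error = false;
    for item in stream {
        match item {
            Ok(value) => {
                last_was_error = false;
                frames.push(Frame::Responded(value));
            }
            Err(message) => {
                last_was_error = true;
                frames.push(Frame::Error(message));
                // Err is terminal: stop reading the stream.
                break;
            }
        }
    }
    // No call.completed after a call.error.
    if !last_was_error {
        frames.push(Frame::Completed);
    }
    frames
}
```

Under these assumptions an empty stream still yields a single Completed frame, which matches "natural end" as described above.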
deadline: None for subscriptions
build_root_context sets deadline: Some(now + 30s). For the streaming path,
the spec says deadline: None (unbounded — subscriptions are long-running). Set
context.deadline = None after build_root_context for the streaming branch,
or add a parameter to build_root_context. The deadline bounds the
request/response call tree; a subscription has no such bound. Document this.
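One way to picture the "set it after construction" variant is the sketch below. CallContext, OpType and context_for are hypothetical stand-ins, and build_root_context here takes an explicit `now` argument only to keep the example deterministic; the real function's signature is not shown in this task.

```rust
use std::time::{Duration, Instant};

// Hypothetical stand-ins for the real alknet-call types (assumptions).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OpType {
    Query,
    Mutation,
    Subscription,
}

#[derive(Debug)]
pub struct CallContext {
    pub deadline: Option<Instant>,
}

/// Stand-in for build_root_context: always applies the 30s deadline.
pub fn build_root_context(now: Instant) -> CallContext {
    CallContext {
        deadline: Some(now + Duration::from_secs(30)),
    }
}

/// Build the context for a dispatch, clearing the deadline for subscriptions.
pub fn context_for(op_type: OpType, now: Instant) -> CallContext {
    let mut context = build_root_context(now);
    if op_type == OpType::Subscription {
        // Subscriptions are unbounded: the request/response deadline does not apply.
        context.deadline = None;
    }
    context
}
```

Clearing the field after construction leaves build_root_context untouched for the Query/Mutation path; adding a parameter instead would make the choice explicit at every call site.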
Abort cascade (ADR-016)
If call.aborted arrives for a streaming request ID, the stream is dropped and
Rust's Drop releases the handler's resources. The existing handle_abort path
already removes the pending entry and cascades, and the drop happens either when
the handle_stream task is cancelled or when call.aborted is processed, so no new
abort code is needed. Verify that the streaming pump task is cancellable (it is a
tokio::spawn task; aborting the connection cancels it).
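The reliance on Drop can be demonstrated with a synchronous analogue. This is only a sketch: HandlerStream and abort_after_first are hypothetical, the stream is an Iterator rather than an async stream, and an explicit drop() stands in for the task being cancelled or call.aborted being processed.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical handler stream that owns a resource and reports its release.
pub struct HandlerStream {
    remaining: u32,
    released: Arc<AtomicBool>,
}

impl HandlerStream {
    pub fn new(items: u32, released: Arc<AtomicBool>) -> Self {
        HandlerStream { remaining: items, released }
    }
}

impl Iterator for HandlerStream {
    type Item = u32;

    /// Yields a countdown: remaining - 1, ..., 0, then None.
    fn next(&mut self) -> Option<u32> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        Some(self.remaining)
    }
}

impl Drop for HandlerStream {
    /// Releasing the handler's resources happens here, with no abort-specific code.
    fn drop(&mut self) {
        self.released.store(true, Ordering::SeqCst);
    }
}

/// Simulates an abort mid-stream: read one item, then drop the stream.
pub fn abort_after_first(mut stream: HandlerStream) -> Option<u32> {
    let first = stream.next();
    drop(stream); // stands in for task cancellation / call.aborted
    first
}
```

The unit test for the abort criterion can follow the same pattern: hand the handler a shared flag, abort the request, and assert that the flag was set by Drop.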
What this task does NOT do
- No client-side changes. The client CallConnection::subscribe() already works (it reads call.responded events until call.completed). This task is server-side only.
- No gateway changes. GatewayDispatch::invoke_streaming is http/gateway/invoke-streaming.
Acceptance Criteria
- dispatch_requested (or a new dispatch_requested_streaming) branches on op_type: Subscription → invoke_streaming(), Query/Mutation → invoke() (existing)
- handle_stream pumps the ResponseStream for the streaming branch: each ResponseEnvelope → EventEnvelope frame
- Natural stream end → call.completed frame written
- Err envelope → call.error frame written, stream ends after it (no call.completed after an error)
- deadline: None for the streaming branch (unbounded subscriptions)
- Abort: call.aborted for a streaming request drops the stream (Drop releases resources; existing handle_abort handles the pending entry)
- Existing Query/Mutation dispatch path unchanged (one call.responded/call.error frame, no call.completed)
- Unit test: Subscription op dispatch → multiple call.responded frames + call.completed on stream end
- Unit test: Subscription op handler yields Err → one call.error frame, no call.completed after
- Unit test: Query op dispatch unchanged (one frame, no call.completed)
- Unit test: call.aborted for streaming request → stream dropped
- cargo test -p alknet-call succeeds
- cargo clippy -p alknet-call --all-targets succeeds with no warnings
- cargo fmt --check -p alknet-call passes
References
- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 §6 (server-side dispatch branches on op_type)
- docs/architecture/crates/call/call-protocol.md — §CallAdapter Stream Handling (streaming branch: invoke_streaming → write each → call.completed; deadline: None; abort cascade)
- docs/architecture/decisions/016-abort-cascade-for-nested-calls.md — ADR-016 (stream drop on abort)
Notes
The streaming pump is a straightforward while let Some(envelope) = stream.next().await loop, not a complex abstraction. The tricky part is the call.completed semantics: write it on natural stream end, NOT after an Err (which is terminal). Track whether the last envelope was an error. deadline: None for subscriptions is a spec requirement: the 30s request/response deadline does not bound a long-running subscription. The abort cascade needs no new code: dropping the stream future (via task cancellation or handle_abort) releases the handler's resources through Rust's Drop. Pick the dispatch-entry shape (separate streaming method vs unified enum return) and document it. The spec frames it as a branch in handle_stream, so the branch should be visible there.
Summary
To be filled on completion