tasks(decomp): ADR-049 streaming handler — 8 atomic tasks + gitignore .worktrees/

Decompose the ADR-049 streaming handler work into 8 dependency-ordered tasks:
- call/registry/streaming-handler-handlerkind (foundation: StreamingHandler,
  HandlerKind, ResponseStream, INVALID_OPERATION_TYPE, migrate all sites)
- call/registry/invoke-streaming (OperationRegistry::invoke_streaming)
- call/protocol/dispatch-streaming-branch (server-side op_type branch)
- call/client/from-call-streaming-forwarding (Subscription → subscribe())
- http/gateway/invoke-streaming (GatewayDispatch::invoke_streaming)
- http/server/subscribe-sse-streaming (/subscribe pipes BoxStream to SSE)
- http/adapters/from-openapi-sse-streaming (SSE → StreamingHandler)
- review-streaming-impl (phase review checkpoint)

Validated with taskgraph: 86 tasks, no cycles. Also ignore .worktrees/ so
agents' worktree workspaces don't leak into git status.
This commit is contained in:
2026-07-02 08:23:27 +00:00
parent 7ecc11610a
commit 07f7607fbb
9 changed files with 1513 additions and 1 deletions

View File

@@ -0,0 +1,172 @@
---
id: call/client/from-call-streaming-forwarding
name: Implement from_call streaming forwarding handler (Subscription → CallConnection::subscribe → StreamingHandler)
status: pending
depends_on: [call/registry/streaming-handler-handlerkind]
scope: narrow
risk: medium
impact: component
level: implementation
---
## Description
Branch `from_call`'s forwarding handler construction on `op_type` so that a
`Subscription` op discovered via `services/list` + `services/schema` registers a
`StreamingHandler` (`HandlerKind::Stream`) that calls
`CallConnection::subscribe()` and forwards the remote stream end-to-end.
`Query`/`Mutation` ops keep the existing `make_forwarding_handler()` (single
`call_with_payload()`, `HandlerKind::Once`). This closes the gap where a
`from_call`-imported `Subscription` truncated to the first value.
This task depends on `call/registry/streaming-handler-handlerkind` (which
introduces `HandlerKind::Stream` and `make_streaming_handler`). The
`CallConnection::subscribe()` client-side path already works (it returns
`impl Stream<Item = ResponseEnvelope>`); this task wires it into the forwarding
handler.
### The branch in build_bundles
`build_bundles` currently constructs one `make_forwarding_handler()` per
discovered op and wraps in `HandlerKind::Once`. Branch on
`op_summary.op_type` (parsed from `services/schema`):
- `Query` / `Mutation``make_forwarding_handler()` (existing), wrap in
`HandlerKind::Once`
- `Subscription``make_streaming_forwarding_handler()` (new), wrap in
`HandlerKind::Stream`
The `op_type` is already parsed in `rebuild_spec_for` (it reads `schema.op_type`
and produces `OperationType::Subscription`). The `OpSummary` needs to carry the
`op_type` (or the spec's `op_type` is read from the constructed `spec`). Read
`spec.op_type` after `rebuild_spec_for` to decide the handler kind.
### make_streaming_forwarding_handler
```rust
fn make_streaming_forwarding_handler(
connection: Arc<CallConnection>,
remote_name: String,
credentials_auth_token: Option<String>,
) -> StreamingHandler {
use crate::registry::registration::make_streaming_handler;
make_streaming_handler(move |input, context| {
let connection = Arc::clone(&connection);
let remote_name = remote_name.clone();
let auth_token = credentials_auth_token.clone();
// The streaming forwarding handler calls subscribe() and forwards the
// remote stream. forwarded_for is populated from context.identity
// (ADR-032), same as the request/response forwarding handler.
async move {
// Build the payload (same as build_forwarded_payload, but for subscribe)
let payload = build_forwarded_payload(&remote_name, input, &context, auth_token.as_deref());
// CallConnection::subscribe takes (operation_id, input); for the
// forwarded payload path, we need a subscribe_with_payload variant,
// OR we call subscribe(remote_name, input) and let it build the
// payload. The forwarded_for + auth_token need to be in the payload,
// so a subscribe_with_payload variant is needed (mirrors
// call_with_payload). Check if CallConnection::subscribe can accept
// a full payload — if not, add subscribe_with_payload().
let stream = connection.subscribe_with_payload(payload).await;
// Map the impl Stream<Item=ResponseEnvelope> to BoxStream<ResponseEnvelope>
Box::pin(stream) as ResponseStream
}
})
}
```
**Coordinate with `CallConnection::subscribe`**: the existing
`subscribe(operation_id, input)` builds the payload internally and does NOT
populate `forwarded_for` or `auth_token`. The forwarding handler needs those
fields (ADR-032). Two options:
1. Add `CallConnection::subscribe_with_payload(payload: Value)` (mirrors
`call_with_payload`) that takes a caller-constructed payload. The forwarding
handler builds the payload with `build_forwarded_payload` and calls
`subscribe_with_payload`.
2. Extend `subscribe()` to accept optional `forwarded_for` / `auth_token`.
Option 1 mirrors the existing `call` / `call_with_payload` split and is cleaner.
Add `subscribe_with_payload()` alongside `subscribe()`.
### forwarded_for on the streaming payload
The streaming forwarding handler populates `forwarded_for` from
`context.identity` exactly as the request/response forwarding handler does
(ADR-032 §3) — reuse `build_forwarded_payload()`. The `auth_token` (hub's own
call-protocol token) is also populated identically. No new payload-construction
code; reuse the existing `build_forwarded_payload`.
### Abort cascade (ADR-016 §6)
The streaming forwarding handler's `parent_request_id` participates in the
abort cascade: if the parent is aborted, the cascade reaches this handler,
which sends `call.aborted` to the remote node; the remote node cascades to its
own descendants. Cross-node abort is transparent. The `subscribe_with_payload`
path registers the request in `PendingRequestMap` (the existing `subscribe()`
does this); abort handling is already wired. Verify the streaming forwarding
handler's stream is dropped on parent abort (the `SubscriptionStream`'s `Drop`
or the pending entry's removal handles it).
### What this task does NOT do
- **No `OperationEnv::invoke_streaming()`.** Composition is request/response-only.
- **No server-side dispatch changes.** The server-side streaming branch is
`call/protocol/dispatch-streaming-branch`.
- **No gateway changes.** The gateway streaming path is `http/gateway/invoke-streaming`.
## Acceptance Criteria
- [ ] `build_bundles` branches on `spec.op_type`: `Subscription` → streaming
forwarding handler (`HandlerKind::Stream`), `Query`/`Mutation` → existing
`HandlerKind::Once`
- [ ] `make_streaming_forwarding_handler()` constructs a `StreamingHandler`
- [ ] Streaming forwarding handler calls `CallConnection::subscribe_with_payload()`
(or `subscribe()` with the forwarded payload) and forwards the remote stream
- [ ] `CallConnection::subscribe_with_payload(payload)` exists (mirrors
`call_with_payload`) OR `subscribe()` accepts the forwarded payload
- [ ] `forwarded_for` populated from `context.identity` (ADR-032) on the
streaming payload (reuse `build_forwarded_payload`)
- [ ] `auth_token` populated when present (reuse `build_forwarded_payload`)
- [ ] Remote stream forwarded end-to-end: each `call.responded` → stream item,
`call.completed` → stream end, `call.aborted` → stream dropped
- [ ] No truncation, no first-value fallback
- [ ] `composition_authority: None`, `scoped_env: None` for FromCall streaming
leaves (same as Query/Mutation FromCall leaves)
- [ ] Unit test: `build_bundles` with a `Subscription` op produces a
`HandlerKind::Stream` registration
- [ ] Unit test: `build_bundles` with a `Query` op produces `HandlerKind::Once`
(unchanged)
- [ ] Unit test: `make_streaming_forwarding_handler` produces a `StreamingHandler`
that calls `subscribe_with_payload` (verify via payload capture, mirroring
the existing `forwarding_handler_populates_forwarded_for` test)
- [ ] Integration test: subscription forwarding streams remote events (if
feasible with mock connection; otherwise document the integration test as
deferred to a connection-level integration test)
- [ ] `cargo test -p alknet-call` succeeds
- [ ] `cargo clippy -p alknet-call --all-targets` succeeds with no warnings
- [ ] `cargo fmt --check -p alknet-call` passes
## References
- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 §8 (from_call stream forwarding)
- docs/architecture/crates/call/client-and-adapters.md — §from_call (handler branched on op_type; Subscription → StreamingHandler via subscribe())
- docs/architecture/decisions/017-call-protocol-client-and-adapter-contract.md — ADR-017 §3 (from_call flow), §6 (cross-node abort)
- docs/architecture/decisions/032-forwarded-for-identity.md — ADR-032 §3 (forwarded_for population)
## Notes
> The client-side `CallConnection::subscribe()` already works — this task wires
> it into the forwarding handler. The main new piece is
> `subscribe_with_payload()` (mirroring `call_with_payload`) so the forwarding
> handler can populate `forwarded_for` + `auth_token`. Reuse
> `build_forwarded_payload` — no new payload-construction code. The abort
> cascade is already wired via `PendingRequestMap`; verify the stream drops on
> parent abort. The `OpSummary` / `build_bundles` needs the `op_type` to
> branch — read it from the constructed `spec.op_type` (already parsed by
> `rebuild_spec_for`). Cross-node abort is transparent via
> `parent_request_id` (ADR-016 §6).
## Summary
> To be filled on completion

View File

@@ -0,0 +1,174 @@
---
id: call/protocol/dispatch-streaming-branch
name: Wire Dispatcher::handle_stream streaming branch (Subscription → invoke_streaming → write each → call.completed)
status: pending
depends_on: [call/registry/invoke-streaming]
scope: narrow
risk: medium
impact: component
level: implementation
---
## Description
Wire the server-side streaming dispatch branch in
`Dispatcher::handle_stream` / `dispatch_requested`. When a `call.requested`
arrives for a `Subscription` op, the dispatcher must call
`OperationRegistry::invoke_streaming()` and pump the resulting
`ResponseStream` to the wire: each `Ok(value)``call.responded` frame,
`Err``call.error` frame (terminal), natural stream end → `call.completed`
frame. This is the server-side path that makes `Subscription` operations work
end-to-end — without it, a `StreamingHandler`-registered op had no server-side
dispatch path.
This task depends on `call/registry/invoke-streaming` (which provides
`invoke_streaming()`). It adds the `op_type` branch to the dispatch path and
the stream-to-wire pump.
### The branch
`dispatch_requested` currently unconditionally calls `registry.invoke()` and
returns one `ResponseEnvelope`. It needs to know the `op_type` to branch. Two
options:
1. **Look up the registration to get `op_type` before dispatching.** The
`build_root_context` already looks up the registration; expose `op_type`
from it. Then branch: `Subscription` → streaming path, `Query`/`Mutation`
existing `invoke()` path.
2. **Return an enum from `dispatch_requested`** (`DispatchResult::Once(ResponseEnvelope)`
| `DispatchResult::Stream(ResponseStream)`) and let `handle_stream` match on
it for the wire-writing loop.
Pick the cleaner option. Option 1 keeps `dispatch_requested` returning a
`ResponseEnvelope` for the Once path but needs a separate streaming entry point
(e.g., `dispatch_requested_streaming` returning `ResponseStream`). Option 2
unifies the dispatch entry but changes the return type. The spec frames it as
"branches on `op_type`" in `handle_stream`, suggesting the branch lives in the
dispatch layer. Document the choice.
### Streaming dispatch path
For a `Subscription` op:
```rust
// In dispatch_requested (or a new dispatch_requested_streaming):
let context = self.build_root_context(...);
// deadline: None for subscriptions (unbounded — ADR-049 §6, call-protocol Timeouts)
// The build_root_context sets a 30s deadline; for the streaming path, set
// deadline to None AFTER construction (or pass a flag). The spec says
// "deadline: None for subscriptions (unbounded)".
let stream = self.registry.invoke_streaming(&operation_name, input, context);
stream // ResponseStream — pumped by handle_stream
```
### handle_stream streaming pump
In `handle_stream`, after dispatching, if the result is a stream:
```rust
// Read the ResponseStream, write each envelope as an EventEnvelope frame
let mut stream = tokio_stream_into_response_stream(...); // or use StreamExt
while let Some(envelope) = stream.next().await {
let event: EventEnvelope = envelope.into();
if let Err(err) = writer.write_frame(&event).await {
warn!(error = %err, "failed to write streaming frame; closing stream");
break;
}
// If the envelope was an error (Err result), the stream ends after it
// (the StreamingHandler's contract: Err is terminal). The stream's own
// end (None) triggers call.completed below.
}
// Natural stream end → write call.completed
let completed = EventEnvelope::completed(&request_id);
if let Err(err) = writer.write_frame(&completed).await {
warn!(error = %err, "failed to write call.completed");
}
```
The `ResponseEnvelope → EventEnvelope` conversion (`into()`) already exists and
produces `call.responded` for `Ok` and `call.error` for `Err`. The
`call.completed` frame is written once when the stream ends naturally (not on
error — an `Err` envelope is terminal, the stream ends after it, and we do NOT
write `call.completed` after a `call.error`; the stream's `None` after an error
is not a "natural end"). Track whether the last envelope was an error to decide
whether to write `call.completed`. Alternatively, the `StreamingHandler`'s
contract is: `Err` ends the stream (the handler's stream yields the error then
`None`), so after the loop, only write `call.completed` if the stream did not
end on an error. Simplest correct approach: write `call.completed` only on
natural end (the stream returned `None` without the last item being an `Err`).
Track a `last_was_error` flag.
### deadline: None for subscriptions
`build_root_context` sets `deadline: Some(now + 30s)`. For the streaming path,
the spec says `deadline: None` (unbounded — subscriptions are long-running). Set
`context.deadline = None` after `build_root_context` for the streaming branch,
or add a parameter to `build_root_context`. The deadline bounds the
request/response call tree; a subscription has no such bound. Document this.
### Abort cascade (ADR-016)
If `call.aborted` arrives for a streaming request ID, the stream is dropped
(Rust `Drop` releases the handler's resources). The existing `handle_abort`
path already removes the pending entry and cascades. For the streaming branch,
the stream future being dropped (when the `handle_stream` task is cancelled or
the `call.aborted` is processed) releases the handler's resources via `Drop`.
No new abort code is needed — the existing `handle_abort` + the stream's `Drop`
handle it. Verify the streaming pump task is cancellable (it's a `tokio::spawn`
task; aborting the connection cancels it).
### What this task does NOT do
- **No client-side changes.** The client `CallConnection::subscribe()` already
works (it reads `call.responded` events until `call.completed`). This task is
server-side only.
- **No gateway changes.** `GatewayDispatch::invoke_streaming` is
`http/gateway/invoke-streaming`.
## Acceptance Criteria
- [ ] `dispatch_requested` (or a new `dispatch_requested_streaming`) branches on
`op_type`: `Subscription``invoke_streaming()`, `Query`/`Mutation`
`invoke()` (existing)
- [ ] `handle_stream` pumps the `ResponseStream` for the streaming branch:
each `ResponseEnvelope``EventEnvelope` frame
- [ ] Natural stream end → `call.completed` frame written
- [ ] `Err` envelope → `call.error` frame written, stream ends after it (no
`call.completed` after an error)
- [ ] `deadline: None` for the streaming branch (unbounded subscriptions)
- [ ] Abort: `call.aborted` for a streaming request drops the stream (Drop
releases resources; existing `handle_abort` handles the pending entry)
- [ ] Existing `Query`/`Mutation` dispatch path unchanged (one
`call.responded`/`call.error` frame, no `call.completed`)
- [ ] Unit test: `Subscription` op dispatch → multiple `call.responded` frames
+ `call.completed` on stream end
- [ ] Unit test: `Subscription` op handler yields `Err` → one `call.error`
frame, no `call.completed` after
- [ ] Unit test: `Query` op dispatch unchanged (one frame, no `call.completed`)
- [ ] Unit test: `call.aborted` for streaming request → stream dropped
- [ ] `cargo test -p alknet-call` succeeds
- [ ] `cargo clippy -p alknet-call --all-targets` succeeds with no warnings
- [ ] `cargo fmt --check -p alknet-call` passes
## References
- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 §6 (server-side dispatch branches on op_type)
- docs/architecture/crates/call/call-protocol.md — §CallAdapter Stream Handling (streaming branch: invoke_streaming → write each → call.completed; deadline: None; abort cascade)
- docs/architecture/decisions/016-abort-cascade-for-nested-calls.md — ADR-016 (stream drop on abort)
## Notes
> The streaming pump is a straightforward `while let Some(envelope) = stream.next().await`
> loop — not a complex abstraction. The tricky part is the `call.completed`
> semantics: write it on natural stream end, NOT after an `Err` (which is
> terminal). Track whether the last envelope was an error. `deadline: None` for
> subscriptions is a spec requirement — the 30s request/response deadline does
> not bound a long-running subscription. The abort cascade needs no new code:
> dropping the stream future (via task cancellation or `handle_abort`) releases
> the handler's resources through Rust's `Drop`. Pick the dispatch-entry shape
> (separate streaming method vs unified enum return) and document it — the spec
> frames it as a branch in `handle_stream`, so the branch should be visible there.
## Summary
> To be filled on completion

View File

@@ -0,0 +1,170 @@
---
id: call/registry/invoke-streaming
name: Implement OperationRegistry::invoke_streaming() returning ResponseStream
status: pending
depends_on: [call/registry/streaming-handler-handlerkind]
scope: narrow
risk: medium
impact: component
level: implementation
---
## Description
Add `OperationRegistry::invoke_streaming()` — the streaming dispatch path that
`Subscription` operations use. This is the counterpart to `invoke()`: same
visibility + ACL checks, then dispatches to the `StreamingHandler` and returns
the `ResponseStream`. Pre-handler errors (not-found, forbidden,
`INVALID_OPERATION_TYPE` for a non-Subscription op) yield a single error
`ResponseEnvelope` and end the stream.
This task depends on `call/registry/streaming-handler-handlerkind` (which
introduces `HandlerKind::Stream` and the `ResponseStream` alias). It adds only
the `invoke_streaming()` method — no other changes.
### invoke_streaming()
```rust
use futures::stream::{self, StreamExt};
impl OperationRegistry {
/// Dispatch a Subscription operation. Returns a stream of
/// ResponseEnvelopes. Pre-handler errors (not-found, forbidden,
/// INVALID_OPERATION_TYPE for a non-Subscription op) yield a single
/// error ResponseEnvelope and end the stream.
pub fn invoke_streaming(
&self,
name: &str,
input: Value,
context: OperationContext,
) -> ResponseStream {
let request_id = context.request_id.clone();
// 1. Look up registration
let registration = match self.operations.get(name) {
Some(r) => r,
None => {
return Box::pin(stream::once(async move {
ResponseEnvelope::not_found(request_id, name)
}));
}
};
// 2. Visibility check (same as invoke)
if registration.spec.visibility == Visibility::Internal && !context.internal {
return Box::pin(stream::once(async move {
ResponseEnvelope::not_found(request_id, name)
}));
}
// 3. ACL check (same as invoke)
let acl = &registration.spec.access_control;
let identity = if context.internal {
context.handler_identity.as_ref().and_then(|ca| ca.as_identity())
} else {
context.identity.clone()
};
if let AccessResult::Forbidden(message) = acl.check(identity.as_ref()) {
return Box::pin(stream::once(async move {
ResponseEnvelope::forbidden(request_id, message)
}));
}
// 4. HandlerKind check — must be Stream for invoke_streaming
let streaming_handler = match &registration.handler {
HandlerKind::Stream(h) => Arc::clone(h),
HandlerKind::Once(_) => {
return Box::pin(stream::once(async move {
ResponseEnvelope::error(
request_id,
CallError::invalid_operation_type(
"invoke_streaming() called on a Query/Mutation op; use invoke()"
),
)
}));
}
};
// 5. Dispatch — the handler returns the stream
streaming_handler(input, context)
}
}
```
The visibility + ACL checks are **identical** to `invoke()` — extract them into
a private helper if it reduces duplication, but the spec requires the security
axis to be provably identical between `invoke()` and `invoke_streaming()`. The
two methods diverge only on the return shape (single envelope vs stream) and
the handler-kind guard (Once vs Stream).
### Pre-handler errors as single-item streams
A pre-handler error (not-found, forbidden, wrong kind) produces a
`ResponseStream` that yields exactly one error `ResponseEnvelope` and then ends.
This matches the single-response path's behavior, just on a stream — the caller
(`Dispatcher::handle_stream` streaming branch, `GatewayDispatch::invoke_streaming`)
drains the stream and writes frames; a one-item error stream produces one
`call.error` frame and closes.
Use `futures::stream::once(async move { ... })` to build these single-item
streams. The error envelope carries the `request_id` from the context.
### What this task does NOT do
- **No `OperationEnv::invoke_streaming()`.** Composition stays
request/response-only (ADR-049). `OperationEnv::invoke()` errors on
`Subscription` (handled in `streaming-handler-handlerkind` via the
`HandlerKind::Stream` match in `LocalOperationEnv``registry.invoke()` and
`OverlayOperationEnv` direct match). No streaming variant is added to the
trait.
- **No dispatch-loop wiring.** `Dispatcher::handle_stream` streaming branch is
`call/protocol/dispatch-streaming-branch`.
- **No gateway wiring.** `GatewayDispatch::invoke_streaming` is
`http/gateway/invoke-streaming`.
## Acceptance Criteria
- [ ] `OperationRegistry::invoke_streaming()` method exists
- [ ] Returns `ResponseStream` (`Pin<Box<dyn Stream<Item = ResponseEnvelope> + Send>>`)
- [ ] Not-found op → single-item stream with `NOT_FOUND` error envelope, then ends
- [ ] Internal op from external call → single-item stream with `NOT_FOUND`, then ends
- [ ] ACL denied → single-item stream with `FORBIDDEN`, then ends
- [ ] `HandlerKind::Once` op (Query/Mutation) → single-item stream with
`INVALID_OPERATION_TYPE`, then ends
- [ ] `HandlerKind::Stream` op (Subscription) → dispatches the `StreamingHandler`,
returns its stream
- [ ] Visibility + ACL checks identical to `invoke()` (same authority switch:
internal → handler_identity, external → identity)
- [ ] Unit test: `invoke_streaming()` on a registered `Subscription` op yields
the handler's stream items
- [ ] Unit test: `invoke_streaming()` on unknown op yields one `NOT_FOUND` then ends
- [ ] Unit test: `invoke_streaming()` on a `Query` op yields one
`INVALID_OPERATION_TYPE` then ends
- [ ] Unit test: `invoke_streaming()` on Internal op from external context yields
one `NOT_FOUND` then ends
- [ ] Unit test: `invoke_streaming()` ACL denied yields one `FORBIDDEN` then ends
- [ ] Unit test: `invoke_streaming()` internal call uses handler_identity for ACL
- [ ] `cargo test -p alknet-call` succeeds
- [ ] `cargo clippy -p alknet-call --all-targets` succeeds with no warnings
- [ ] `cargo fmt --check -p alknet-call` passes
## References
- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 §3 (invoke_streaming), §4 (invoke errors on Subscription), §5 (OperationEnv request/response-only)
- docs/architecture/crates/call/operation-registry.md — §OperationRegistry (invoke_streaming signature, pre-handler errors as single-item streams)
## Notes
> The visibility + ACL checks MUST be identical to `invoke()` — the spec calls
> this out explicitly: "invoke_streaming() performs the same visibility + ACL
> checks as invoke()". Extract a shared helper if it helps, but the security
> axis must be provably identical. Pre-handler errors become single-item streams
> (one error envelope, then end) — this matches the single-response path's
> behavior, just on a stream. Do NOT add `OperationEnv::invoke_streaming()` —
> composition is request/response-only by design (ADR-049 §5); stream
> composition is a handler-level concern. The `futures` crate's `stream::once`
> and `StreamExt` are the tools for building single-item streams.
## Summary
> To be filled on completion

View File

@@ -0,0 +1,256 @@
---
id: call/registry/streaming-handler-handlerkind
name: Introduce StreamingHandler, HandlerKind, ResponseStream types and migrate HandlerRegistration to HandlerKind
status: pending
depends_on: []
scope: broad
risk: medium
impact: component
level: implementation
---
## Description
ADR-049 restores the streaming handler path that the Rust port dropped when it
collapsed the TS `OperationHandler` / `SubscriptionHandler` union into a single
`Handler`. This task introduces the new types (`StreamingHandler`, `HandlerKind`,
`ResponseStream`, `make_streaming_handler`), adds the `INVALID_OPERATION_TYPE`
protocol error code, changes `HandlerRegistration.handler` from `Handler` to
`HandlerKind`, updates the builder to absorb the wrapping, adds registration-time
validation, updates `invoke()` to error on `Stream`, updates the overlay env to
match on `HandlerKind`, and migrates **every existing construction site** to wrap
in `HandlerKind::Once`.
This is the foundational breaking change — all downstream streaming tasks depend
on it. It is broad in surface area (touches `registration.rs`, `wire.rs`,
`connection.rs`, and every test/adapter that constructs a `HandlerRegistration`)
but each individual change is mechanical. The goal: after this task, the codebase
compiles with two handler kinds, `Query`/`Mutation` ops work exactly as before
(wrapped in `HandlerKind::Once`), and `Subscription` ops are rejected by `invoke()`
with `INVALID_OPERATION_TYPE` (the streaming dispatch path `invoke_streaming()`
is added in `call/registry/invoke-streaming`).
### New types (registration.rs)
```rust
use futures::stream::Stream;
/// Streaming handler — Subscription operations. Returns a stream of
/// ResponseEnvelopes: each Ok(value) → call.responded, an Err → call.error
/// (terminal — stream ends), natural stream end → call.completed.
pub type StreamingHandler = Arc<
dyn Fn(Value, OperationContext)
-> Pin<Box<dyn Stream<Item = ResponseEnvelope> + Send>>
+ Send + Sync,
>;
/// Type alias for the boxed stream shape used by `invoke_streaming()` and
/// `StreamingHandler` return values. `futures::stream::BoxStream<'static, T>`
/// = `Pin<Box<dyn Stream<Item = T> + Send>>` — the concrete library is a
/// two-way-door implementation detail (ADR-049); the alias exists so the two
/// spellings refer to the same type.
pub type ResponseStream = Pin<Box<dyn Stream<Item = ResponseEnvelope> + Send>>;
/// Which dispatch path a handler uses — locked by ADR-049. Validated against
/// `spec.op_type` at registration: Query/Mutation → Once; Subscription → Stream.
/// Mismatch is a startup error.
pub enum HandlerKind {
Once(Handler),
Stream(StreamingHandler),
}
```
`make_streaming_handler()` helper (analogue of `make_handler()`):
```rust
pub fn make_streaming_handler<S, St>(f: S) -> StreamingHandler
where
S: Fn(Value, OperationContext) -> St + Send + Sync + 'static,
St: Stream<Item = ResponseEnvelope> + Send + 'static,
{
Arc::new(move |input, context| Box::pin(f(input, context)))
}
```
### INVALID_OPERATION_TYPE error code (wire.rs)
Add the sixth protocol-level error code to `CallError`:
```rust
pub fn invalid_operation_type(message: impl Into<String>) -> Self {
Self::new("INVALID_OPERATION_TYPE", message, false)
}
```
`retryable: false`, `details: None`. This is a permanent client-side programming
error (wrong dispatch path for the operation's type), not a transient failure.
Clients should treat unknown codes as `INTERNAL` with `retryable: false` (the
existing rule); `INVALID_OPERATION_TYPE` is distinct from `INVALID_INPUT` (schema
mismatch) and `INTERNAL` (handler failure).
### HandlerRegistration.handler → HandlerKind
```rust
pub struct HandlerRegistration {
pub spec: OperationSpec,
pub handler: HandlerKind, // was: Handler
pub provenance: OperationProvenance,
pub composition_authority: Option<CompositionAuthority>,
pub scoped_env: Option<ScopedPeerEnv>,
pub capabilities: Capabilities,
}
```
`HandlerRegistration::new()` takes `handler: HandlerKind` (callers wrap in
`HandlerKind::Once(...)` or `HandlerKind::Stream(...)`).
### Builder absorbs HandlerKind wrapping
The builder inspects `spec.op_type` and wraps automatically — `.with_local()`
and `.with_leaf()` / `.with_leaf_provenance()` take the raw `Handler` (for
Query/Mutation) and wrap it in `HandlerKind::Once`. For Subscription ops, add a
parallel method pair (`.with_local_streaming()` / `.with_leaf_streaming()`) that
takes a `StreamingHandler` and wraps in `HandlerKind::Stream`. The builder
validates `handler` kind matches `spec.op_type` and reports mismatch as a
startup error.
The two-method-pair approach is preferred over a typed enum input because it
keeps the common case (Query/Mutation, `Handler`) on the existing signatures
and makes the streaming case explicit at the call site. Document this choice.
### register() validation
`OperationRegistry::register()` validates that `handler` is the right
`HandlerKind` for `spec.op_type`:
- `Query` / `Mutation``HandlerKind::Once`
- `Subscription``HandlerKind::Stream`
Mismatch is a startup error. Change `register()` to return `Result<(), String>`
(preferred — startup errors should be explicit, not panics) with a clear message
(`"handler kind mismatch: {op_type} requires HandlerKind::{Once|Stream}"`).
Update all `register()` call sites to handle the Result (the builder's `store()`
and tests). Alternatively panic with a clear message — but `Result` is cleaner
for a startup error and matches the `AdapterError` pattern used elsewhere.
### invoke() errors on Stream
`OperationRegistry::invoke()` matches on `registration.handler`:
- `HandlerKind::Once(handler)` → existing dispatch path (unchanged)
- `HandlerKind::Stream(_)` → return `ResponseEnvelope::error(request_id,
CallError::invalid_operation_type("invoke() called on a Subscription op;
use invoke_streaming()"))`
This is the guard that prevents a streaming op from being silently truncated
through the request/response path. The `invoke_streaming()` method itself is
added in `call/registry/invoke-streaming`.
### OverlayOperationEnv (connection.rs)
`OverlayOperationEnv::invoke_with_policy` dispatches directly (it does NOT call
`registry.invoke()` — it reads the handler from the overlay and calls it). After
the type change, `registration.handler` is `HandlerKind`, so the env must match:
- `HandlerKind::Once(handler)` → `handler(input, context).await` (existing path)
- `HandlerKind::Stream(_)` → return `ResponseEnvelope::error(parent.request_id,
CallError::invalid_operation_type("OperationEnv::invoke() called on a
Subscription op; composition is request/response-only"))`
`LocalOperationEnv` calls `self.registry.invoke()` which already errors on
`Stream` — no change needed there. `PeerCompositeEnv` delegates to
session/connection/base envs — no change needed there either.
### Migration of existing construction sites
Every site that constructs `HandlerRegistration::new(spec, handler, ...)` must
wrap `handler` in `HandlerKind::Once(handler)`. This is mechanical. Sites
include (non-exhaustive — find them all with a grep for `HandlerRegistration::new`):
- `crates/alknet-call/src/registry/registration.rs` (tests)
- `crates/alknet-call/src/registry/env.rs` (tests)
- `crates/alknet-call/src/registry/discovery.rs` (`services_list_handler`,
`services_schema_handler` construction — these are `Query` ops)
- `crates/alknet-call/src/protocol/dispatch.rs` (tests)
- `crates/alknet-call/src/protocol/connection.rs` (tests,
`imported_registration` helper)
- `crates/alknet-call/src/client/from_call.rs` (`build_bundles`,
`make_forwarding_handler`, tests)
- `crates/alknet-http/src/gateway/dispatch.rs` (tests)
- `crates/alknet-http/src/server/gateway_routes.rs` (tests)
- `crates/alknet-http/src/adapters/from_openapi.rs` (`build_registration`)
- `crates/alknet-http/src/adapters/from_mcp/mod.rs` (`build_registration`)
The builder sites (`with_local`, `with_leaf`, `with_leaf_provenance`, `with`)
are updated by the builder-absorbs-wrapping change above — their callers pass
raw `Handler` and the builder wraps. Direct `HandlerRegistration::new()` calls
need the explicit `HandlerKind::Once(...)` wrap.
## Acceptance Criteria
- [ ] `StreamingHandler` type alias in `registration.rs`
- [ ] `ResponseStream` type alias (`Pin<Box<dyn Stream<Item = ResponseEnvelope> + Send>>`)
- [ ] `HandlerKind` enum with `Once(Handler)` and `Stream(StreamingHandler)` variants
- [ ] `make_streaming_handler()` helper compiles and works
- [ ] `CallError::invalid_operation_type()` constructor in `wire.rs`
- [ ] `HandlerRegistration.handler` field is `HandlerKind` (not `Handler`)
- [ ] `HandlerRegistration::new()` takes `HandlerKind`
- [ ] Builder `with_local` / `with_leaf` / `with_leaf_provenance` wrap `Handler` in
`HandlerKind::Once` for Query/Mutation
- [ ] Builder `with_local_streaming` / `with_leaf_streaming` wrap `StreamingHandler`
in `HandlerKind::Stream` for Subscription
- [ ] Builder validates `handler` kind matches `spec.op_type` — mismatch is a
startup error
- [ ] `OperationRegistry::register()` validates `HandlerKind` matches `op_type`
(returns `Result<(), String>` or panics with clear message)
- [ ] `OperationRegistry::invoke()` dispatches `HandlerKind::Once` (existing path)
- [ ] `OperationRegistry::invoke()` returns `INVALID_OPERATION_TYPE` for
`HandlerKind::Stream`
- [ ] `OverlayOperationEnv::invoke_with_policy` matches on `HandlerKind`:
`Once` → dispatch, `Stream` → `INVALID_OPERATION_TYPE`
- [ ] `LocalOperationEnv` propagates `INVALID_OPERATION_TYPE` via `registry.invoke()`
(no code change needed — verify)
- [ ] All existing `HandlerRegistration::new()` call sites wrap in
`HandlerKind::Once(...)`
- [ ] All existing builder call sites compile (builder absorbs wrapping)
- [ ] Unit test: `invoke()` on a `Subscription` op (registered with
`HandlerKind::Stream`) returns `INVALID_OPERATION_TYPE`
- [ ] Unit test: `invoke()` on a `Query` op (registered with `HandlerKind::Once`)
dispatches normally
- [ ] Unit test: `register()` rejects `HandlerKind::Once` for a `Subscription` spec
- [ ] Unit test: `register()` rejects `HandlerKind::Stream` for a `Query` spec
- [ ] Unit test: `OverlayOperationEnv::invoke()` on a `Stream`-kind overlay op
returns `INVALID_OPERATION_TYPE`
- [ ] Unit test: `make_streaming_handler` produces a working `StreamingHandler`
- [ ] `cargo test -p alknet-call` succeeds
- [ ] `cargo test -p alknet-http` succeeds
- [ ] `cargo clippy -p alknet-call --all-targets` succeeds with no warnings
- [ ] `cargo clippy -p alknet-http --all-targets` succeeds with no warnings
- [ ] `cargo fmt --check -p alknet-call -p alknet-http` passes
## References
- docs/architecture/decisions/049-streaming-handler-for-subscriptions.md — ADR-049 (the decision)
- docs/architecture/crates/call/operation-registry.md — §Handler (StreamingHandler, HandlerKind, ResponseStream, make_streaming_handler), §OperationRegistry (register validation, invoke errors on Stream), §HandlerRegistration
- docs/architecture/crates/call/call-protocol.md — §call.error Payload (INVALID_OPERATION_TYPE protocol code)
- docs/architecture/decisions/023-operation-error-schemas.md — ADR-023 (amended: six protocol codes)
## Notes
> This is the foundational breaking change. The `HandlerRegistration.handler`
> type flip from `Handler` to `HandlerKind` ripples to every construction site,
> but each change is mechanical (`Handler` → `HandlerKind::Once(handler)`). The
> builder absorbs the wrapping for the common case. The load-bearing parts are:
> (1) `register()` validation catches kind/op_type mismatch at startup, (2)
> `invoke()` errors on `Stream` (the guard that prevents silent truncation), (3)
> `OverlayOperationEnv` matches on `HandlerKind` (it dispatches directly, not via
> `registry.invoke()`). `LocalOperationEnv` needs no change — it delegates to
> `registry.invoke()` which handles it. Do NOT add `invoke_streaming()` in this
> task — that's `call/registry/invoke-streaming`. The `futures` crate is already
> a dependency of `alknet-call`. The two-method-pair builder API
> (`with_local`/`with_local_streaming`) is preferred over a typed enum input —
> it keeps the common case on existing signatures and makes streaming explicit.
## Summary
> To be filled on completion