From 7ecc11610a6e67f7d0b769f575cf444b740456f7 Mon Sep 17 00:00:00 2001 From: "glm-5.2" Date: Thu, 2 Jul 2026 07:43:01 +0000 Subject: [PATCH] =?UTF-8?q?docs(arch):=20ADR-049=20=E2=80=94=20streaming?= =?UTF-8?q?=20handler=20for=20subscription=20operations?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The call protocol spec describes streaming (call.responded*N + call.completed, PendingRequestMap::Subscribe, CallConnection::subscribe), but the server-side Handler type returned a single ResponseEnvelope — a Subscription op had no way to produce a stream. The TS predecessor (@alkdev/operations) had separate OperationHandler / SubscriptionHandler types; the Rust port collapsed them, losing the streaming path. This restores it end-to-end: StreamingHandler type, HandlerKind on HandlerRegistration validated against op_type, invoke_streaming() on OperationRegistry, server-side dispatch branches on op_type, new INVALID_OPERATION_TYPE protocol code for wrong-dispatch-path misuse, GatewayDispatch::invoke_streaming() for /subscribe SSE, from_call stream forwarding via CallConnection::subscribe(), from_openapi SSE forwarding. OperationEnv::invoke() stays request/response-only (stream composition is handler-level, not protocol-level). Amends ADR-023's protocol-code list (five → six). Tracks the stream-operators library as OQ-41 (feature extension, not an unmade decision). 
--- docs/architecture/README.md | 4 +- .../architecture/crates/call/call-protocol.md | 13 +- .../crates/call/client-and-adapters.md | 20 +- .../crates/call/operation-registry.md | 108 +++++- .../architecture/crates/http/http-adapters.md | 27 +- docs/architecture/crates/http/http-mcp.md | 51 +-- docs/architecture/crates/http/http-server.md | 36 +- .../decisions/023-operation-error-schemas.md | 31 +- ...049-streaming-handler-for-subscriptions.md | 337 ++++++++++++++++++ docs/architecture/open-questions.md | 51 ++- 10 files changed, 602 insertions(+), 76 deletions(-) create mode 100644 docs/architecture/decisions/049-streaming-handler-for-subscriptions.md diff --git a/docs/architecture/README.md b/docs/architecture/README.md index 57db3d0..a0ef848 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-06-30 +last_updated: 2026-07-02 --- # Alknet Architecture @@ -102,6 +102,7 @@ The alknet-call crate is **implemented and reviewed** — both the server-side c | [046](decisions/046-assembly-layer-custom-http-routes.md) | Assembly-Layer Custom HTTP Routes on HttpAdapter | Proposed | | [047](decisions/047-remove-direct-call-http-surface.md) | Remove the Direct-Call HTTP Surface; Gateway Is the Sole Invoke Path | Proposed | | [048](decisions/048-websocket-native-session-not-gateway.md) | WebSocket Carries the Native Call-Protocol Session, Not the Gateway Shape | Accepted | +| [049](decisions/049-streaming-handler-for-subscriptions.md) | Streaming Handler for Subscription Operations | Accepted | ## Open Questions @@ -152,6 +153,7 @@ See [open-questions.md](open-questions.md) for the full tracker. 
- **OQ-37**: ~~X.509 outgoing-only case~~ — **resolved by ADR-034** (three remote roles named: public X.509 endpoint, transport relay, hub; `PeerEntry` asymmetry is correct; client-side verifier selection by `PeerEntry` presence) - **OQ-38**: WebTransport standalone relay service scope — the standalone relay (future `alknet-relay`, fork of iroh-relay with WebTransport proxy fallback) is distinct from the in-process ALPN-stream-proxy (ADR-040); scope question, not deferral - **OQ-39**: ~~`to_openapi` published-spec versioning~~ — **resolved by ADR-045** (`info.version` semver tracks the gateway endpoint contract, not the operation set; per-caller operations discovered via `/search`) +- **OQ-41**: Stream operators library — a handler-level utility library (filter, map, batch, dedupe, window, etc. on `BoxStream`), prior art in `@alkdev/pubsub/operators.ts`; feature extension, not an architectural decision (the architecture decision — stream composition is handler-level, not protocol-level — is made in ADR-049) **Deferred (not active):** - **OQ-09**: WASM target boundaries — design constraint, not deliverable diff --git a/docs/architecture/crates/call/call-protocol.md b/docs/architecture/crates/call/call-protocol.md index 23d6deb..4866c04 100644 --- a/docs/architecture/crates/call/call-protocol.md +++ b/docs/architecture/crates/call/call-protocol.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-06-23 +last_updated: 2026-07-02 --- # Call Protocol @@ -275,6 +275,7 @@ Error codes use an extensible string enum. 
The protocol defines the following ** - `NOT_FOUND` — operation not in registry (or Internal op called from wire) - `FORBIDDEN` — access denied (insufficient scopes or unauthenticated) - `INVALID_INPUT` — input doesn't match the operation's JSON Schema +- `INVALID_OPERATION_TYPE` — wrong dispatch path for the operation's type (`invoke()` called on a `Subscription`, or `invoke_streaming()` on a `Query`/`Mutation`, or `OperationEnv::invoke()` on a `Subscription` during composition — ADR-049) - `INTERNAL` — handler error, panic, connection failure - `TIMEOUT` — request timed out (retryable: true) @@ -309,7 +310,7 @@ Local dispatch produces `ResponseEnvelope { request_id, result: Result }` | -The `request_id` becomes the `id` field. For subscriptions, each `call.responded` is a separate `EventEnvelope` with the same `id`; `call.completed` is `{ type: "call.completed", id, payload: {} }`. +The `request_id` becomes the `id` field. For subscriptions, each `call.responded` is a separate `EventEnvelope` with the same `id`; `call.completed` is `{ type: "call.completed", id, payload: {} }`. The streaming dispatch path (`invoke_streaming()` → write each → write `call.completed`) produces these frames from a `StreamingHandler`'s stream; the single-response path (`invoke()` → write one) produces them from a `Handler`'s future. See ADR-049 and [operation-registry.md](operation-registry.md#handler). ### Protocol Operations @@ -405,10 +406,14 @@ The `CallAdapter::handle()` method: 1. Spawns a task that continuously calls `connection.accept_bi()` to receive incoming streams 2. For each accepted stream, reads `EventEnvelope` frames using `FrameFramedReader` -3. Dispatches `call.requested` events to the operation registry +3. 
Dispatches `call.requested` events to the operation registry, **branching on `op_type`** (ADR-049): + - **`Query` / `Mutation`** → `OperationRegistry::invoke()` → write one `call.responded` (or `call.error`) `EventEnvelope` frame + - **`Subscription`** → `OperationRegistry::invoke_streaming()` → write each `call.responded` `EventEnvelope` as the stream yields → write `call.completed` on natural stream end (or `call.error` if the stream yields an `Err`). `deadline: None` for subscriptions (unbounded — see Timeouts below). Abort (`call.aborted` arriving for the request ID, or the stream being dropped) cascades per ADR-016: the stream future is dropped, `Drop` guards release the handler's resources, and descendants are aborted. 4. Writes response `EventEnvelope` frames using `FrameFramedWriter` 5. Manages `PendingRequestMap` for outgoing calls initiated by the server +The streaming branch is the server-side path that makes `Subscription` operations work end-to-end. Without it, a `Subscription` op registered with a `StreamingHandler` had no server-side dispatch path — the handler produced a stream but the dispatcher only read one `ResponseEnvelope` and closed. ADR-049 adds the `StreamingHandler` type and the `invoke_streaming()` dispatch path; this section wires them into the accept loop. See [operation-registry.md](operation-registry.md#handler) for the `Handler` / `StreamingHandler` / `HandlerKind` types. + For outgoing calls (server → client), the adapter: 1. Opens a bidirectional stream with `connection.open_bi()` 2. 
Sends `call.requested` on that stream @@ -562,6 +567,7 @@ Handlers clean up resources when their call is cancelled (in Rust, the future is | Peer-graph routing model (supersedes ADR-028) | [ADR-029](../../decisions/029-peer-graph-routing-model.md) | Peer-keyed overlays + `PeerRef` routing; `AccessControl`-based peer authorization; retires `remote_safe`/`trusted_peer` | | Forwarded-for identity | [ADR-032](../../decisions/032-forwarded-for-identity.md) | `forwarded_for` field on `call.requested` and `OperationContext`; metadata only — `AccessControl::check` never reads it; the `from_call` handler populates it | | Operation error schemas | [ADR-023](../../decisions/023-operation-error-schemas.md) | Operations declare domain errors; `call.error` carries typed `details` | +| Streaming handler for subscriptions | [ADR-049](../../decisions/049-streaming-handler-for-subscriptions.md) | `StreamingHandler` type, `invoke_streaming()` dispatch path, `INVALID_OPERATION_TYPE` protocol code; the server-side streaming branch in `handle_stream` | ## Open Questions @@ -615,4 +621,5 @@ See [open-questions.md](../../open-questions.md) for full details. 
- ADR-030: PeerEntry and Identity.id decoupling (`PeerId` source) - ADR-032: Forwarded-for identity (`forwarded_for` on `call.requested` and `OperationContext`) - ADR-034: Outgoing-only X.509 and the three peer roles +- ADR-049: Streaming handler for subscriptions (server-side streaming dispatch path) - Reference implementation: `/workspace/@alkdev/alknet-main/crates/alknet-core/src/call/` \ No newline at end of file diff --git a/docs/architecture/crates/call/client-and-adapters.md b/docs/architecture/crates/call/client-and-adapters.md index ab4305e..d063120 100644 --- a/docs/architecture/crates/call/client-and-adapters.md +++ b/docs/architecture/crates/call/client-and-adapters.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-06-28 +last_updated: 2026-07-02 --- # alknet-call — Client and Adapters @@ -323,8 +323,21 @@ The flow (ADR-017 §3): 3. For each discovered op, construct a `HandlerRegistration`: - `spec` mirrors the remote op's name (with optional prefix), namespace, type, schemas, access control. - - `handler` is a forwarding handler: sends `call.requested` through the - `CallConnection`, awaits `call.responded` (or streams for subscriptions). + - `handler` is a forwarding handler, **branched on `op_type`** (ADR-049): + - `Query` / `Mutation` → a `Handler` (registered as `HandlerKind::Once`): + sends `call.requested` via `CallConnection::call_with_payload()`, awaits + the single `call.responded` (or `call.error`), returns the + `ResponseEnvelope`. + - `Subscription` → a `StreamingHandler` (registered as + `HandlerKind::Stream`): calls `CallConnection::subscribe()`, which + returns `impl Stream` (the client-side + streaming path, already implemented), maps it to a + `BoxStream`. The remote stream flows end-to-end: + each `call.responded` the remote sends becomes a stream item; the + remote's `call.completed` ends the stream (→ wire `call.completed`); + `call.aborted` drops the stream (cascade per ADR-016). 
No truncation, + no first-value fallback — a `from_call`-imported subscription forwards + the full remote stream. - `provenance: FromCall`, `composition_authority: None`, `scoped_env: None` (leaf — ADR-022). 4. The caller registers the bundles via @@ -668,6 +681,7 @@ Based on the gap analysis and the downstream unblock chain: | Privilege model and authority context | [ADR-015](../../decisions/015-privilege-model-and-authority-context.md) | Adapter-registered ops are `Internal` by default; default-deny posture | | Abort cascade for nested calls | [ADR-016](../../decisions/016-abort-cascade-for-nested-calls.md) | Cross-node abort through `from_call` forwarding handler's `parent_request_id` | | Operation error schemas | [ADR-023](../../decisions/023-operation-error-schemas.md) | `error_schemas` mirrored by `from_call` from remote op's spec | +| Streaming handler for subscriptions | [ADR-049](../../decisions/049-streaming-handler-for-subscriptions.md) | `from_call` `Subscription` ops register a `StreamingHandler` (`HandlerKind::Stream`) that calls `CallConnection::subscribe()` and forwards the remote stream; `Query`/`Mutation` stay `HandlerKind::Once` | | TLS identity redesign | [ADR-027](../../decisions/027-tls-identity-redesign-acme-rawkey-decoupling.md) | RFC 7250 raw key / X.509 cert dimensions of `CallCredentials` | | Outgoing-only X.509 and three peer roles | [ADR-034](../../decisions/034-outgoing-only-x509-and-three-peer-roles.md) | Public X.509 endpoint is not a `PeerEntry` on the client side (no `PeerId`, not in peer graph); client-side verifier by `PeerEntry` presence (CA vs fingerprint pin); hub = mixed-fingerprint `PeerEntry` | | HD derivation for encryption keys | [ADR-020](../../decisions/020-hd-derivation-for-encryption-keys.md) | Vault-derived TLS identity material | diff --git a/docs/architecture/crates/call/operation-registry.md b/docs/architecture/crates/call/operation-registry.md index 1c4ab89..ff5f05c 100644 --- 
a/docs/architecture/crates/call/operation-registry.md +++ b/docs/architecture/crates/call/operation-registry.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-06-27 +last_updated: 2026-07-02 --- # Operation Registry @@ -91,19 +91,75 @@ Operations with empty `AccessControl` (no required scopes, no resource checks) a ### Handler +There are two handler types, one per dispatch shape — mirroring the +TypeScript prior art (`@alkdev/operations/src/types.ts:62-78`: +`OperationHandler` returns a single value; `SubscriptionHandler` returns an +`AsyncGenerator`). The split is locked by ADR-049. + ```rust -pub type Handler = Arc<dyn Fn(Value, OperationContext) -> Pin<Box<dyn Future<Output = ResponseEnvelope> + Send>> + Send + Sync>; +/// Request/response handler — Query and Mutation operations. +pub type Handler = Arc< + dyn Fn(Value, OperationContext) -> Pin<Box<dyn Future<Output = ResponseEnvelope> + Send>> + + Send + Sync, +>; + +/// Streaming handler — Subscription operations. Returns a stream of +/// ResponseEnvelopes: each Ok(value) → call.responded, an Err → call.error +/// (terminal — stream ends), natural stream end → call.completed. +pub type StreamingHandler = Arc< + dyn Fn(Value, OperationContext) + -> Pin<Box<dyn Stream<Item = ResponseEnvelope> + Send>> + + Send + Sync, +>; + +/// Type alias for the boxed stream shape used by `invoke_streaming()` and +/// `StreamingHandler` return values. The concrete library +/// (`futures::stream::BoxStream<'static, T>` = `Pin<Box<dyn Stream<Item = T> +/// + Send>>`) is a two-way-door implementation detail (ADR-049); the alias +/// exists so the two spellings (the expanded form in `StreamingHandler` and +/// the short form in `invoke_streaming()`) refer to the same type. +pub type ResponseStream = Pin<Box<dyn Stream<Item = ResponseEnvelope> + Send>>; ``` -Handlers are async — many operations (file I/O, HTTP service calls, irpc service calls) are inherently asynchronous. The handler receives an `async` runtime context and returns a `Future`. +Both handlers are async — many operations (file I/O, HTTP service calls, +irpc service calls, LLM streaming) are inherently asynchronous. 
A handler +receives: -A handler receives: -- `input: Value` — the deserialized `payload` from the `call.requested` event (always `serde_json::Value`) +- `input: Value` — the deserialized `payload` from the `call.requested` event + (always `serde_json::Value`) - `context: OperationContext` — request ID, identity, metadata, env -And returns a `ResponseEnvelope` containing the result or an error. `ResponseEnvelope` is defined in [call-protocol.md](call-protocol.md#responseenvelope) — it carries the request ID and a `Result`. Local dispatch produces it with no serialization overhead; the `CallAdapter` converts it to `EventEnvelope` for the wire. +The **`Handler`** (request/response) returns a single `ResponseEnvelope` +containing the result or an error. `ResponseEnvelope` is defined in +[call-protocol.md](call-protocol.md#responseenvelope) — it carries the request +ID and a `Result`. Local dispatch produces it with no +serialization overhead; the `CallAdapter` converts it to `EventEnvelope` for +the wire. -When a handler returns an error, the `CallError.code` is matched against the operation's declared `error_schemas` (ADR-023). If the code matches a declared `ErrorDefinition`, the `call.error` event carries that code and the error's detail payload. If it doesn't match, the `call.error` carries `INTERNAL`. This is how handler failures become typed errors on the wire instead of string-matched messages. +The **`StreamingHandler`** (streaming) returns a `Pin<Box<dyn Stream<Item = ResponseEnvelope> + Send>>` — the stream analogue of `Handler`'s +`Pin<Box<dyn Future<Output = ResponseEnvelope>>>`. Each `Ok(value)` in the stream becomes a +`call.responded` event; an `Err` becomes a `call.error` event (terminal — the +stream ends after it); natural stream end becomes `call.completed`. The +dispatch path converts each `ResponseEnvelope` to `EventEnvelope` exactly as +it does for the single-response case — no new wire-format concept is +introduced. See ADR-049 and [call-protocol.md](call-protocol.md) §"CallAdapter +Stream Handling". 
+ +When a handler returns an error, the `CallError.code` is matched against the operation's declared `error_schemas` (ADR-023). If the code matches a declared `ErrorDefinition`, the `call.error` event carries that code and the error's detail payload. If it doesn't match, the `call.error` carries `INTERNAL`. This is how handler failures become typed errors on the wire instead of string-matched messages. The same matching applies to `Err` values yielded by a `StreamingHandler`. + +A `make_streaming_handler()` helper (analogue of `make_handler()`) wraps a +stream-producing closure into a `StreamingHandler`: + +```rust +pub fn make_streaming_handler<S, St>(f: S) -> StreamingHandler +where + S: Fn(Value, OperationContext) -> St + Send + Sync + 'static, + St: Stream<Item = ResponseEnvelope> + Send + 'static, +{ + Arc::new(move |input, context| Box::pin(f(input, context))) +} +``` ### OperationContext @@ -196,9 +252,10 @@ pub struct OperationRegistry { The registry maps operation names to `HandlerRegistration` bundles. The curated layer (Layer 0) is a `HashMap`; session and connection overlays (Layers 1 and 2) are separate maps that the `CallAdapter` composes into the per-call `OperationContext.env` (ADR-024). See ADR-022 for the full registration model and ADR-024 for the layering model. Key methods: -- `register(registration)`: Add an operation to the curated layer at startup -- `registration(name)`: Find a registration by operation name (checks active overlays first, then curated base — ADR-024). Returns spec, handler, provenance, composition authority, scoped env, capabilities. -- `invoke(name, input, context)`: Look up, check ACL, invoke handler, return result +- `register(registration)`: Add an operation to the curated layer at startup. Validates `handler` is the right `HandlerKind` for `spec.op_type` (Once for Query/Mutation, Stream for Subscription — ADR-049). Mismatch is a startup error. 
+- `registration(name)`: Find a registration by operation name (checks active overlays first, then curated base — ADR-024). Returns spec, handler (`HandlerKind`), provenance, composition authority, scoped env, capabilities. +- `invoke(name, input, context)`: Look up, check ACL, invoke handler, return a single `ResponseEnvelope` (request/response path — Query/Mutation). **Errors with `INVALID_OPERATION_TYPE` if the op is a `Subscription`** — `invoke()` is the wrong dispatch path for streaming ops; use `invoke_streaming()` (ADR-049). +- `invoke_streaming(name, input, context)`: Look up, check ACL, invoke streaming handler, return a `ResponseStream` (the boxed stream alias — ADR-049) (streaming path — Subscription). Pre-handler errors (not-found, forbidden, `INVALID_OPERATION_TYPE` for a non-Subscription op) yield a single error `ResponseEnvelope` and end the stream. See ADR-049. - `list_operations()`: Return all registered specs (for `/services/list` — returns curated + active overlay ops) ### Request ID Generation @@ -229,15 +286,23 @@ The registration bundle carries everything the dispatch path needs to construct ```rust pub struct HandlerRegistration { pub spec: OperationSpec, - pub handler: Handler, + pub handler: HandlerKind, // Once or Stream — validated against spec.op_type (ADR-049) pub provenance: OperationProvenance, pub composition_authority: Option<CompositionAuthority>, // None for leaves - pub scoped_env: Option<ScopedOperationEnv>, // None for leaves + pub scoped_env: Option<ScopedOperationEnv>, // None for leaves pub capabilities: Capabilities, // NOTE: ADR-028 added `remote_safe: bool` here; ADR-029 supersedes it and // removes the field. Peer authorization is `AccessControl::check(peer_identity)`, // not a per-op boolean. See ADR-029 §3. } + +/// Which dispatch path a handler uses — locked by ADR-049. +/// Validated against `spec.op_type` at registration: +/// Query/Mutation → Once; Subscription → Stream. Mismatch is a startup error. 
+pub enum HandlerKind { + Once(Handler), + Stream(StreamingHandler), +} ``` #### OperationProvenance @@ -291,19 +356,22 @@ impl CompositionAuthority { - `scoped_env`: The set of operations this handler may reach via `env.invoke()`. `None` for leaves (empty env). The reachability control from ADR-015. - `capabilities`: Outbound credentials (decrypted API keys, signing keys). Populated by the assembly layer from the vault at registration time. See [Capability Injection](#capability-injection). -The `OperationRegistryBuilder` provides a fluent API with convenience methods for common cases: +The `OperationRegistryBuilder` provides a fluent API with convenience methods for common cases. The builder absorbs the `HandlerKind` wrapping internally — `.with_local()` and `.with_leaf()` take the raw `Handler` (or `StreamingHandler`) and wrap it in the right `HandlerKind` based on `spec.op_type` (ADR-049): ```rust // with_local: Local provenance, full bundle — all 5 args required. // with_local(spec, handler, composition_authority, scoped_env, capabilities) +// The builder inspects spec.op_type and wraps in HandlerKind::Once +// (Query/Mutation) or HandlerKind::Stream (Subscription) automatically. 
let registry = OperationRegistryBuilder::new() // Built-in service discovery (Local, no composition — empty authority, empty env, empty caps) .with_local(services_list_spec(), Arc::new(services_list_handler), CompositionAuthority::none(), ScopedOperationEnv::empty(), Capabilities::new()) .with_local(services_schema_spec(), Arc::new(schema_handler), CompositionAuthority::none(), ScopedOperationEnv::empty(), Capabilities::new()) - // Agent handler (Local, composes — authority + scoped env + capabilities) - .with_local(agent_chat_spec(), Arc::new(agent_chat_handler), + // Agent handler (Local, Subscription — streams call.responded as the + // LLM generates tokens; builder wraps in HandlerKind::Stream) + .with_local(agent_chat_spec(), Arc::new(agent_chat_streaming_handler), CompositionAuthority::new("agent-chat", ["llm:call", "fs:read", "vastai:query"]), ScopedOperationEnv::new(["fs/readFile", "vastai/listMachines", "llm/generate"]), Capabilities::new().with_api_key("google", google_api_key)) @@ -318,6 +386,8 @@ The CLI binary (or assembly layer) constructs the registry and passes it to the The `OperationEnv` trait is the universal composition mechanism. A handler calls `context.env.invoke("fs", "readFile", input, &context)` and gets a `ResponseEnvelope` back — regardless of whether the operation runs locally, via an irpc service, or on a remote node. +**`OperationEnv` is request/response-only** (ADR-049). It returns a single `ResponseEnvelope` — no streaming variant exists. Calling `invoke()` on a `Subscription` op produces `CallError { code: "INVALID_OPERATION_TYPE", ... }` — composition cannot truncate a stream to its first value. Stream composition (filter, map, combine, window, dedupe) is a handler-level concern, not a protocol composition concern; see ADR-049 for the rationale and the `@alkdev/pubsub` `operators.ts` prior art. + ```rust /// The composition dispatch trait. 
A handler composes child operations /// through its `OperationContext.env` (which implements this trait). @@ -673,10 +743,11 @@ let registry = OperationRegistryBuilder::new() CompositionAuthority::none(), ScopedOperationEnv::empty(), Capabilities::new()) .with_local(services_schema_spec(), Arc::new(schema_handler), CompositionAuthority::none(), ScopedOperationEnv::empty(), Capabilities::new()) - // Agent handler (Local, composes — full bundle via .with()) + // Agent handler (Local, Subscription — composes; streaming handler + // wrapped in HandlerKind::Stream by the builder per ADR-049) .with(HandlerRegistration { spec: agent_chat_spec(), - handler: Arc::new(agent_chat_handler), + handler: HandlerKind::Stream(Arc::new(agent_chat_streaming_handler)), provenance: OperationProvenance::Local, composition_authority: Some(CompositionAuthority::new( "agent-chat", ["llm:call", "fs:read", "vastai:query"])), @@ -750,6 +821,7 @@ The `Capabilities` type holds non-serializable, zeroized secret material. It doe - **The call protocol carries no secret material.** Secret material (private keys, API keys, mnemonics, decrypted credentials) must not appear in `call.requested` payloads, `call.responded` payloads, or `OperationContext.metadata`. See ADR-014. - **Metadata does not propagate through composition.** `OperationEnv::invoke()` constructs fresh metadata for nested calls (`HashMap::new()`), not the parent's metadata. This prevents a handler that accidentally places a secret in metadata from leaking it to child operations — and if a child is a `from_call` operation (ADR-017), across the wire to a remote node. The tracing link is `parent_request_id`, not metadata propagation. See ADR-014. - **Provenance determines composition capability.** Only `Local` and `Session` ops can compose. Leaves (`FromOpenAPI`, `FromMCP`, `FromCall`) get `composition_authority: None` and `scoped_env: None` — they don't compose, so they don't need authority or reachability bounds. See ADR-022. 
+- **`HandlerKind` matches `op_type`** (ADR-049). `Query`/`Mutation` ops register a `HandlerKind::Once(Handler)`; `Subscription` ops register a `HandlerKind::Stream(StreamingHandler)`. Mismatch is a startup error. `invoke()` on a `Subscription` and `invoke_streaming()` on a `Query`/`Mutation` both return `INVALID_OPERATION_TYPE`. `OperationEnv::invoke()` (composition) is request/response-only and errors with `INVALID_OPERATION_TYPE` on `Subscription` ops — stream composition is a handler-level concern, not a protocol composition concern. ## Design Decisions @@ -768,6 +840,7 @@ The `Capabilities` type holds non-serializable, zeroized secret material. It doe | Peer-graph routing model (supersedes ADR-028) | [ADR-029](../../decisions/029-peer-graph-routing-model.md) | Peer-keyed overlays + `PeerRef` routing; peer authorization via `AccessControl::check(peer_identity)`; retires `remote_safe`/`trusted_peer` (the field this doc's `HandlerRegistration` previously gained) | | Forwarded-for identity | [ADR-032](../../decisions/032-forwarded-for-identity.md) | `forwarded_for` field on `OperationContext` and `call.requested`; metadata only — `AccessControl::check` never reads it; the `from_call` handler populates it | | ~~Peer-scoped registry filtering~~ (superseded) | ~~[ADR-028](../../decisions/028-callclient-peer-scoped-registry-filtering.md)~~ | ~~`remote_safe` marking on `HandlerRegistration`~~ — superseded by ADR-029 | +| Streaming handler for subscriptions | [ADR-049](../../decisions/049-streaming-handler-for-subscriptions.md) | `StreamingHandler` type alongside `Handler`; `HandlerKind` enum on `HandlerRegistration` validated against `op_type`; `invoke_streaming()` on `OperationRegistry`; `invoke()` and `OperationEnv::invoke()` error with `INVALID_OPERATION_TYPE` on `Subscription` ops; composition stays request/response-only, stream composition is handler-level | ## Open Questions @@ -814,4 +887,5 @@ See [open-questions.md](../../open-questions.md) for full details. 
- ADR-029: Peer-graph routing model (peer-keyed overlays + `PeerRef` routing; `PeerCompositeEnv` supersedes the singular-connection `CompositeOperationEnv`) - ADR-030: PeerEntry and Identity.id decoupling (`PeerId` source = `Identity.id` = `PeerEntry.peer_id`) - ADR-032: Forwarded-for identity (`forwarded_for` on `OperationContext` and `call.requested`; metadata only) +- ADR-049: Streaming handler for subscriptions (`StreamingHandler`, `HandlerKind`, `invoke_streaming()`, `INVALID_OPERATION_TYPE`) - Reference implementation: `/workspace/@alkdev/alknet-main/crates/alknet-core/src/call/` \ No newline at end of file diff --git a/docs/architecture/crates/http/http-adapters.md b/docs/architecture/crates/http/http-adapters.md index 1ddbab7..aecbd11 100644 --- a/docs/architecture/crates/http/http-adapters.md +++ b/docs/architecture/crates/http/http-adapters.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-07-01 +last_updated: 2026-07-02 --- # HTTP Adapters — from_openapi and to_openapi @@ -123,8 +123,8 @@ The adapter: ### Forwarding handler -The forwarding handler is the `Arc` stored in the -`HandlerRegistration`. At call time, it: +The forwarding handler is stored in the `HandlerRegistration` as a +`HandlerKind` (ADR-049). At call time, it: 1. Reads the call input (`serde_json::Value`). 2. Builds the outbound HTTP request: @@ -138,15 +138,23 @@ The forwarding handler is the `Arc` stored in the below). 4. For a `Query`/`Mutation`: parses the response body (JSON, text, or binary — same content-type branching as the TS `createHTTPOperation`), - wraps it in a `ResponseEnvelope`, returns. + wraps it in a `ResponseEnvelope`, returns. Registered as + `HandlerKind::Once` — a `Handler` returning a single + `ResponseEnvelope`. 5. For a `Subscription` (`text/event-stream` response): streams `call.responded` events as the SSE chunks arrive (same SSE parsing as - the TS `parseSSEFrames`), then `call.completed` on stream end. 
+ the TS `parseSSEFrames`), then the stream ends on SSE close (which + becomes `call.completed` on the wire). Registered as + `HandlerKind::Stream` — a `StreamingHandler` returning a + `BoxStream` (ADR-049). Each SSE `data:` frame becomes + a `ResponseEnvelope::ok()`; an HTTP error (non-2xx) becomes a single + `ResponseEnvelope::error()` and ends the stream. 6. On HTTP error (non-2xx): maps to the declared `ErrorDefinition` by HTTP status code (see Error Fidelity below), returns a `CallError`. -The handler is opaque to the `CallAdapter` — it's an `Arc` -the registry dispatches. `alknet-call` never sees `reqwest`. +The handler is opaque to the `CallAdapter` — it's a `HandlerKind` the +registry dispatches (via `invoke()` for `Once`, `invoke_streaming()` for +`Stream`). `alknet-call` never sees `reqwest`. ### HTTP client (reqwest) @@ -319,9 +327,9 @@ factoring recommendation (thin shared struct, not a trait). `from_openapi` maps OpenAPI non-2xx response status codes to `ErrorDefinition`s (ADR-023 §5). The normative rule (review #002 W20): -`from_openapi` must not produce error codes that collide with the five +`from_openapi` must not produce error codes that collide with the six protocol-level codes (`NOT_FOUND`, `FORBIDDEN`, `INVALID_INPUT`, -`INTERNAL`, `TIMEOUT`). The adapter prefixes imported error codes with +`INVALID_OPERATION_TYPE`, `INTERNAL`, `TIMEOUT`). The adapter prefixes imported error codes with `HTTP_` and the status number: ```rust @@ -423,6 +431,7 @@ once published, the 5-endpoint gateway shape is one-way. 
| HTTP path = operation path (~~direct-call surface~~) | [ADR-036](../../decisions/036-http-to-call-operation-mapping.md) → superseded by [ADR-047](../../decisions/047-remove-direct-call-http-surface.md) | ~~`POST /{service}/{op}` → `call.requested`~~ — removed; the gateway `/call` with `{ operation, input }` is the sole invoke path; `to_openapi` describes the gateway, not a per-operation surface | | `to_openapi` gateway pattern | [ADR-042](../../decisions/042-openapi-gateway-pattern.md) | 5 fixed gateway endpoints (search/schema/call/batch/subscribe), not one path per operation; per-caller AccessControl-filtered. Supersedes ADR-036's original `to_openapi` "paths mirror `/{service}/{op}`" clause | | `to_openapi` published-spec versioning | [ADR-045](../../decisions/045-to-openapi-gateway-spec-versioning.md) | `info.version` semver tracks the gateway endpoint contract, not the operation set; consumers detect breaking changes via the major version | +| Streaming handler for subscriptions | [ADR-049](../../decisions/049-streaming-handler-for-subscriptions.md) | `from_openapi` `Subscription` ops register a `StreamingHandler` (`HandlerKind::Stream`); SSE response → `BoxStream`; `Query`/`Mutation` stay `HandlerKind::Once` | ## Open Questions diff --git a/docs/architecture/crates/http/http-mcp.md b/docs/architecture/crates/http/http-mcp.md index 4d0c4bc..31bdfb7 100644 --- a/docs/architecture/crates/http/http-mcp.md +++ b/docs/architecture/crates/http/http-mcp.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-07-01 +last_updated: 2026-07-02 --- # HTTP MCP — from_mcp and to_mcp @@ -78,10 +78,12 @@ The adapter: namespace prefix is configured — same local-naming sugar as `from_call`'s `FromCallConfig::namespace_prefix`, ADR-029 §5). - `spec.namespace` = the configured `namespace`. - - `spec.op_type` = `Mutation` (MCP tools are call/response; the MCP - spec doesn't have a native streaming/tool-subscription distinction - — `tools/call` returns a result. 
If MCP adds a streaming-tool - extension, a `Subscription` mapping would be added.) + - `spec.op_type` = `Mutation` (MCP tools are call/response; the MCP + spec doesn't have a native streaming/tool-subscription distinction + — `tools/call` returns a result. If MCP adds a streaming-tool + extension, a `Subscription` mapping would be added.) All `from_mcp` + handlers are `HandlerKind::Once` (ADR-049); `from_mcp` never + produces a `StreamingHandler`. - `spec.visibility` = `Internal` (adapter-registered, ADR-015). - `spec.input_schema` = the tool's `inputSchema` (JSON Schema). - `spec.output_schema` = depends on whether the tool declares @@ -128,8 +130,9 @@ At call time, the `from_mcp` forwarding handler: registration (the MCP server is a persistent streamable HTTP endpoint, not a per-call connection). -The handler is opaque to the `CallAdapter` — `Arc` the -registry dispatches. `alknet-call` never sees rmcp. +The handler is opaque to the `CallAdapter` — a `HandlerKind::Once` +wrapping an `Arc` that the registry dispatches. `alknet-call` +never sees rmcp. ### Output handling (structuredContent vs content blocks) @@ -222,7 +225,12 @@ The gateway exposes only `Query` and `Mutation` operations (request/response). `Subscription` operations (streaming) are filtered out of `search` results and cannot be invoked via `call` — MCP tool calls are request/response by protocol design; streaming subscriptions -don't fit the LLM tool-call pattern. See ADR-041 §2. +don't fit the LLM tool-call pattern. This is unaffected by ADR-049 +(streaming handlers): the `StreamingHandler` type and `invoke_streaming()` +dispatch path exist in `alknet-call` and are used by `to_openapi`'s +`/subscribe` endpoint, but `to_mcp` does not expose them — it filters by +`op_type` and only dispatches `Query`/`Mutation` via `invoke()`. See +ADR-041 §2. #### `to_mcp` service behavior @@ -263,19 +271,19 @@ axum route handlers) are genuinely per-gateway and are not shared. 
Research findings (`docs/research/alknet-http-gateway-factoring/findings.md`) recommend -extracting a **thin shared spine** (a concrete struct holding -`Arc` + `Arc` with a -`resolve + build_context + invoke` method returning a -`ResponseEnvelope`), **not** a `GatewayDispatch` trait or gateway -abstraction. The spine is small (~15–30 lines per endpoint), but it is -the one place where a divergence bug (identity resolved differently, -`OperationContext.internal` set inconsistently, `CallError` mapped -asymmetrically) would be a security/correctness issue. The -server-integration and wire-framing layers stay per-gateway; a third -gateway (GraphQL, gRPC) is not on the horizon, and if one appears its -server-integration layer needs its own shape anyway. This is an -implementation factoring note, not an ADR — the decision is internal to -`alknet-http` and does not cross crate boundaries. +extracting a **thin shared spine** (the concrete `GatewayDispatch` struct +holding `Arc` + `Arc` with a +`resolve + build_context + invoke` method returning a `ResponseEnvelope`, +named in ADR-049 and extended with `invoke_streaming()` for the streaming +path), **not** a trait or gateway abstraction. The spine is small (~15–30 +lines per endpoint), but it is the one place where a divergence bug +(identity resolved differently, `OperationContext.internal` set +inconsistently, `CallError` mapped asymmetrically) would be a +security/correctness issue. The server-integration and wire-framing layers +stay per-gateway; a third gateway (GraphQL, gRPC) is not on the horizon, +and if one appears its server-integration layer needs its own shape anyway. +This is an implementation factoring note, not an ADR — the decision is +internal to `alknet-http` and does not cross crate boundaries. ### No-Env-Vars @@ -340,6 +348,7 @@ every other HTTP request. 
| Error fidelity | [ADR-023](../../decisions/023-operation-error-schemas.md) | MCP tool errors mapped to `ErrorDefinition`s | | No-env-vars credential injection | [ADR-014](../../decisions/014-secret-material-flow-and-capability-injection.md) | Handler reads `context.capabilities`, not env vars | | MCP clients are not alknet peers | [ADR-034](../../decisions/034-outgoing-only-x509-and-three-peer-roles.md) | Bearer token, no `PeerId` | +| Streaming handler for subscriptions | [ADR-049](../../decisions/049-streaming-handler-for-subscriptions.md) | `from_mcp` handlers are always `HandlerKind::Once` (MCP tools are request/response); `to_mcp` excludes `Subscription` ops (unchanged by the streaming handler) | ## Open Questions diff --git a/docs/architecture/crates/http/http-server.md b/docs/architecture/crates/http/http-server.md index 696ce85..06fcb6a 100644 --- a/docs/architecture/crates/http/http-server.md +++ b/docs/architecture/crates/http/http-server.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-07-01 +last_updated: 2026-07-02 --- # HTTP Server @@ -194,13 +194,22 @@ The request body is `{ operation, input }` (the same flat JSON shape as `Accept: text/event-stream` on the `POST`). The axum route handler: - Sets `Content-Type: text/event-stream`. -- For each `call.responded` event, writes an SSE `data:` frame (the - event's `output` serialized as JSON). -- On `call.completed`, closes the SSE stream (normal end). -- On `call.aborted`, closes the stream with an SSE error event. -- On HTTP client disconnect (detected as the response writer closing), - sends `call.aborted` for the in-flight subscription, which cascades - to descendants per ADR-016. +- Calls `GatewayDispatch::invoke_streaming()` (ADR-049) — the streaming + analogue of `invoke()`, returning a `BoxStream`. The + security invariants are identical to `invoke()`: `internal: false`, + `forwarded_for: None`, same capabilities, same `scoped_env`, same ACL + check before dispatch. 
The two methods diverge only on the return shape + (stream vs single envelope). +- For each `ResponseEnvelope` the stream yields, writes an SSE `data:` frame: + `Ok(value)` → `data:` frame with the output serialized as JSON; `Err` → + SSE error event with the `CallError` serialized, then close (an `Err` is + terminal — the stream ends after it, matching the wire protocol's + `call.error` semantics). +- On natural stream end (the `StreamingHandler`'s stream completes), closes + the SSE stream (normal end — corresponds to `call.completed` on the wire). +- On `call.aborted` or HTTP client disconnect (detected as the response + writer closing), drops the stream future — `Drop` guards release the + handler's resources, and the abort cascade runs per ADR-016. This is the HTTP/1.1 + HTTP/2 streaming projection. Over WebSocket ([websocket.md](websocket.md)), the subscription projects directly @@ -209,6 +218,16 @@ no SSE framing. WebTransport (`h3`, deferred per ADR-044) would project onto WebTransport bidirectional streams; see [webtransport.md](webtransport.md). +**The streaming dispatch path.** Pre-ADR-049, `subscribe_handler` called +`GatewayDispatch::invoke()` (single response) and wrapped the one +`ResponseEnvelope` in a one-event SSE stream — a placeholder that couldn't +stream a real `Subscription` op. ADR-049 adds `GatewayDispatch:: +invoke_streaming()` and the underlying `OperationRegistry:: +invoke_streaming()`, giving `/subscribe` a real streaming dispatch path +to call. See ADR-049 and [http-adapters.md](http-adapters.md) for the +`from_openapi` SSE forwarding handler that feeds `StreamingHandler`s from +external `text/event-stream` responses. + ### One-directional projection (HTTP request/response) The HTTP/1.1 + HTTP/2 surface is a **lossy, one-directional projection** @@ -446,6 +465,7 @@ two-way door (add/remove freely). 
See | Browsers are not alknet peers | [ADR-034](../../decisions/034-outgoing-only-x509-and-three-peer-roles.md) §4 (amended by ADR-044 §5) | Bearer token, no `PeerId`, connection-local overlay (addressability vs. bidirectionality) — full rationale in [websocket.md](websocket.md) | | Error mapping (call codes → HTTP status) | [ADR-023](../../decisions/023-operation-error-schemas.md) | Protocol/operation codes distinct; `HTTP_` prefix for imported | | Custom HTTP routes from the assembly layer | [ADR-046](../../decisions/046-assembly-layer-custom-http-routes.md) | `extra_routes: Option` at construction; raw HTTP, not operations; default surface takes precedence on collision | +| Streaming handler for subscriptions (`invoke_streaming()`) | [ADR-049](../../decisions/049-streaming-handler-for-subscriptions.md) | `GatewayDispatch::invoke_streaming()` returns `BoxStream`; `/subscribe` pipes it to SSE; replaces the one-event placeholder with the real streaming dispatch path | ## Open Questions diff --git a/docs/architecture/decisions/023-operation-error-schemas.md b/docs/architecture/decisions/023-operation-error-schemas.md index 158ad27..9bd301d 100644 --- a/docs/architecture/decisions/023-operation-error-schemas.md +++ b/docs/architecture/decisions/023-operation-error-schemas.md @@ -2,19 +2,23 @@ ## Status -Accepted +Accepted (amended by ADR-049 — protocol-level code list extended to six) ## Context The `OperationSpec` in alknet-call has `input_schema` and `output_schema` but no `error_schemas`. The `call.error` payload (call-protocol.md L128–134) -carries a `code` and `message`, where `code` is one of five infrastructure -codes: `NOT_FOUND`, `FORBIDDEN`, `INVALID_INPUT`, `INTERNAL`, `TIMEOUT`. +carries a `code` and `message`, where `code` is one of six infrastructure +codes: `NOT_FOUND`, `FORBIDDEN`, `INVALID_INPUT`, `INVALID_OPERATION_TYPE`, +`INTERNAL`, `TIMEOUT`. 
-These five codes cover **protocol-level failures** — the call protocol +These six codes cover **protocol-level failures** — the call protocol itself can always fail to find an operation, deny access, reject bad input, -time out, or hit an internal error. They are emitted by the dispatch -machinery (the registry, the adapter), not by operation handlers. +reject the wrong dispatch method for the operation type, time out, or hit +an internal error. They are emitted by the dispatch machinery (the registry, +the adapter), not by operation handlers. `INVALID_OPERATION_TYPE` was added +by ADR-049 (streaming handler for subscriptions — `invoke()` called on a +`Subscription`, or `invoke_streaming()` on a `Query`/`Mutation`). But operations also have **domain-level failures** that are not covered: @@ -164,8 +168,8 @@ optional-array convention. ``` - `code` — the error code. Either a protocol-level code (`NOT_FOUND`, - `FORBIDDEN`, `INVALID_INPUT`, `INTERNAL`, `TIMEOUT`) or an - operation-level domain code from `error_schemas` (e.g., + `FORBIDDEN`, `INVALID_INPUT`, `INVALID_OPERATION_TYPE`, `INTERNAL`, + `TIMEOUT`) or an operation-level domain code from `error_schemas` (e.g., `FILE_NOT_FOUND`, `RATE_LIMITED`). - `message` — human-readable error message. Unstructured — for logging and debugging, not for programmatic handling. Clients should switch on @@ -182,7 +186,7 @@ optional-array convention. ### 3. 
Protocol-level vs operation-level error codes -The five existing codes are **protocol-level** — emitted by the dispatch +The six existing codes are **protocol-level** — emitted by the dispatch machinery, not by handlers: | Code | Emitted by | Meaning | @@ -190,6 +194,7 @@ machinery, not by handlers: | `NOT_FOUND` | Registry | Operation not registered (or Internal op called from wire) | | `FORBIDDEN` | Registry / ACL | Caller lacks required scopes, or unauthenticated | | `INVALID_INPUT` | Registry | Input doesn't match `input_schema` | +| `INVALID_OPERATION_TYPE` | Registry / `OperationEnv` | Wrong dispatch path for the operation's type (`invoke()` on a `Subscription`, `invoke_streaming()` on a `Query`/`Mutation`, or `OperationEnv::invoke()` on a `Subscription` during composition — ADR-049) | | `INTERNAL` | Registry / Adapter | Handler panic, unhandled error, connection failure | | `TIMEOUT` | Adapter | Request timed out | @@ -242,8 +247,9 @@ accordingly. ``` **Normative rule (review #002 W20)**: `from_openapi` must not produce error -codes that collide with the five protocol-level codes (`NOT_FOUND`, -`FORBIDDEN`, `INVALID_INPUT`, `INTERNAL`, `TIMEOUT`). The adapter prefixes +codes that collide with the six protocol-level codes (`NOT_FOUND`, +`FORBIDDEN`, `INVALID_INPUT`, `INVALID_OPERATION_TYPE`, `INTERNAL`, +`TIMEOUT`). The adapter prefixes imported error codes with `HTTP_` and the status number (e.g., `HTTP_404`, `HTTP_429`) to avoid collision. This is a requirement for the adapter, not a naming convention — the `from_openapi` example above was previously shown @@ -401,6 +407,9 @@ enum instead of a generic `Result`. 
for OS-level permission issues) - docs/reviews/001-pre-implementation-architecture-sanity-check.md (finding C5, which this ADR resolves) +- ADR-049: Streaming handler for subscriptions (amends this ADR's + protocol-level code list — `INVALID_OPERATION_TYPE` added as the sixth + protocol-level code) - docs/sdd_process.md L19, L423 (Safe Exit protocol — the general principle of making failure typed and declared) - TypeScript reference: `/workspace/@alkdev/operations/src/types.ts` diff --git a/docs/architecture/decisions/049-streaming-handler-for-subscriptions.md b/docs/architecture/decisions/049-streaming-handler-for-subscriptions.md new file mode 100644 index 0000000..8e35afe --- /dev/null +++ b/docs/architecture/decisions/049-streaming-handler-for-subscriptions.md @@ -0,0 +1,337 @@ +# ADR-049: Streaming Handler for Subscription Operations + +## Status + +Accepted + +## Context + +The call protocol defines `Subscription` as a first-class operation type +(ADR-012 lists `subscribe` as one of four top-level protocol operations; +`OperationSpec.op_type` includes `Subscription`). The wire protocol supports +streaming: five event types (`call.requested`, `call.responded`, +`call.completed`, `call.aborted`, `call.error`), `PendingRequestMap::Subscribe` +with an mpsc channel, `CallConnection::subscribe()` returning +`impl Stream`, and a full streaming-subscribe example +in `call-protocol.md`. The **client side** works — a client can subscribe to a +remote stream and consume `call.responded` events until `call.completed`. + +The **server side does not.** The `Handler` type in `alknet-call` is: + +```rust +pub type Handler = Arc< + dyn Fn(Value, OperationContext) -> Pin<Box<dyn Future<Output = ResponseEnvelope> + Send>> + + Send + Sync, +>; +``` + +It returns a single `ResponseEnvelope`. `OperationRegistry::invoke()` returns +one `ResponseEnvelope` and closes. `Dispatcher::handle_stream` calls +`dispatch_requested` → `registry.invoke()` → writes one `EventEnvelope` frame → +loops.
A `Subscription` operation that should produce a *stream* of +`call.responded` events followed by `call.completed` has no way to express that +through this handler signature. + +This is a **spec gap that should not have shipped.** The TypeScript predecessor +(`@alkdev/operations`, from which the Rust port was derived) had two distinct +handler types: + +```typescript +type OperationHandler = (input: I, context: C) => Promise | O; +type SubscriptionHandler = (input: I, context: C) => AsyncGenerator; +``` + +The TS registry (`registry.ts:21`) stored them as a union, validated at +registration that `SUBSCRIPTION` ops get an `AsyncGeneratorFunction`, and the +server-side dispatch (`call.ts:341-349`, `buildCallHandler`) branched on +`op_type`: `SUBSCRIPTION` → iterate the async generator, `respond()` for each, +then `complete()`; else → `execute()`, single `respond()`. The Rust port +collapsed the union into a single `Handler` returning one `ResponseEnvelope`, +losing the streaming path. The fix is to restore it. + +The downstream consequences of the gap: + +- **`/subscribe` HTTP endpoint** (`GatewayDispatch::invoke()` → + `subscribe_handler`) wraps a single `ResponseEnvelope` in a one-event SSE + stream. A real `Subscription` operation (e.g., `agent/chat` streaming LLM + tokens) cannot stream through it. +- **`from_call` forwarding** for a `Subscription` op calls + `CallConnection::call_with_payload()` (single response), not + `CallConnection::subscribe()` (stream). A `from_call`-imported subscription + truncates to the first value. +- **`from_openapi` forwarding** for a `text/event-stream` response returns one + `ResponseEnvelope` instead of streaming the SSE chunks. + +All three are symptoms of the same root cause: the `Handler` type cannot +produce a stream. + +## Decision + +### 1. 
`StreamingHandler` type (alongside `Handler`) + +Add a streaming handler type that returns a stream of `ResponseEnvelope`s, +mirroring the TS `SubscriptionHandler` / `OperationHandler` split: + +```rust +pub type StreamingHandler = Arc< + dyn Fn(Value, OperationContext) + -> Pin<Box<dyn Stream<Item = ResponseEnvelope> + Send>> + + Send + Sync, +>; +``` + +Each `Ok(value)` in the stream becomes a `call.responded` event. An `Err` +becomes a `call.error` event (terminal — the stream ends). Natural stream end +becomes `call.completed`. The dispatch path converts each `ResponseEnvelope` to +`EventEnvelope` exactly as it does today for the single-response case — no new +wire-format concept is introduced. + +A `make_streaming_handler()` helper (analogue of `make_handler()`) wraps an +async generator / stream-producing closure into a `StreamingHandler`. + +### 2. `HandlerKind` enum on `HandlerRegistration` + +```rust +pub enum HandlerKind { + Once(Handler), + Stream(StreamingHandler), +} + +pub struct HandlerRegistration { + pub spec: OperationSpec, + pub handler: HandlerKind, // validated against spec.op_type at registration + pub provenance: OperationProvenance, + pub composition_authority: Option, + pub scoped_env: Option, + pub capabilities: Capabilities, +} +``` + +Registration validates: `Query` / `Mutation` → `HandlerKind::Once`; +`Subscription` → `HandlerKind::Stream`. Mismatch is a startup error (same as +the TS `validateSubscriptionHandler`). The enum makes the "one or the other, +matching `op_type`" invariant type-level rather than two `Option`s validated at +runtime. + +### 3. `OperationRegistry::invoke_streaming()` + +```rust +impl OperationRegistry { + /// Dispatch a Subscription operation. Returns a stream of + /// ResponseEnvelopes. Errors (not-found, forbidden, invalid operation + /// type) yield a single ResponseEnvelope::error and end the stream.
+ pub fn invoke_streaming( + &self, + name: &str, + input: Value, + context: OperationContext, + ) -> BoxStream; +} +``` + +`invoke_streaming()` performs the same visibility + ACL checks as `invoke()`, +then dispatches to the `StreamingHandler`. Pre-handler errors (not-found, +forbidden) produce a single error `ResponseEnvelope` and end the stream +(matching the single-response path's behavior, just on a stream). + +### 4. `OperationRegistry::invoke()` errors on `Subscription` + +`invoke()` is the request/response dispatch path. Calling it on a +`Subscription` op is a type mismatch — a streaming operation dispatched through +the request/response path. It returns a `ResponseEnvelope` carrying +`CallError { code: "INVALID_OPERATION_TYPE", ... }` (a new protocol-level +code): + +``` +INVALID_OPERATION_TYPE +``` + +(`retryable: false`, `details: None`). This is the wire-format addition: a +sixth protocol-level error code. It signals "you called the wrong dispatch +method for this operation's type" — distinct from `INVALID_INPUT` (schema +mismatch) and `INTERNAL` (handler failure). Clients should treat unknown codes +as `INTERNAL` with `retryable: false` (the existing rule); `INVALID_OPERATION_ +TYPE` is a permanent client-side programming error, not a transient failure. + +### 5. `OperationEnv::invoke()` errors on `Subscription` + +`OperationEnv::invoke()` (composition) stays request/response-only. It returns +a single `ResponseEnvelope`. Calling it on a `Subscription` op produces the +same `INVALID_OPERATION_TYPE` error — composition cannot truncate a stream to +its first value. This is a clean architectural boundary, not a deferral: + +- **`OperationEnv` composition** is "call a child operation, get a result" + (the `OperationHandler` model). It is request/response by construction. +- **Stream composition** (filter, map, combine, window, dedupe) is a + handler-level concern. 
A handler that produces a stream transforms it with + stream operators at the handler level, not through `OperationEnv`. The + `@alkdev/pubsub` `operators.ts` is the prior art for this model: 13 operators + (`filter`, `map`, `take`, `batch`, `dedupe`, `window`, `chain`, `join`, etc.) + that operate on `AsyncIterable`, distinct from the request/response + composition. In Rust, the analogues operate on `BoxStream`. +- No `invoke_streaming()` is added to `OperationEnv`. The protocol composition + surface is request/response; stream manipulation is handler-internal. + +### 6. Server-side dispatch branches on `op_type` + +`Dispatcher::handle_stream` / `dispatch_requested` gains a branch on +`op_type`: + +- `Subscription` → `registry.invoke_streaming()` → for each `ResponseEnvelope` + in the stream, write `EventEnvelope` to the wire → write `call.completed` on + stream end. +- `Query` / `Mutation` → `registry.invoke()` → write one `EventEnvelope` + (existing path, unchanged). + +The streaming branch sets `deadline: None` for subscriptions (unbounded — +already specced in `call-protocol.md` Timeouts) and wires abort cascade +(ADR-016): if `call.aborted` arrives for a streaming request, the stream is +dropped (Rust `Drop` releases the handler's resources). + +### 7. `GatewayDispatch::invoke_streaming()` (alknet-http) + +The shared dispatch spine gains a streaming variant: + +```rust +impl GatewayDispatch { + pub async fn invoke_streaming( + &self, + identity: Option, + op: &str, + input: Value, + ) -> BoxStream; +} +``` + +`invoke_streaming()` builds the root `OperationContext` identically to +`invoke()` (same security invariants: `internal: false`, `forwarded_for: +None`, same capabilities, same `scoped_env`), then calls +`registry.invoke_streaming()`. The two gateways (`to_openapi`, `to_mcp`) +diverge only on wire-framing; the security axis is provably identical between +`invoke()` and `invoke_streaming()`. 
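
A minimal, synchronous sketch of how registration validation (section 2), the `INVALID_OPERATION_TYPE` rejection (section 4), and the `op_type` dispatch branch (section 6) might fit together. Everything here is a simplified stand-in, not the `alknet-call` API: handlers are plain closures, a `Vec<String>` stands in for the stream of `ResponseEnvelope`s, and the `OpType`, `Event`, `validate`, and `dispatch` names are assumptions made for illustration.

```rust
// Simplified, synchronous stand-ins for the ADR's types (assumptions for
// illustration only; the real handlers are async and return ResponseEnvelope).
#[derive(Clone, Copy, Debug, PartialEq)]
enum OpType {
    Query,
    Mutation,
    Subscription,
}

enum HandlerKind {
    // Request/response handler: one input, one output.
    Once(Box<dyn Fn(&str) -> String>),
    // Streaming handler: a Vec stands in for the stream of items.
    Stream(Box<dyn Fn(&str) -> Vec<String>>),
}

// Wire events a dispatcher would emit, reduced to strings for the sketch.
#[derive(Debug, PartialEq)]
enum Event {
    Responded(String),
    Completed,
    Error(&'static str),
}

/// Registration-time check: the handler variant must match `op_type`.
fn validate(op_type: OpType, handler: &HandlerKind) -> Result<(), String> {
    match (op_type, handler) {
        (OpType::Subscription, HandlerKind::Stream(_)) => Ok(()),
        (OpType::Query | OpType::Mutation, HandlerKind::Once(_)) => Ok(()),
        _ => Err(format!("handler kind does not match {:?}", op_type)),
    }
}

/// Request/response path: a streaming handler is rejected, never truncated.
fn invoke(handler: &HandlerKind, input: &str) -> Vec<Event> {
    match handler {
        HandlerKind::Once(h) => vec![Event::Responded(h(input))],
        HandlerKind::Stream(_) => vec![Event::Error("INVALID_OPERATION_TYPE")],
    }
}

/// Streaming path: one `Responded` per item, then `Completed` on natural end.
fn invoke_streaming(handler: &HandlerKind, input: &str) -> Vec<Event> {
    match handler {
        HandlerKind::Stream(h) => {
            let mut events: Vec<Event> =
                h(input).into_iter().map(Event::Responded).collect();
            events.push(Event::Completed);
            events
        }
        HandlerKind::Once(_) => vec![Event::Error("INVALID_OPERATION_TYPE")],
    }
}

/// Dispatcher branch on `op_type`.
fn dispatch(op_type: OpType, handler: &HandlerKind, input: &str) -> Vec<Event> {
    match op_type {
        OpType::Subscription => invoke_streaming(handler, input),
        OpType::Query | OpType::Mutation => invoke(handler, input),
    }
}
```

With these stand-ins, dispatching a `Subscription` whose handler yields two items produces two `Responded` events followed by `Completed`, while calling `invoke` on the same handler produces a single `INVALID_OPERATION_TYPE` error and nothing else.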
+ +The HTTP `/subscribe` handler calls `invoke_streaming()` and pipes the +`BoxStream` to SSE: each `Ok(value)` → SSE `data:` frame, +`Err` → SSE error event + close, stream end → close. This replaces the current +one-event `subscribe_stream_from_envelope` with the real streaming path. + +### 8. `from_call` stream forwarding + +The `from_call` forwarding handler construction branches on `op_type` during +discovery: + +- `Query` / `Mutation` → existing `make_forwarding_handler()` (calls + `CallConnection::call_with_payload()`, returns single `ResponseEnvelope`), + registered as `HandlerKind::Once`. +- `Subscription` → new `make_streaming_forwarding_handler()` (calls + `CallConnection::subscribe()`, returns `impl Stream`, maps to `BoxStream`), registered as + `HandlerKind::Stream`. + +A `from_call`-imported `Subscription` op forwards the remote stream end-to-end: +the client-side `CallConnection::subscribe()` (already working) feeds a +`StreamingHandler` that produces the stream. No truncation, no first-value +fallback. + +### 9. `from_openapi` SSE forwarding + +The `from_openapi` forwarding handler construction branches on `op_type` +(determined by `detectOperationType` — `text/event-stream` response → +`Subscription`): + +- `Query` / `Mutation` → existing forwarding handler (single HTTP request → + single `ResponseEnvelope`), `HandlerKind::Once`. +- `Subscription` → streaming forwarding handler (HTTP request → SSE response + stream → parse SSE chunks → `BoxStream`), `HandlerKind:: + Stream`. + +The SSE parsing reuses the TS `parseSSEFrames` pattern: each SSE `data:` frame +becomes a `ResponseEnvelope::ok()`, SSE stream end becomes stream end (→ +`call.completed`). + +## Consequences + +**Positive:** + +- `Subscription` operations work end-to-end: server-side handler → + server-side dispatch → wire → HTTP `/subscribe` SSE → `from_call` + forwarding → `from_openapi` SSE forwarding. No truncation, no broken paths. 
+- The `Handler` / `StreamingHandler` split mirrors the TS prior art exactly, + making the Rust port faithful to its source. +- `HandlerKind` makes the "one or the other, matching `op_type`" invariant + type-level (a `Once` variant for `Query`/`Mutation`, a `Stream` variant for + `Subscription`) rather than a runtime check on two `Option`s. +- Existing handlers (echo, discovery, from_openapi Query/Mutation, from_mcp, + from_call Query/Mutation) are unchanged — they return a single + `ResponseEnvelope` and register as `HandlerKind::Once`. The streaming path + is additive to the existing handler surface. +- `OperationEnv` composition stays request/response, preserving the + composition model's simplicity. Stream composition is a handler-level + concern, cleanly separated. +- The new `INVALID_OPERATION_TYPE` protocol code catches dispatch-path misuse + (calling `invoke()` on a `Subscription`) at the protocol level instead of + silently producing wrong behavior. + +**Negative:** + +- `HandlerRegistration.handler` changes type from `Handler` to `HandlerKind`. + Existing code constructing `HandlerRegistration` bundles must wrap in + `HandlerKind::Once(...)`. This is a mechanical change across handler + construction sites (the builder's `.with_local()` / `.with_leaf()` / + `.with()` methods absorb the wrapping internally, so most assembly-layer + code is unaffected; direct `HandlerRegistration::new()` calls need the + wrap). +- A new protocol-level error code (`INVALID_OPERATION_TYPE`) is a wire-format + addition. Existing clients that treat unknown codes as `INTERNAL` with + `retryable: false` (the existing rule) handle it correctly — they just + don't distinguish it from `INTERNAL` until updated. The code is distinct + from all existing codes and from operation-level domain codes (no + `HTTP_` prefix, no collision with the five existing protocol codes). 
+- The `Dispatcher::handle_stream` streaming branch adds a stream-to-wire + pump (read stream → write frames → write `call.completed`). This is new + code in the hot dispatch path, but it is a straightforward `while let + Some(envelope) = stream.next().await` loop, not a complex abstraction. + +## Door type + +**One-way.** The `Handler` / `StreamingHandler` / `HandlerKind` API surface +is what handlers are written against across crates (`alknet-call`, +`alknet-http`, downstream consumers). Changing it after handlers exist is a +rewrite. The `INVALID_OPERATION_TYPE` wire code is also one-way — once +emitted, clients may handle it, and removing it would break those handlers. + +The `HandlerKind` enum shape (`Once(Handler) | Stream(StreamingHandler)`) is +the one-way commitment: two handler variants, validated against `op_type`. +The concrete `BoxStream` library choice (`futures::stream::BoxStream` vs a +custom type) is a two-way-door implementation detail within the one-way +decision. + +## References + +- ADR-012: Call Protocol Stream Model (defines `subscribe` as a top-level + protocol operation; the streaming path this ADR implements) +- ADR-017: Call Protocol Client and Adapter Contract (the adapter contract + this ADR extends with the `StreamingHandler` variant) +- ADR-022: Handler Registration, Provenance, and Composition Authority + (`HandlerRegistration` gains `HandlerKind`) +- ADR-015: Privilege Model and Authority Context (visibility/ACL checks run + identically in `invoke_streaming()` as in `invoke()`) +- ADR-016: Abort Cascade for Nested Calls (stream drop on abort; cascade + through streaming handlers) +- ADR-023: Operation Error Schemas (`INVALID_OPERATION_TYPE` is a + protocol-level code, distinct from operation-level domain codes) +- ADR-029: Peer-Graph Routing Model (`from_call` forwarding handlers gain the + streaming variant) +- ADR-009: One-Way Door Decision Framework (the `Handler` / `StreamingHandler` + split is a one-way door — handler API 
surface) +- `@alkdev/operations/src/types.ts:62-78` — TS prior art + (`OperationHandler` / `SubscriptionHandler` split) +- `@alkdev/operations/src/registry.ts:65-75` — TS prior art + (`validateSubscriptionHandler` — runtime validation against `op_type`) +- `@alkdev/operations/src/call.ts:341-349` — TS prior art (`buildCallHandler` + branches on `op_type`: `SUBSCRIPTION` → iterate + complete; else → execute) +- `@alkdev/pubsub/src/operators.ts` — stream operators prior art (filter, + map, batch, dedupe, window, chain, join — handler-level stream + composition, distinct from `OperationEnv` request/response composition) +- Spec documents amended: `operation-registry.md`, `call-protocol.md`, + `http-server.md`, `http-adapters.md`, `http-mcp.md`, `client-and-adapters.md` \ No newline at end of file diff --git a/docs/architecture/open-questions.md b/docs/architecture/open-questions.md index 95fec86..afd1800 100644 --- a/docs/architecture/open-questions.md +++ b/docs/architecture/open-questions.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-06-30 +last_updated: 2026-07-02 --- # Open Questions @@ -316,7 +316,12 @@ These questions are acknowledged but not active. They will be promoted to open w - **Status**: resolved - **Door type**: One-way (wire format), two-way (mapping mechanism) - **Priority**: high -- **Resolution**: `OperationSpec` gains `error_schemas: Vec` where each `ErrorDefinition` carries a `code`, `description`, `schema` (JSON Schema for the error detail payload), and optional `http_status` (for adapter projection). The `call.error` payload gains an optional `details` field carrying the typed error payload. Protocol-level codes (`NOT_FOUND`, `FORBIDDEN`, `INVALID_INPUT`, `INTERNAL`, `TIMEOUT`) are distinct from operation-level domain codes (`FILE_NOT_FOUND`, `RATE_LIMITED`, etc.) — protocol codes are emitted by the dispatch machinery, operation codes by handlers. 
`from_openapi`/`to_openapi` map OpenAPI response status codes to/from `ErrorDefinition`s, making the adapter contract from ADR-017 faithful on the error axis. `services/schema` exposes `error_schemas` for client code generation. See ADR-023. +- **Resolution**: `OperationSpec` gains `error_schemas: Vec` where each `ErrorDefinition` carries a `code`, `description`, `schema` (JSON Schema for the error detail payload), and optional `http_status` (for adapter projection). The `call.error` payload gains an optional `details` field carrying the typed error payload. Protocol-level codes (`NOT_FOUND`, `FORBIDDEN`, `INVALID_INPUT`, + `INVALID_OPERATION_TYPE`, `INTERNAL`, `TIMEOUT`) are distinct from + operation-level domain codes (`FILE_NOT_FOUND`, `RATE_LIMITED`, etc.) — + protocol codes are emitted by the dispatch machinery, operation codes by + handlers. The six-code protocol-level list was extended from five by + ADR-049 (`INVALID_OPERATION_TYPE`). `from_openapi`/`to_openapi` map OpenAPI response status codes to/from `ErrorDefinition`s, making the adapter contract from ADR-017 faithful on the error axis. `services/schema` exposes `error_schemas` for client code generation. See ADR-023. - **Cross-references**: ADR-017, ADR-023, docs/reviews/001-pre-implementation-architecture-sanity-check.md (C5), [operation-registry.md](crates/call/operation-registry.md), [call-protocol.md](crates/call/call-protocol.md) ## Theme: Call Client and Adapters @@ -909,4 +914,44 @@ is a feature extension, not an unmade architecture decision. system's structure, constraints, or API surface across crates. 
- **Cross-references**: ADR-014, ADR-017, ADR-035, [http-adapters.md](crates/http/http-adapters.md), - [http-mcp.md](crates/http/http-mcp.md) \ No newline at end of file + [http-mcp.md](crates/http/http-mcp.md) + +### OQ-41: Stream Operators Library + +- **Origin**: [ADR-049](decisions/049-streaming-handler-for-subscriptions.md), + [operation-registry.md](crates/call/operation-registry.md) §"OperationEnv" +- **Status**: open (feature extension — a library to build, not a decision + to make before implementation) +- **Door type**: Two-way (additive utility library; no protocol or API-surface + change) +- **Priority**: low +- **Resolution**: ADR-049 establishes that stream composition (filter, map, + combine, window, dedupe) is a **handler-level concern**, not a protocol + composition concern. `OperationEnv::invoke()` is request/response-only; + stream manipulation happens at the handler level with stream operators on + the `BoxStream` the handler itself produces. The + `@alkdev/pubsub` `operators.ts` is the prior art: 13 operators (`filter`, + `map`, `take`, `batch`, `dedupe`, `window`, `chain`, `join`, `reduce`, + `groupBy`, `flat`, `pipe`, `toArray`) that operate on `AsyncIterable`, + forked from graphql-yoga's subscription implementation. + + The Rust analogue — a stream-operators utility crate or module providing + the same set of operators on `BoxStream` / `impl Stream` — is + a **feature extension**, not an unmade architectural decision. Handlers can + produce streams today without it (`Box::pin(stream::iter(...))`, + `async_stream::stream!`, `futures::stream` combinators all work); the + operators library is a convenience that reduces boilerplate for handlers + that transform streams (filter, batch, dedupe, window). No ADR is needed + for the library itself — it's internal utility code that doesn't cross + crate boundaries as a contract. 
An ADR would be warranted only if the + operators become part of a public API surface (e.g., a handler-registration + DSL that references operator names). + + This OQ exists so the operators library is tracked and findable, not left + as inline hedging in the spec docs. It is not a deferral of a decision — + the architectural decision (stream composition is handler-level, not + protocol-level) is made in ADR-049. This tracks the *implementation* of + the utility library, which is scheduling work, not architecture work. +- **Cross-references**: ADR-049, + [operation-registry.md](crates/call/operation-registry.md) §"OperationEnv", + `/workspace/@alkdev/pubsub/src/operators.ts` (TS prior art) \ No newline at end of file
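
To make the handler-level stream composition idea in OQ-41 more concrete, here is a rough sketch of two operators. It uses `std::iter::Iterator` as a synchronous stand-in for an async stream so that it needs no external crates. `dedupe` and `batch` are hypothetical names with assumed semantics (consecutive-duplicate removal, fixed-size chunks) chosen for illustration; they are not claimed to match the behavior of the `@alkdev/pubsub` operators.

```rust
/// Drops consecutive duplicate items (hypothetical `dedupe` operator).
/// Assumption: only adjacent repeats are removed, not all earlier values.
fn dedupe<I>(items: I) -> impl Iterator<Item = I::Item>
where
    I: Iterator,
    I::Item: PartialEq + Clone,
{
    let mut last: Option<I::Item> = None;
    items.filter(move |item| {
        let is_new = last.as_ref() != Some(item);
        if is_new {
            // Remember the most recently emitted item for the next comparison.
            last = Some(item.clone());
        }
        is_new
    })
}

/// Groups items into `Vec`s of at most `size` (hypothetical `batch` operator).
/// The final batch may be shorter than `size`.
fn batch<I: Iterator>(items: I, size: usize) -> impl Iterator<Item = Vec<I::Item>> {
    assert!(size > 0, "batch size must be positive");
    let mut items = items;
    std::iter::from_fn(move || {
        // Pull up to `size` items from the source without consuming it.
        let chunk: Vec<I::Item> = items.by_ref().take(size).collect();
        if chunk.is_empty() {
            None
        } else {
            Some(chunk)
        }
    })
}
```

Because each operator takes and returns an iterator, they compose by nesting, for example `batch(dedupe(source), 2)`. An async version would presumably take and return a stream in the same way, which keeps the composition inside the handler rather than in `OperationEnv`.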