feat(call/client/from-call-streaming-forwarding): branch from_call forwarding on op_type

Subscription ops discovered via services/list + services/schema now
register a StreamingHandler (HandlerKind::Stream) that calls
CallConnection::subscribe_with_payload and forwards the remote stream
end-to-end (ADR-049 §8). Query/Mutation ops keep the existing
make_forwarding_handler (HandlerKind::Once).

- Add CallConnection::subscribe_with_payload(payload) mirroring
  call_with_payload so the forwarding handler can populate forwarded_for
  (ADR-032) + auth_token on the subscription payload. subscribe() now
  delegates to subscribe_with_payload.
- Add make_streaming_forwarding_handler() in from_call.rs using
  make_streaming_handler + futures::stream::once(...).flatten() to await
  subscribe_with_payload then forward its stream.
- Branch build_bundles on spec.op_type (already parsed by rebuild_spec_for).
- Reuse build_forwarded_payload — no new payload-construction code.
- composition_authority: None, scoped_env: None for FromCall streaming
  leaves (same as Query/Mutation FromCall leaves).
- Abort cascade (ADR-016 §6) already wired via PendingRequestMap in
  subscribe_with_payload.

Closes the gap where a from_call-imported Subscription truncated to the
first value.
This commit is contained in:
2026-07-02 09:43:45 +00:00
parent 185ddb82b5
commit 67b1adba98
2 changed files with 310 additions and 17 deletions

View File

@@ -168,11 +168,26 @@ impl CallConnection {
operation_id: &str,
input: Value,
) -> impl Stream<Item = ResponseEnvelope> {
let request_id = generate_request_id();
let payload = serde_json::json!({
"operationId": operation_id,
"input": input,
});
self.subscribe_with_payload(payload).await
}
/// Subscribe to a remote op with a caller-constructed `call.requested`
/// payload. The payload MUST include `operationId` and `input`; the
/// caller may add `forwarded_for` (ADR-032) and `auth_token` (ADR-017 §7)
/// for the hub forwarding path used by `from_call`'s streaming forwarding
/// handler. Mirrors [`call_with_payload`](Self::call_with_payload) so the
/// forwarding handler can populate `forwarded_for` + `auth_token` on the
/// subscription payload (the plain [`subscribe`](Self::subscribe) builds
/// the payload internally and omits those fields).
pub async fn subscribe_with_payload(
&self,
payload: Value,
) -> impl Stream<Item = ResponseEnvelope> {
let request_id = generate_request_id();
let connection = match &self.connection {
Some(c) => c,