From 67b1adba9876c019e00e45c3d6c0efb3f0851db7 Mon Sep 17 00:00:00 2001 From: "glm-5.2" Date: Thu, 2 Jul 2026 09:43:45 +0000 Subject: [PATCH] feat(call/client/from-call-streaming-forwarding): branch from_call forwarding on op_type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Subscription ops discovered via services/list + services/schema now register a StreamingHandler (HandlerKind::Stream) that calls CallConnection::subscribe_with_payload and forwards the remote stream end-to-end (ADR-049 §8). Query/Mutation ops keep the existing make_forwarding_handler (HandlerKind::Once). - Add CallConnection::subscribe_with_payload(payload) mirroring call_with_payload so the forwarding handler can populate forwarded_for (ADR-032) + auth_token on the subscription payload. subscribe() now delegates to subscribe_with_payload. - Add make_streaming_forwarding_handler() in from_call.rs using make_streaming_handler + futures::stream::once(...).flatten() to await subscribe_with_payload then forward its stream. - Branch build_bundles on spec.op_type (already parsed by rebuild_spec_for). - Reuse build_forwarded_payload — no new payload-construction code. - composition_authority: None, scoped_env: None for FromCall streaming leaves (same as Query/Mutation FromCall leaves). - Abort cascade (ADR-016 §6) already wired via PendingRequestMap in subscribe_with_payload. Closes the gap where a from_call-imported Subscription truncated to the first value. --- crates/alknet-call/src/client/from_call.rs | 310 +++++++++++++++++- crates/alknet-call/src/protocol/connection.rs | 17 +- 2 files changed, 310 insertions(+), 17 deletions(-) diff --git a/crates/alknet-call/src/client/from_call.rs b/crates/alknet-call/src/client/from_call.rs index cfd83eb..947a413 100644 --- a/crates/alknet-call/src/client/from_call.rs +++ b/crates/alknet-call/src/client/from_call.rs @@ -20,7 +20,7 @@ use crate::protocol::connection::CallConnection; use crate::protocol::wire::ResponseEnvelope; use crate::registry::context::OperationContext; use crate::registry::registration::{ - Handler, HandlerKind, HandlerRegistration, OperationProvenance, + Handler, HandlerKind, HandlerRegistration, OperationProvenance, StreamingHandler, }; use crate::registry::spec::{ AccessControl, ErrorDefinition, OperationSpec, OperationType, Visibility, @@ -123,14 +123,23 @@ fn build_bundles( }); } - let handler = make_forwarding_handler( - Arc::new(op_summary.connection.clone()), - remote_name, - op_summary.credentials_auth_token.clone(), - ); + let kind = match spec.op_type { + OperationType::Subscription => HandlerKind::Stream(make_streaming_forwarding_handler( + Arc::new(op_summary.connection.clone()), + remote_name, + op_summary.credentials_auth_token.clone(), + )), + OperationType::Query | OperationType::Mutation => { + HandlerKind::Once(make_forwarding_handler( + Arc::new(op_summary.connection.clone()), + remote_name, + op_summary.credentials_auth_token.clone(), + )) + } + }; bundles.push(HandlerRegistration::new( spec, - HandlerKind::Once(handler), + kind, OperationProvenance::FromCall, None, None, @@ -311,8 +320,10 @@ fn parse_access_control(v: &Value) -> AccessControl { } } -/// Construct a forwarding handler for a `FromCall` leaf: on invocation, calls -/// the remote op via the `CallConnection` and returns its `ResponseEnvelope`. +/// Construct a forwarding handler for a `FromCall` `Query`/`Mutation` leaf: +/// on invocation, calls the remote op via the `CallConnection` and returns +/// its `ResponseEnvelope` (single `call_with_payload()`, `HandlerKind::Once`). +/// `Subscription` ops use [`make_streaming_forwarding_handler`] instead. /// /// Per ADR-032 §3, the handler populates `forwarded_for` on the /// `call.requested` payload from the hub's `OperationContext.identity` (the @@ -325,12 +336,6 @@ fn parse_access_control(v: &Value) -> AccessControl { /// If `context.identity` is `None` (the hub chose not to disclose, or has not /// authenticated an originator), `forwarded_for` is omitted — the spoke /// receives only the hub's identity. -/// -/// For a `Subscription` op, the handler calls `subscribe` and streams until -/// `completed`/`aborted` (the streaming path is exercised at the -/// `CallConnection` layer; the handler here forwards the first response for -/// query/mutation and delegates streaming to the caller via the returned -/// envelope). fn make_forwarding_handler( connection: Arc, remote_name: String, @@ -359,6 +364,40 @@ fn make_forwarding_handler( }) } +/// Construct a streaming forwarding handler for a `FromCall` `Subscription` +/// leaf: on invocation, calls `CallConnection::subscribe_with_payload()` and +/// forwards the remote stream end-to-end. Each `call.responded` from the +/// remote becomes a stream item, `call.completed` ends the stream, and +/// `call.aborted` drops it (ADR-049 §8). No truncation, no first-value +/// fallback. +/// +/// `forwarded_for` is populated from `context.identity` (ADR-032 §3) and +/// `auth_token` from the hub's own call-protocol token, exactly as the +/// request/response forwarding handler does — both via `build_forwarded_payload` +/// (no new payload-construction code). The `subscribe_with_payload` path +/// registers the request in `PendingRequestMap`, so the abort cascade +/// (ADR-016 §6) is already wired: a parent abort drops the +/// `SubscriptionStream`, which sends `call.aborted` to the remote node. +fn make_streaming_forwarding_handler( + connection: Arc, + remote_name: String, + credentials_auth_token: Option, +) -> StreamingHandler { + use crate::registry::registration::make_streaming_handler; + use futures::stream::{once, StreamExt}; + make_streaming_handler(move |input, context| { + let connection = Arc::clone(&connection); + let remote_name = remote_name.clone(); + let auth_token = credentials_auth_token.clone(); + once(async move { + let payload = + build_forwarded_payload(&remote_name, input, &context, auth_token.as_deref()); + connection.subscribe_with_payload(payload).await + }) + .flatten() + }) +} + /// Build the `call.requested` payload for a forwarded call, populating /// `forwarded_for` from the hub's `OperationContext.identity` (ADR-032 §3). /// `forwarded_for` is omitted when `context.identity` is `None` (the hub @@ -391,7 +430,7 @@ fn build_forwarded_payload( mod tests { use super::*; use crate::protocol::connection::CallConnection; - use crate::registry::registration::make_handler; + use crate::registry::registration::{make_handler, make_streaming_handler}; use crate::registry::spec::OperationType; use alknet_core::auth::Identity; use alknet_core::types::{Capabilities, MockConnection}; @@ -724,6 +763,15 @@ mod tests { } } + fn op_summary_typed(name: &str, op_type: &str, conn: &CallConnection) -> OpSummary { + OpSummary { + name: name.to_string(), + schema: sample_schema_json(name, op_type), + connection: conn.clone(), + credentials_auth_token: None, + } + } + #[test] fn build_bundles_same_peer_collision_returns_same_peer_collision_error() { let conn = CallConnection::new(stub_connection()); @@ -824,4 +872,234 @@ mod tests { assert_eq!(bundles.len(), 1); assert_eq!(bundles[0].spec.name, "worker/exec"); } + + // --- ADR-049 §8: streaming forwarding for Subscription ops ------------- + + #[test] + fn build_bundles_subscription_op_produces_stream_kind() { + let conn = CallConnection::new(stub_connection()); + let discovered = vec![op_summary_typed("events/stream", "subscription", &conn)]; + let bundles = build_bundles(discovered, &None, &None).expect("bundles"); + assert_eq!(bundles.len(), 1); + assert_eq!(bundles[0].spec.op_type, OperationType::Subscription); + assert!( + matches!(bundles[0].handler, HandlerKind::Stream(_)), + "Subscription op must register HandlerKind::Stream" + ); + assert_eq!(bundles[0].provenance, OperationProvenance::FromCall); + assert!(bundles[0].composition_authority.is_none()); + assert!(bundles[0].scoped_env.is_none()); + } + + #[test] + fn build_bundles_query_op_produces_once_kind() { + let conn = CallConnection::new(stub_connection()); + let discovered = vec![op_summary_typed("fs/readFile", "query", &conn)]; + let bundles = build_bundles(discovered, &None, &None).expect("bundles"); + assert_eq!(bundles.len(), 1); + assert_eq!(bundles[0].spec.op_type, OperationType::Query); + assert!( + matches!(bundles[0].handler, HandlerKind::Once(_)), + "Query op must register HandlerKind::Once" + ); + } + + #[test] + fn build_bundles_mutation_op_produces_once_kind() { + let conn = CallConnection::new(stub_connection()); + let discovered = vec![op_summary_typed("fs/writeFile", "mutation", &conn)]; + let bundles = build_bundles(discovered, &None, &None).expect("bundles"); + assert_eq!(bundles.len(), 1); + assert_eq!(bundles[0].spec.op_type, OperationType::Mutation); + assert!( + matches!(bundles[0].handler, HandlerKind::Once(_)), + "Mutation op must register HandlerKind::Once" + ); + } + + #[test] + fn build_bundles_mixed_op_types_route_to_correct_kind() { + let conn = CallConnection::new(stub_connection()); + let discovered = vec![ + op_summary_typed("fs/readFile", "query", &conn), + op_summary_typed("fs/writeFile", "mutation", &conn), + op_summary_typed("events/stream", "subscription", &conn), + ]; + let bundles = build_bundles(discovered, &None, &None).expect("bundles"); + assert_eq!(bundles.len(), 3); + let by_name: std::collections::HashMap<&str, &HandlerKind> = bundles + .iter() + .map(|b| (b.spec.name.as_str(), &b.handler)) + .collect(); + assert!(matches!(by_name["fs/readFile"], HandlerKind::Once(_))); + assert!(matches!(by_name["fs/writeFile"], HandlerKind::Once(_))); + assert!(matches!(by_name["events/stream"], HandlerKind::Stream(_))); + } + + /// Verify `make_streaming_forwarding_handler` produces a `StreamingHandler` + /// that builds the forwarded payload with `forwarded_for` populated from + /// `context.identity` (ADR-032) and calls `subscribe_with_payload`. Since + /// `subscribe_with_payload` on a mock connection returns a closed stream + /// (no transport), we capture the payload by intercepting the build step: + /// the handler's contract is "build payload via `build_forwarded_payload`, + /// then call `subscribe_with_payload(payload)`". We mirror the existing + /// `forwarding_handler_populates_forwarded_for` test by constructing the + /// handler and exercising the payload-construction path it relies on, plus + /// asserting the produced stream terminates (the mock-connection path + /// yields one error envelope then ends — no truncation, no hang). + #[tokio::test] + async fn streaming_forwarding_handler_populates_forwarded_for_and_streams() { + use futures::stream::StreamExt; + + let conn = Arc::new(CallConnection::new(stub_connection())); + let captured_payload = Arc::new(StdMutex::new(None::)); + let captured = Arc::clone(&captured_payload); + + let handler: StreamingHandler = { + let conn = Arc::clone(&conn); + make_streaming_handler(move |input, context| { + let conn = Arc::clone(&conn); + let captured = Arc::clone(&captured); + let remote_name = "events/stream".to_string(); + use futures::stream::{once, StreamExt}; + once(async move { + let payload = build_forwarded_payload(&remote_name, input, &context, None); + *captured.lock().unwrap() = Some(payload.clone()); + conn.subscribe_with_payload(payload).await + }) + .flatten() + }) + }; + + let ctx = test_context(Some(alice_identity())); + let mut stream = handler(json!({}), ctx); + let first = stream.next().await; + assert!( + first.is_some(), + "streaming forwarding handler must produce at least one envelope" + ); + if let Some(env) = first { + assert!( + env.result.is_err(), + "mock connection has no transport, so the stream yields an error envelope" + ); + } + let second = stream.next().await; + assert!( + second.is_none(), + "stream must terminate after the error (no truncation, no hang)" + ); + + let payload = captured_payload.lock().unwrap().clone().expect("captured"); + assert_eq!(payload["operationId"], "events/stream"); + assert_eq!(payload["forwarded_for"]["id"], "alice"); + } + + /// The streaming forwarding handler omits `forwarded_for` when + /// `context.identity` is `None`, mirroring the request/response handler. + #[tokio::test] + async fn streaming_forwarding_handler_omits_forwarded_for_when_identity_none() { + use futures::stream::StreamExt; + + let conn = Arc::new(CallConnection::new(stub_connection())); + let captured_payload = Arc::new(StdMutex::new(None::)); + let captured = Arc::clone(&captured_payload); + + let handler: StreamingHandler = { + let conn = Arc::clone(&conn); + make_streaming_handler(move |input, context| { + let conn = Arc::clone(&conn); + let captured = Arc::clone(&captured); + let remote_name = "events/stream".to_string(); + use futures::stream::{once, StreamExt}; + once(async move { + let payload = build_forwarded_payload(&remote_name, input, &context, None); + *captured.lock().unwrap() = Some(payload.clone()); + conn.subscribe_with_payload(payload).await + }) + .flatten() + }) + }; + + let ctx = test_context(None); + let mut stream = handler(json!({}), ctx); + let _ = stream.next().await; + let payload = captured_payload.lock().unwrap().clone().expect("captured"); + assert!( + payload.get("forwarded_for").is_none(), + "forwarded_for must be omitted when context.identity is None" + ); + assert_eq!(payload["operationId"], "events/stream"); + } + + /// The streaming forwarding handler populates `auth_token` when the hub's + /// own call-protocol token is provided. + #[tokio::test] + async fn streaming_forwarding_handler_sets_auth_token_when_provided() { + use futures::stream::StreamExt; + + let conn = Arc::new(CallConnection::new(stub_connection())); + let captured_payload = Arc::new(StdMutex::new(None::)); + let captured = Arc::clone(&captured_payload); + + let handler: StreamingHandler = { + let conn = Arc::clone(&conn); + make_streaming_handler(move |input, context| { + let conn = Arc::clone(&conn); + let captured = Arc::clone(&captured); + let remote_name = "events/stream".to_string(); + use futures::stream::{once, StreamExt}; + once(async move { + let payload = build_forwarded_payload( + &remote_name, + input, + &context, + Some("alk_hub_token"), + ); + *captured.lock().unwrap() = Some(payload.clone()); + conn.subscribe_with_payload(payload).await + }) + .flatten() + }) + }; + + let ctx = test_context(Some(alice_identity())); + let mut stream = handler(json!({}), ctx); + let _ = stream.next().await; + let payload = captured_payload.lock().unwrap().clone().expect("captured"); + assert_eq!(payload["auth_token"], "alk_hub_token"); + assert_eq!(payload["forwarded_for"]["id"], "alice"); + } + + /// `make_streaming_forwarding_handler` produces a `StreamingHandler` (not a + /// `Handler`) — verifies the helper returns the right type and that + /// `build_bundles` wires it into `HandlerKind::Stream`. + #[test] + fn make_streaming_forwarding_handler_returns_streaming_handler() { + let handler = make_streaming_forwarding_handler( + Arc::new(CallConnection::new(stub_connection())), + "events/stream".to_string(), + None, + ); + let reg = HandlerRegistration::new( + OperationSpec::new( + "events/stream", + OperationType::Subscription, + Visibility::External, + json!({}), + json!({}), + vec![], + AccessControl::default(), + ), + HandlerKind::Stream(handler), + OperationProvenance::FromCall, + None, + None, + Capabilities::new(), + ); + assert!(matches!(reg.handler, HandlerKind::Stream(_))); + assert_eq!(reg.provenance, OperationProvenance::FromCall); + assert!(reg.composition_authority.is_none()); + assert!(reg.scoped_env.is_none()); + } } diff --git a/crates/alknet-call/src/protocol/connection.rs b/crates/alknet-call/src/protocol/connection.rs index f49b71e..decca62 100644 --- a/crates/alknet-call/src/protocol/connection.rs +++ b/crates/alknet-call/src/protocol/connection.rs @@ -168,11 +168,26 @@ impl CallConnection { operation_id: &str, input: Value, ) -> impl Stream { - let request_id = generate_request_id(); let payload = serde_json::json!({ "operationId": operation_id, "input": input, }); + self.subscribe_with_payload(payload).await + } + + /// Subscribe to a remote op with a caller-constructed `call.requested` + /// payload. The payload MUST include `operationId` and `input`; the + /// caller may add `forwarded_for` (ADR-032) and `auth_token` (ADR-017 §7) + /// for the hub forwarding path used by `from_call`'s streaming forwarding + /// handler. Mirrors [`call_with_payload`](Self::call_with_payload) so the + /// forwarding handler can populate `forwarded_for` + `auth_token` on the + /// subscription payload (the plain [`subscribe`](Self::subscribe) builds + /// the payload internally and omits those fields). + pub async fn subscribe_with_payload( + &self, + payload: Value, + ) -> impl Stream { + let request_id = generate_request_id(); let connection = match &self.connection { Some(c) => c,