From ef53a03589f7185b4145407590ecd550b18e432a Mon Sep 17 00:00:00 2001 From: "glm-5.2" Date: Wed, 1 Jul 2026 17:17:02 +0000 Subject: [PATCH] feat(call,http): expose EventEnvelope-level dispatch API for non-QUIC transports MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make Dispatcher::dispatch_requested pub and extract abort-cascade handling into a pub handle_abort method so the WebSocket handler can feed deserialized EventEnvelopes directly to the shared Dispatcher without a QUIC Connection. CallConnection gains a new_overlay_only(identity) constructor (Option A) that holds the Layer 2 overlay, PendingRequestMap, and resolved bearer Identity without a QUIC Connection; identity() reads the stored field for the non-QUIC case. compose_root_env uses the new identity() accessor for both paths. The existing QUIC path (CallAdapter, CallClient, run_loop, handle_stream) is unchanged — outgoing client methods guard on connection().is_none(). --- crates/alknet-call/src/client/call_client.rs | 5 +- crates/alknet-call/src/protocol/adapter.rs | 1 + crates/alknet-call/src/protocol/connection.rs | 152 +++++++++++++- crates/alknet-call/src/protocol/dispatch.rs | 196 ++++++++++++++++-- crates/alknet-http/src/websocket/mod.rs | 196 +++++++++++++++++- 5 files changed, 522 insertions(+), 28 deletions(-) diff --git a/crates/alknet-call/src/client/call_client.rs b/crates/alknet-call/src/client/call_client.rs index 7b0b80e..26e96ba 100644 --- a/crates/alknet-call/src/client/call_client.rs +++ b/crates/alknet-call/src/client/call_client.rs @@ -708,7 +708,10 @@ mod tests { let registry = registry_with_caps(); let client = CallClient::new(Arc::clone(®istry), Arc::new(NoopIdentityProvider)); let conn = client.spawn_dispatch(stub_connection()); - assert_eq!(conn.connection().remote_alpn(), b"alknet/call"); + assert_eq!( + conn.connection().expect("quic connection present").remote_alpn(), + b"alknet/call" + ); std::mem::drop(conn); } diff --git a/crates/alknet-call/src/protocol/adapter.rs b/crates/alknet-call/src/protocol/adapter.rs index 9f0a12c..f79c154 100644 --- a/crates/alknet-call/src/protocol/adapter.rs +++ b/crates/alknet-call/src/protocol/adapter.rs @@ -556,6 +556,7 @@ mod tests { resources: HashMap::new(), }; conn.connection() + .expect("quic connection present") .set_identity(peer_identity) .expect("identity not yet set"); diff --git a/crates/alknet-call/src/protocol/connection.rs b/crates/alknet-call/src/protocol/connection.rs index dd2f3c7..9ad4461 100644 --- a/crates/alknet-call/src/protocol/connection.rs +++ b/crates/alknet-call/src/protocol/connection.rs @@ -11,6 +11,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use alknet_core::auth::Identity; use alknet_core::types::Connection; use futures::stream::Stream; use parking_lot::{Mutex, RwLock}; @@ -30,7 +31,8 @@ use crate::registry::registration::{Handler, HandlerRegistration}; const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30); pub struct CallConnection { - connection: Arc, + connection: Option>, + stored_identity: Option, imported_operations: Arc>>, pending: Arc>, } @@ -38,7 +40,8 @@ pub struct CallConnection { impl Clone for CallConnection { fn clone(&self) -> Self { Self { - connection: Arc::clone(&self.connection), + connection: self.connection.clone(), + stored_identity: self.stored_identity.clone(), imported_operations: Arc::clone(&self.imported_operations), pending: Arc::clone(&self.pending), } @@ -48,17 +51,34 @@ impl Clone for CallConnection { impl CallConnection { pub fn new(connection: Connection) -> Self { Self { - connection: Arc::new(connection), + connection: Some(Arc::new(connection)), + stored_identity: None, imported_operations: Arc::new(RwLock::new(HashMap::new())), pending: Arc::new(Mutex::new(PendingRequestMap::new())), } } - pub fn connection(&self) -> &Arc { - &self.connection + pub fn new_overlay_only(identity: Identity) -> Self { + Self { + connection: None, + stored_identity: Some(identity), + imported_operations: Arc::new(RwLock::new(HashMap::new())), + pending: Arc::new(Mutex::new(PendingRequestMap::new())), + } } - pub(crate) fn pending(&self) -> &Arc> { + pub fn connection(&self) -> Option<&Arc> { + self.connection.as_ref() + } + + pub fn identity(&self) -> Option<&Identity> { + match &self.connection { + Some(c) => c.identity(), + None => self.stored_identity.as_ref(), + } + } + + pub fn pending(&self) -> &Arc> { &self.pending } @@ -95,7 +115,17 @@ impl CallConnection { pub async fn call_with_payload(&self, payload: Value) -> ResponseEnvelope { let request_id = generate_request_id(); - let (send, recv) = match self.connection.open_bi().await { + let connection = match &self.connection { + Some(c) => c, + None => { + return ResponseEnvelope::error( + request_id, + CallError::internal("no underlying connection (overlay-only)"), + ); + } + }; + + let (send, recv) = match connection.open_bi().await { Ok(pair) => pair, Err(err) => { let call_error = CallError::internal(format!("failed to open stream: {err}")); @@ -143,7 +173,15 @@ impl CallConnection { "input": input, }); - let (send, recv) = match self.connection.open_bi().await { + let connection = match &self.connection { + Some(c) => c, + None => { + let call_error = CallError::internal("no underlying connection (overlay-only)"); + return SubscriptionStream::closed(request_id, call_error); + } + }; + + let (send, recv) = match connection.open_bi().await { Ok(pair) => pair, Err(err) => { let call_error = CallError::internal(format!("failed to open stream: {err}")); @@ -196,8 +234,11 @@ impl CallConnection { } async fn write_envelope(&self, envelope: &EventEnvelope) -> Result<(), String> { - let (send, _recv) = self + let connection = self .connection + .as_ref() + .ok_or_else(|| "no underlying connection (overlay-only)".to_string())?; + let (send, _recv) = connection .open_bi() .await .map_err(|e| format!("failed to open stream: {e}"))?; @@ -574,7 +615,10 @@ mod tests { #[test] fn connection_accessor_returns_underlying_connection() { let conn = CallConnection::new(stub_connection()); - assert_eq!(conn.connection().remote_alpn(), b"alknet/call"); + assert_eq!( + conn.connection().expect("quic connection present").remote_alpn(), + b"alknet/call" + ); } #[test] @@ -813,4 +857,92 @@ mod tests { "stream terminates after error" ); } + + // --- non-QUIC (overlay-only) CallConnection --------------------------- + + fn sample_identity(id: &str) -> Identity { + Identity { + id: id.to_string(), + scopes: vec!["fs:read".to_string()], + resources: HashMap::new(), + } + } + + #[test] + fn overlay_only_constructor_has_no_quic_connection() { + let conn = CallConnection::new_overlay_only(sample_identity("ws-peer")); + assert!(conn.connection().is_none(), "no QUIC connection stored"); + } + + #[test] + fn overlay_only_identity_returns_stored_identity() { + let conn = CallConnection::new_overlay_only(sample_identity("ws-peer")); + let identity = conn.identity().expect("identity stored"); + assert_eq!(identity.id, "ws-peer"); + assert_eq!(identity.scopes, vec!["fs:read".to_string()]); + } + + #[test] + fn overlay_only_holds_independent_pending_map() { + let conn = CallConnection::new_overlay_only(sample_identity("ws-peer")); + let pending = Arc::clone(conn.pending()); + assert!(pending.lock().is_empty()); + let _rx = pending.lock().register_call( + "req-overlay-1".to_string(), + Instant::now() + Duration::from_secs(30), + None, + ); + assert!(pending.lock().contains("req-overlay-1")); + } + + #[test] + fn overlay_only_register_imported_populates_overlay() { + let conn = CallConnection::new_overlay_only(sample_identity("ws-peer")); + let env = conn.overlay_env(); + assert!(!env.contains("worker/exec")); + conn.register_imported(imported_registration("worker/exec")); + assert!(env.contains("worker/exec")); + } + + #[tokio::test] + async fn overlay_only_overlay_env_dispatches_imported_op() { + let conn = CallConnection::new_overlay_only(sample_identity("ws-peer")); + conn.register_imported(imported_registration("worker/exec")); + let env = conn.overlay_env(); + + let scoped = ScopedPeerEnv::new(["worker/exec"]); + let ctx = root_context("overlay-root-1", scoped, env.clone()); + + let response = env + .invoke("worker", "exec", serde_json::json!({"v": 1}), &ctx) + .await; + assert!(response.result.is_ok()); + assert_eq!(response.result.unwrap(), serde_json::json!({"v": 1})); + } + + #[tokio::test] + async fn overlay_only_call_without_connection_returns_error() { + let conn = CallConnection::new_overlay_only(sample_identity("ws-peer")); + let response = conn.call("fs/readFile", serde_json::json!({})).await; + let err = response.result.expect_err("no connection → error"); + assert_eq!(err.code, "INTERNAL"); + } + + #[test] + fn quic_path_identity_returns_connection_identity() { + let conn = CallConnection::new(stub_connection()); + conn.connection() + .expect("quic connection present") + .set_identity(sample_identity("quic-peer")) + .expect("identity not yet set"); + let identity = conn.identity().expect("identity from connection"); + assert_eq!(identity.id, "quic-peer"); + } + + #[test] + fn quic_path_stored_identity_is_none_when_connection_present() { + let conn = CallConnection::new(stub_connection()); + assert!(conn.connection().is_some(), "QUIC connection present"); + assert!(conn.identity().is_none(), "no identity set yet"); + } } diff --git a/crates/alknet-call/src/protocol/dispatch.rs b/crates/alknet-call/src/protocol/dispatch.rs index 7f99a8d..172722b 100644 --- a/crates/alknet-call/src/protocol/dispatch.rs +++ b/crates/alknet-call/src/protocol/dispatch.rs @@ -96,7 +96,7 @@ impl Dispatcher { } } - pub(crate) fn compose_root_env( + pub fn compose_root_env( &self, connection: &CallConnection, context: &OperationContext, @@ -112,11 +112,7 @@ impl Dispatcher { if let Some(session) = session { env = env.with_session(session); } - if let Some(peer_id) = connection - .connection() - .identity() - .map(|identity| identity.id.clone()) - { + if let Some(peer_id) = connection.identity().map(|identity| identity.id.clone()) { env.attach_peer(peer_id, connection.overlay_env()); } Arc::new(env) @@ -164,7 +160,7 @@ impl Dispatcher { context } - pub(crate) async fn dispatch_requested( + pub async fn dispatch_requested( &self, connection: &Arc, request_id: String, @@ -176,7 +172,7 @@ impl Dispatcher { .unwrap_or(""); let operation_name = Self::strip_leading_slash(operation_id).to_string(); - let connection_identity = connection.connection().identity().cloned(); + let connection_identity = connection.identity().cloned(); let identity = self.resolve_identity(connection_identity, &payload); let forwarded_for = payload .get("forwarded_for") @@ -195,6 +191,16 @@ impl Dispatcher { self.registry.invoke(&operation_name, input, context).await } + pub async fn handle_abort(&self, connection: &Arc, request_id: &str) { + let mut pending = connection.pending().lock(); + let mut cascade = AbortCascade::new(&mut pending); + let aborted = cascade.cascade_abort(request_id, AbortPolicy::AbortDependents); + pending.handle_aborted(request_id); + if !aborted.is_empty() { + debug!(count = aborted.len(), "abort cascade evicted descendants"); + } + } + pub(crate) async fn handle_stream( &self, connection: Arc, @@ -231,13 +237,7 @@ impl Dispatcher { } EVENT_ABORTED => { let request_id = envelope.id.clone(); - let mut pending = connection.pending().lock(); - let mut cascade = AbortCascade::new(&mut pending); - let aborted = cascade.cascade_abort(&request_id, AbortPolicy::AbortDependents); - pending.handle_aborted(&request_id); - if !aborted.is_empty() { - debug!(count = aborted.len(), "abort cascade evicted descendants"); - } + self.handle_abort(&connection, &request_id).await; } other => { debug!(event_type = %other, id = %envelope.id, "ignoring non-requested/non-aborted event on inbound stream"); @@ -254,6 +254,14 @@ impl Dispatcher { pub async fn run_loop(self, connection: Arc) { let pending = Arc::clone(connection.pending()); + let quic = match connection.connection() { + Some(c) => Arc::clone(c), + None => { + warn!("run_loop called with an overlay-only CallConnection; returning"); + return; + } + }; + let sweeper_pending = Arc::clone(&pending); let sweeper_handle: JoinHandle<()> = tokio::spawn(async move { let mut interval = tokio::time::interval(SWEEPER_INTERVAL); @@ -271,7 +279,7 @@ impl Dispatcher { }); loop { - match connection.connection().accept_bi().await { + match quic.accept_bi().await { Ok((send, recv)) => { let conn = Arc::clone(&connection); let dispatcher = self.clone(); @@ -317,6 +325,7 @@ impl Clone for Dispatcher { #[cfg(test)] mod tests { use super::*; + use crate::protocol::wire::EVENT_RESPONDED; use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::{AuthToken, Identity, IdentityProvider}; @@ -577,6 +586,7 @@ mod tests { let dp = Dispatcher::new(registry, provider); let conn = CallConnection::new(stub_connection()); conn.connection() + .expect("quic connection present") .set_identity(identity_with_scopes("worker-a", &[])) .expect("identity not yet set"); @@ -687,4 +697,158 @@ mod tests { fn dispatcher_helper_compiles_with_full_signature() { let _dp = dispatcher(); } + + // --- non-QUIC (overlay-only) dispatch path ---------------------------- + + fn overlay_only_connection(identity: Identity) -> Arc { + Arc::new(CallConnection::new_overlay_only(identity)) + } + + #[tokio::test] + async fn dispatch_requested_works_with_overlay_only_connection() { + let registry = Arc::new(registry_with( + "echo/run", + Visibility::External, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = overlay_only_connection(identity_with_scopes("ws-peer", &[])); + + let payload = serde_json::json!({ + "operationId": "/echo/run", + "input": { "msg": "hello" }, + }); + let response = dp + .dispatch_requested(&conn, "ws-req-1".to_string(), payload) + .await; + assert_eq!(response.request_id, "ws-req-1"); + assert_eq!(response.result, Ok(serde_json::json!({ "msg": "hello" }))); + } + + #[tokio::test] + async fn dispatch_requested_overlay_only_attaches_peer_keyed_by_stored_identity() { + let mut registry = OperationRegistry::new(); + let handler = make_handler(|_input, context| async move { + let peer_ids = context.env.peer_ids(); + ResponseEnvelope::ok( + context.request_id, + serde_json::json!({ "peer_ids": peer_ids }), + ) + }); + registry.register(HandlerRegistration::new( + external_spec("fs/readFile", AccessControl::default()), + handler, + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let registry = Arc::new(registry); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = overlay_only_connection(identity_with_scopes("ws-peer", &[])); + + let payload = serde_json::json!({ + "operationId": "/fs/readFile", + "input": {}, + }); + let response = dp + .dispatch_requested(&conn, "ws-req-2".to_string(), payload) + .await; + let out = response.result.expect("ok"); + assert_eq!(out["peer_ids"], serde_json::json!(["ws-peer"])); + } + + #[tokio::test] + async fn dispatch_requested_overlay_only_unknown_op_returns_not_found() { + let registry = Arc::new(OperationRegistry::new()); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = overlay_only_connection(identity_with_scopes("ws-peer", &[])); + + let payload = serde_json::json!({ + "operationId": "/no/such/op", + "input": {}, + }); + let response = dp + .dispatch_requested(&conn, "ws-req-3".to_string(), payload) + .await; + match response.result { + Err(e) => assert_eq!(e.code, "NOT_FOUND"), + other => panic!("expected NOT_FOUND, got {other:?}"), + } + } + + #[tokio::test] + async fn handle_abort_works_with_overlay_only_connection() { + let registry = Arc::new(registry_with( + "echo/run", + Visibility::External, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = overlay_only_connection(identity_with_scopes("ws-peer", &[])); + + let parent_id = "ws-abort-root".to_string(); + let child_id = "ws-abort-child".to_string(); + { + let mut pending = conn.pending().lock(); + pending.register_call(parent_id.clone(), Instant::now() + Duration::from_secs(30), None); + pending.register_call( + child_id.clone(), + Instant::now() + Duration::from_secs(30), + Some(parent_id.clone()), + ); + } + assert!(conn.pending().lock().contains(&parent_id)); + assert!(conn.pending().lock().contains(&child_id)); + + dp.handle_abort(&conn, &parent_id).await; + + assert!( + !conn.pending().lock().contains(&parent_id), + "parent entry removed after abort" + ); + assert!( + !conn.pending().lock().contains(&child_id), + "child aborted by cascade" + ); + } + + #[tokio::test] + async fn handle_abort_unknown_request_id_is_noop_for_overlay_only() { + let registry = Arc::new(OperationRegistry::new()); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = overlay_only_connection(identity_with_scopes("ws-peer", &[])); + + dp.handle_abort(&conn, "totally-unknown").await; + assert!(conn.pending().lock().is_empty()); + } + + #[tokio::test] + async fn overlay_only_full_dispatch_round_trip_returns_response_envelope() { + let registry = Arc::new(registry_with( + "echo/run", + Visibility::External, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = overlay_only_connection(identity_with_scopes("ws-peer", &[])); + + let payload = serde_json::json!({ + "operationId": "/echo/run", + "input": { "v": 42 }, + }); + let request_id = "ws-roundtrip-1".to_string(); + let response = dp.dispatch_requested(&conn, request_id.clone(), payload).await; + assert!(response.result.is_ok()); + let envelope: EventEnvelope = response.into(); + assert_eq!(envelope.r#type, EVENT_RESPONDED); + assert_eq!(envelope.id, "ws-roundtrip-1"); + assert_eq!(envelope.payload.get("output"), Some(&serde_json::json!({ "v": 42 }))); + } } diff --git a/crates/alknet-http/src/websocket/mod.rs b/crates/alknet-http/src/websocket/mod.rs index 1ec437f..6f89a11 100644 --- a/crates/alknet-http/src/websocket/mod.rs +++ b/crates/alknet-http/src/websocket/mod.rs @@ -4,4 +4,198 @@ //! native `EventEnvelope` call-protocol session, not the gateway shape //! (ADR-048). See `docs/architecture/crates/http/websocket.md`. -// TODO: implement +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + use std::time::{Duration, Instant}; + + use alknet_call::protocol::connection::CallConnection; + use alknet_call::protocol::dispatch::Dispatcher; + use alknet_call::protocol::wire::{EventEnvelope, ResponseEnvelope, EVENT_RESPONDED}; + use alknet_call::registry::context::AbortPolicy; + use alknet_call::registry::registration::{ + make_handler, HandlerRegistration, OperationProvenance, + }; + use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; + use alknet_core::auth::{Identity, IdentityProvider}; + use alknet_core::types::Capabilities; + use std::sync::Mutex as StdMutex; + + struct StaticIdentityProvider { + tokens: StdMutex>, + } + + impl StaticIdentityProvider { + fn new() -> Self { + Self { + tokens: StdMutex::new(HashMap::new()), + } + } + + fn with_token(self, token: &str, identity: Identity) -> Self { + self.tokens + .lock() + .unwrap() + .insert(token.to_string(), identity); + self + } + } + + impl IdentityProvider for StaticIdentityProvider { + fn resolve_from_fingerprint(&self, _fp: &str) -> Option { + None + } + fn resolve_from_token(&self, token: &alknet_core::auth::AuthToken) -> Option { + let token_str = String::from_utf8_lossy(&token.raw); + self.tokens.lock().unwrap().get(token_str.as_ref()).cloned() + } + } + + fn identity(id: &str) -> Identity { + Identity { + id: id.to_string(), + scopes: vec![], + resources: HashMap::new(), + } + } + + fn external_spec(name: &str) -> OperationSpec { + OperationSpec::new( + name, + OperationType::Query, + Visibility::External, + serde_json::json!({}), + serde_json::json!({}), + vec![], + AccessControl::default(), + ) + } + + fn echo_registry() -> Arc { + let mut registry = alknet_call::registry::registration::OperationRegistry::new(); + registry.register(HandlerRegistration::new( + external_spec("echo/run"), + make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + Arc::new(registry) + } + + #[tokio::test] + async fn ws_dispatch_requested_via_pub_api_returns_response_envelope() { + let registry = echo_registry(); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dispatcher = Dispatcher::new(registry, provider); + let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); + + let payload = serde_json::json!({ + "operationId": "/echo/run", + "input": { "msg": "hi" }, + }); + let request_id = "ws-1".to_string(); + let response = dispatcher + .dispatch_requested(&conn, request_id.clone(), payload) + .await; + assert!(response.result.is_ok()); + assert_eq!(response.request_id, "ws-1"); + assert_eq!(response.result.unwrap(), serde_json::json!({ "msg": "hi" })); + } + + #[tokio::test] + async fn ws_dispatch_round_trip_event_envelope_via_pub_api() { + let registry = echo_registry(); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dispatcher = Dispatcher::new(registry, provider); + let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); + + let envelope = EventEnvelope::new( + "call.requested", + "ws-rt-1", + serde_json::json!({ + "operationId": "/echo/run", + "input": { "v": 7 }, + }), + ); + let response = dispatcher + .dispatch_requested(&conn, envelope.id.clone(), envelope.payload.clone()) + .await; + let out: EventEnvelope = response.into(); + assert_eq!(out.r#type, EVENT_RESPONDED); + assert_eq!(out.id, "ws-rt-1"); + assert_eq!(out.payload.get("output"), Some(&serde_json::json!({ "v": 7 }))); + } + + #[tokio::test] + async fn ws_handle_abort_via_pub_api_cascades_descendants() { + let registry = echo_registry(); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dispatcher = Dispatcher::new(registry, provider); + let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); + + { + let mut pending = conn.pending().lock(); + pending.register_call( + "ws-parent".to_string(), + Instant::now() + Duration::from_secs(30), + None, + ); + pending.register_call( + "ws-child".to_string(), + Instant::now() + Duration::from_secs(30), + Some("ws-parent".to_string()), + ); + } + dispatcher.handle_abort(&conn, "ws-parent").await; + assert!(!conn.pending().lock().contains("ws-parent")); + assert!(!conn.pending().lock().contains("ws-child")); + } + + #[tokio::test] + async fn ws_overlay_only_connection_holds_overlay_and_pending() { + let conn = CallConnection::new_overlay_only(identity("ws-peer")); + assert!(conn.connection().is_none()); + assert_eq!(conn.identity().map(|i| i.id.clone()), Some("ws-peer".to_string())); + assert!(conn.pending().lock().is_empty()); + + let env = conn.overlay_env(); + assert!(!env.contains("worker/exec")); + conn.register_imported(HandlerRegistration::new( + external_spec("worker/exec"), + make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }), + OperationProvenance::FromCall, + None, + None, + Capabilities::new(), + )); + assert!(env.contains("worker/exec")); + } + + #[tokio::test] + async fn ws_dispatch_with_auth_token_resolves_identity_via_provider() { + let registry = echo_registry(); + let provider: Arc = Arc::new( + StaticIdentityProvider::new().with_token("ws-token", identity("resolved-peer")), + ); + let dispatcher = Dispatcher::new(registry, provider); + let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); + + let payload = serde_json::json!({ + "operationId": "/echo/run", + "input": {}, + "auth_token": "ws-token", + }); + let response = dispatcher + .dispatch_requested(&conn, "ws-auth-1".to_string(), payload) + .await; + assert!(response.result.is_ok()); + } + + #[test] + fn ws_abort_policy_default_is_abort_dependents() { + assert_eq!(AbortPolicy::default(), AbortPolicy::AbortDependents); + } +}