feat(call,http): expose EventEnvelope-level dispatch API for non-QUIC transports

Make Dispatcher::dispatch_requested pub and extract abort-cascade handling
into a pub handle_abort method so the WebSocket handler can feed deserialized
EventEnvelopes directly to the shared Dispatcher without a QUIC Connection.

CallConnection gains a new_overlay_only(identity) constructor (Option A) that
holds the Layer 2 overlay, PendingRequestMap, and resolved bearer Identity
without a QUIC Connection; identity() reads the stored field for the non-QUIC
case. compose_root_env uses the new identity() accessor for both paths.

The existing QUIC path (CallAdapter, CallClient, run_loop, handle_stream) is
unchanged — outgoing client methods guard on connection().is_none().
This commit is contained in:
2026-07-01 17:17:02 +00:00
parent 1900c72deb
commit ef53a03589
5 changed files with 522 additions and 28 deletions

View File

@@ -11,6 +11,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use alknet_core::auth::Identity;
use alknet_core::types::Connection;
use futures::stream::Stream;
use parking_lot::{Mutex, RwLock};
@@ -30,7 +31,8 @@ use crate::registry::registration::{Handler, HandlerRegistration};
const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30);
pub struct CallConnection {
connection: Arc<Connection>,
connection: Option<Arc<Connection>>,
stored_identity: Option<Identity>,
imported_operations: Arc<RwLock<HashMap<String, HandlerRegistration>>>,
pending: Arc<Mutex<PendingRequestMap>>,
}
@@ -38,7 +40,8 @@ pub struct CallConnection {
impl Clone for CallConnection {
fn clone(&self) -> Self {
Self {
connection: Arc::clone(&self.connection),
connection: self.connection.clone(),
stored_identity: self.stored_identity.clone(),
imported_operations: Arc::clone(&self.imported_operations),
pending: Arc::clone(&self.pending),
}
@@ -48,17 +51,34 @@ impl Clone for CallConnection {
impl CallConnection {
pub fn new(connection: Connection) -> Self {
Self {
connection: Arc::new(connection),
connection: Some(Arc::new(connection)),
stored_identity: None,
imported_operations: Arc::new(RwLock::new(HashMap::new())),
pending: Arc::new(Mutex::new(PendingRequestMap::new())),
}
}
pub fn connection(&self) -> &Arc<Connection> {
&self.connection
pub fn new_overlay_only(identity: Identity) -> Self {
Self {
connection: None,
stored_identity: Some(identity),
imported_operations: Arc::new(RwLock::new(HashMap::new())),
pending: Arc::new(Mutex::new(PendingRequestMap::new())),
}
}
pub(crate) fn pending(&self) -> &Arc<Mutex<PendingRequestMap>> {
pub fn connection(&self) -> Option<&Arc<Connection>> {
self.connection.as_ref()
}
pub fn identity(&self) -> Option<&Identity> {
match &self.connection {
Some(c) => c.identity(),
None => self.stored_identity.as_ref(),
}
}
pub fn pending(&self) -> &Arc<Mutex<PendingRequestMap>> {
&self.pending
}
@@ -95,7 +115,17 @@ impl CallConnection {
pub async fn call_with_payload(&self, payload: Value) -> ResponseEnvelope {
let request_id = generate_request_id();
let (send, recv) = match self.connection.open_bi().await {
let connection = match &self.connection {
Some(c) => c,
None => {
return ResponseEnvelope::error(
request_id,
CallError::internal("no underlying connection (overlay-only)"),
);
}
};
let (send, recv) = match connection.open_bi().await {
Ok(pair) => pair,
Err(err) => {
let call_error = CallError::internal(format!("failed to open stream: {err}"));
@@ -143,7 +173,15 @@ impl CallConnection {
"input": input,
});
let (send, recv) = match self.connection.open_bi().await {
let connection = match &self.connection {
Some(c) => c,
None => {
let call_error = CallError::internal("no underlying connection (overlay-only)");
return SubscriptionStream::closed(request_id, call_error);
}
};
let (send, recv) = match connection.open_bi().await {
Ok(pair) => pair,
Err(err) => {
let call_error = CallError::internal(format!("failed to open stream: {err}"));
@@ -196,8 +234,11 @@ impl CallConnection {
}
async fn write_envelope(&self, envelope: &EventEnvelope) -> Result<(), String> {
let (send, _recv) = self
let connection = self
.connection
.as_ref()
.ok_or_else(|| "no underlying connection (overlay-only)".to_string())?;
let (send, _recv) = connection
.open_bi()
.await
.map_err(|e| format!("failed to open stream: {e}"))?;
@@ -574,7 +615,10 @@ mod tests {
#[test]
fn connection_accessor_returns_underlying_connection() {
let conn = CallConnection::new(stub_connection());
assert_eq!(conn.connection().remote_alpn(), b"alknet/call");
assert_eq!(
conn.connection().expect("quic connection present").remote_alpn(),
b"alknet/call"
);
}
#[test]
@@ -813,4 +857,92 @@ mod tests {
"stream terminates after error"
);
}
// --- non-QUIC (overlay-only) CallConnection ---------------------------
fn sample_identity(id: &str) -> Identity {
Identity {
id: id.to_string(),
scopes: vec!["fs:read".to_string()],
resources: HashMap::new(),
}
}
#[test]
fn overlay_only_constructor_has_no_quic_connection() {
let conn = CallConnection::new_overlay_only(sample_identity("ws-peer"));
assert!(conn.connection().is_none(), "no QUIC connection stored");
}
#[test]
fn overlay_only_identity_returns_stored_identity() {
let conn = CallConnection::new_overlay_only(sample_identity("ws-peer"));
let identity = conn.identity().expect("identity stored");
assert_eq!(identity.id, "ws-peer");
assert_eq!(identity.scopes, vec!["fs:read".to_string()]);
}
#[test]
fn overlay_only_holds_independent_pending_map() {
let conn = CallConnection::new_overlay_only(sample_identity("ws-peer"));
let pending = Arc::clone(conn.pending());
assert!(pending.lock().is_empty());
let _rx = pending.lock().register_call(
"req-overlay-1".to_string(),
Instant::now() + Duration::from_secs(30),
None,
);
assert!(pending.lock().contains("req-overlay-1"));
}
#[test]
fn overlay_only_register_imported_populates_overlay() {
let conn = CallConnection::new_overlay_only(sample_identity("ws-peer"));
let env = conn.overlay_env();
assert!(!env.contains("worker/exec"));
conn.register_imported(imported_registration("worker/exec"));
assert!(env.contains("worker/exec"));
}
#[tokio::test]
async fn overlay_only_overlay_env_dispatches_imported_op() {
let conn = CallConnection::new_overlay_only(sample_identity("ws-peer"));
conn.register_imported(imported_registration("worker/exec"));
let env = conn.overlay_env();
let scoped = ScopedPeerEnv::new(["worker/exec"]);
let ctx = root_context("overlay-root-1", scoped, env.clone());
let response = env
.invoke("worker", "exec", serde_json::json!({"v": 1}), &ctx)
.await;
assert!(response.result.is_ok());
assert_eq!(response.result.unwrap(), serde_json::json!({"v": 1}));
}
#[tokio::test]
async fn overlay_only_call_without_connection_returns_error() {
let conn = CallConnection::new_overlay_only(sample_identity("ws-peer"));
let response = conn.call("fs/readFile", serde_json::json!({})).await;
let err = response.result.expect_err("no connection → error");
assert_eq!(err.code, "INTERNAL");
}
#[test]
fn quic_path_identity_returns_connection_identity() {
let conn = CallConnection::new(stub_connection());
conn.connection()
.expect("quic connection present")
.set_identity(sample_identity("quic-peer"))
.expect("identity not yet set");
let identity = conn.identity().expect("identity from connection");
assert_eq!(identity.id, "quic-peer");
}
#[test]
fn quic_path_stored_identity_is_none_when_connection_present() {
let conn = CallConnection::new(stub_connection());
assert!(conn.connection().is_some(), "QUIC connection present");
assert!(conn.identity().is_none(), "no identity set yet");
}
}