feat(call): CallClient + shared dispatch loop + peer-scoped default-deny (ADR-017, ADR-028)

The #1 gap in alknet-call: the outbound connection opener. Every downstream
consumer (runner, container service, bilateral exchange, NAPI, agent
cross-node dispatch) is blocked on it.

Shared dispatch loop (ADR-017 §1 — the architectural commitment that keeps
CallClient from becoming a parallel protocol implementation):
- Extracts the accept-path dispatch (sweeper, accept_bi loop, handle_stream,
  dispatch_requested, build_root_context, compose_root_env, fail_all on
  close) out of CallAdapter into a new protocol/dispatch.rs Dispatcher struct.
  Both CallAdapter::handle and CallClient::connect produce a CallConnection
  and hand it to Dispatcher::run_loop — the loop is genuinely shared
  (refactored, not duplicated).
- CallAdapter keeps its public API and test-facing wrappers (pub(crate),
  #[cfg(test)]-gated) that delegate to the Dispatcher.
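
The shape of that commitment can be sketched in a few lines. This is a minimal, synchronous illustration, not the code in protocol/dispatch.rs: `SketchConnection`, `SketchDispatcher`, `accept_path` and `connect_path` are hypothetical stand-ins, and the real loop is async, spawns a sweeper, and reads framed envelopes. The point is only that the two halves differ in how the connection is obtained, not in how it is served.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for `CallConnection`: a queue of inbound operation ids.
pub struct SketchConnection {
    inbound: VecDeque<String>,
}

impl SketchConnection {
    pub fn new(requests: &[&str]) -> Self {
        Self {
            inbound: requests.iter().map(|r| r.to_string()).collect(),
        }
    }
}

/// Hypothetical stand-in for the shared `Dispatcher`. Like the real one it
/// holds no per-connection state; the connection is passed into `run_loop`.
#[derive(Clone)]
pub struct SketchDispatcher {
    known_ops: Vec<String>,
}

impl SketchDispatcher {
    pub fn new(known_ops: &[&str]) -> Self {
        Self {
            known_ops: known_ops.iter().map(|o| o.to_string()).collect(),
        }
    }

    /// The single loop both halves share: drain inbound requests and answer
    /// each one. Response strings are an illustration, not the wire format.
    pub fn run_loop(&self, mut connection: SketchConnection) -> Vec<String> {
        let mut responses = Vec::new();
        while let Some(op) = connection.inbound.pop_front() {
            // Same normalisation as `strip_leading_slash` in the diff.
            let name = op.strip_prefix('/').unwrap_or(op.as_str());
            if self.known_ops.iter().any(|k| k.as_str() == name) {
                responses.push(format!("ok:{}", name));
            } else {
                responses.push(format!("NOT_FOUND:{}", name));
            }
        }
        responses
    }
}

/// Inbound half (the `CallAdapter::handle` role): the connection was accepted.
pub fn accept_path(dispatcher: &SketchDispatcher, accepted: SketchConnection) -> Vec<String> {
    dispatcher.run_loop(accepted)
}

/// Outbound half (the `CallClient::connect` role): the connection was dialed.
pub fn connect_path(dispatcher: &SketchDispatcher, dialed: SketchConnection) -> Vec<String> {
    dispatcher.run_loop(dialed)
}
```

Because both functions delegate to the same `run_loop`, any behaviour change lands in one place, which is the property the refactor is meant to preserve.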

Peer-scoped default-deny (ADR-028 — the one-way-door security dimension):
- RemoteFilter { trusted_peer: bool } on the Dispatcher. In default-deny
  mode (CallClient::new), an incoming call to an op with remote_safe: false
  returns NOT_FOUND *before* any capability material reaches the handler —
  a remote peer's call must not populate OperationContext.capabilities from
  the local registration bundle unless the op is explicitly remote-safe
  (ADR-028 Context). Trusted-peer mode (CallClient::trusted_peer, explicit
  opt-in) bypasses the filter.
- The accept path (CallAdapter) uses RemoteFilter::trusted() by convention: a
  direct QUIC client is not a filtered CallClient peer in the ADR-028 sense.
- OperationRegistry::list_operations_peer_scoped(trusted_peer) +
  services_list_handler_peer_scoped for the CallClient's services/list
  serving path (ADR-028 Assumption 2: a peer should not see ops it cannot
  call, so discovery and dispatch filters agree).
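
A rough sketch of the filter's effect, under stated assumptions: `RemoteFilter` mirrors the struct in this commit, while `SketchRegistration`, `Outcome`, `dispatch`, `list_peer_scoped` and `sample_registry` are hypothetical simplifications (the real dispatch goes through `OperationRegistry::invoke` and returns a `ResponseEnvelope`). It shows the two properties described above: a denied op returns before its capabilities are read, and listing uses the same predicate as dispatch.

```rust
use std::collections::BTreeMap;

/// Mirrors the `RemoteFilter` added in this commit.
#[derive(Clone, Copy)]
pub struct RemoteFilter {
    pub trusted_peer: bool,
}

impl RemoteFilter {
    pub fn default_deny() -> Self {
        Self { trusted_peer: false }
    }
    pub fn trusted() -> Self {
        Self { trusted_peer: true }
    }
    pub fn allows(&self, remote_safe: bool) -> bool {
        self.trusted_peer || remote_safe
    }
}

/// Hypothetical, reduced registration: only the fields the filter cares about.
pub struct SketchRegistration {
    pub remote_safe: bool,
    pub capabilities: Vec<String>,
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    NotFound,
    Invoked { capabilities: Vec<String> },
}

/// The filter check comes first; capabilities are cloned only for allowed ops.
pub fn dispatch(
    registry: &BTreeMap<String, SketchRegistration>,
    filter: RemoteFilter,
    op: &str,
) -> Outcome {
    match registry.get(op) {
        None => Outcome::NotFound,
        Some(reg) if !filter.allows(reg.remote_safe) => Outcome::NotFound,
        Some(reg) => Outcome::Invoked {
            capabilities: reg.capabilities.clone(),
        },
    }
}

/// Discovery uses the same predicate, so a peer never sees an op it cannot call.
pub fn list_peer_scoped(
    registry: &BTreeMap<String, SketchRegistration>,
    filter: RemoteFilter,
) -> Vec<String> {
    registry
        .iter()
        .filter(|(_, reg)| filter.allows(reg.remote_safe))
        .map(|(name, _)| name.clone())
        .collect()
}

/// Illustrative data only; these op names and capabilities are made up.
pub fn sample_registry() -> BTreeMap<String, SketchRegistration> {
    let mut registry = BTreeMap::new();
    registry.insert(
        "admin/secret".to_string(),
        SketchRegistration {
            remote_safe: false,
            capabilities: vec!["signing-key".to_string()],
        },
    );
    registry.insert(
        "server/echo".to_string(),
        SketchRegistration {
            remote_safe: true,
            capabilities: vec![],
        },
    );
    registry
}
```

With `sample_registry()`, a default-deny peer calling `admin/secret` gets `Outcome::NotFound`, indistinguishable from an unregistered op, while a trusted peer reaches the handler with its capabilities.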

CallClient (src/client/call_client.rs):
- CallClient { registry, identity_provider, trusted_peer: bool }.
- new() default-deny; trusted_peer() explicit opt-in (ADR-028 §3).
- connect(addr, CallCredentials) dials QUIC on ALPN alknet/call (quinn
  feature), spawns Dispatcher::run_loop, returns a live CallConnection.
- spawn_dispatch(connection) shared path for connect + tests.
- CallCredentials { tls_identity, auth_token, remote_identity } — all from
  Capabilities (ADR-014), never env vars (no-env-vars invariant). v1
  connects without client-auth TLS identity (server uses
  AcceptAnyCertVerifier); RawKey client-auth is a two-way-door remainder.
- RemoteIdentity { fingerprint } — concrete shape is a two-way door (OQ-25
  remainder); the one-way constraint is it comes from Capabilities.
- ClientError { Transport, TlsSetup, ConnectionClosed }.
- CallConnection is now Clone (shares the inner Arcs) so connect can hand
  the caller a live clone while the dispatcher task keeps its clone.
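
Why the manual `Clone` matters can be illustrated with a reduced example. This is a sketch only: `SketchConnection`, `track`, `pending_count` and `shares_state_with` are hypothetical, and `std::sync::Mutex` is used here as an assumption so the block has no dependencies (the crate's own mutex type is not shown in this commit message). The clone copies the `Arc` handles, not the state behind them, so the caller's handle and the dispatcher task's handle observe the same pending map.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical, reduced stand-in for `CallConnection` with one shared field.
pub struct SketchConnection {
    pending: Arc<Mutex<Vec<String>>>,
}

impl Clone for SketchConnection {
    fn clone(&self) -> Self {
        // Clone the handle, not the data: both values point at one Mutex.
        Self {
            pending: Arc::clone(&self.pending),
        }
    }
}

impl SketchConnection {
    pub fn new() -> Self {
        Self {
            pending: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Record an outstanding request id.
    pub fn track(&self, request_id: &str) {
        self.pending.lock().unwrap().push(request_id.to_string());
    }

    /// Number of outstanding requests visible through this handle.
    pub fn pending_count(&self) -> usize {
        self.pending.lock().unwrap().len()
    }

    /// True when both handles share the same underlying state.
    pub fn shares_state_with(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.pending, &other.pending)
    }
}
```

A request tracked through the caller's handle is immediately visible through the clone held by the dispatcher task, which is what lets `fail_all` on close reach requests the caller issued.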

Tests (199 lib + 1 integration):
- Unit: default-deny NOT_FOUND for non-remote-safe; remote_safe dispatches;
  trusted-peer dispatches all External; default-deny does NOT populate
  capabilities (the load-bearing security assertion — verified by a handler
  that inspects context.capabilities and the fact that the handler is never
  reached for non-remote-safe ops); remote_safe op populates capabilities;
  services/list peer-scoped hide/trusted variants; CallClient constructors;
  CallCredentials builder; Send+Sync.
- Integration (tests/two_node_call.rs): real QUIC loopback — CallAdapter
  server (self-signed cert via rcgen) accepts, CallClient connects,
  client.call() round-trips to server/echo. Proves the connect path +
  shared dispatch loop work end-to-end.

clippy + fmt + test all green.

Refs: tasks/call/client/call-client.md
Refs: docs/architecture/decisions/017-call-protocol-client-and-adapter-contract.md §1, §2, §7
Refs: docs/architecture/decisions/028-callclient-peer-scoped-registry-filtering.md
Refs: docs/architecture/crates/call/client-and-adapters.md
2026-06-26 13:19:15 +00:00
parent 404d00ae1a
commit 4bf897f5ab
12 changed files with 1376 additions and 222 deletions

@@ -4,43 +4,42 @@
//! dispatches `call.requested` events to the operation registry. See
//! `docs/architecture/crates/call/call-protocol.md` for the full
//! specification.
//!
//! The dispatch loop is shared with [`crate::client::CallClient`] via
//! [`super::dispatch::Dispatcher`] (ADR-017 §1): `CallAdapter` is the
//! inbound (accept) half; `CallClient` is the outbound (connect) half; both
//! produce a [`CallConnection`] and hand it to the same `Dispatcher::run_loop`.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
use alknet_core::auth::{AuthContext, AuthToken, Identity, IdentityProvider};
use alknet_core::types::{Connection, HandlerError, ProtocolHandler, StreamError};
use alknet_core::auth::{AuthContext, IdentityProvider};
use alknet_core::types::{Connection, HandlerError, ProtocolHandler};
use async_trait::async_trait;
use serde_json::Value;
use tokio::task::JoinHandle;
use tracing::{debug, warn};
use super::abort::AbortCascade;
use super::connection::CallConnection;
use super::wire::{
CallError, EventEnvelope, FrameFramedReader, FrameFramedWriter, ResponseEnvelope,
EVENT_ABORTED, EVENT_REQUESTED,
};
use crate::registry::context::{AbortPolicy, OperationContext, ScopedOperationEnv};
use crate::registry::env::{CompositeOperationEnv, LocalOperationEnv, OperationEnv};
use super::dispatch::{Dispatcher, RemoteFilter};
use crate::registry::context::OperationContext;
use crate::registry::registration::OperationRegistry;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
const SWEEPER_INTERVAL: Duration = Duration::from_secs(10);
pub use super::dispatch::RemoteFilter as AdapterRemoteFilter;
pub struct CallAdapter {
registry: Arc<OperationRegistry>,
identity_provider: Arc<dyn IdentityProvider>,
session_source: Option<Arc<dyn SessionOverlaySource + Send + Sync>>,
default_timeout: Duration,
}
#[cfg(test)]
use super::wire::ResponseEnvelope;
#[cfg(test)]
use alknet_core::auth::Identity;
#[cfg(test)]
use serde_json::Value;
pub trait SessionOverlaySource: Send + Sync {
fn overlay_for(
&self,
context: &OperationContext,
) -> Option<Arc<dyn OperationEnv + Send + Sync>>;
) -> Option<Arc<dyn crate::registry::env::OperationEnv + Send + Sync>>;
}
pub struct CallAdapter {
dispatcher: Dispatcher,
}
impl CallAdapter {
@@ -48,11 +47,11 @@ impl CallAdapter {
registry: Arc<OperationRegistry>,
identity_provider: Arc<dyn IdentityProvider>,
) -> Self {
// The accept path is not peer-scoped-filtered: a direct QUIC client is
// not a CallClient peer in the ADR-028 filtered sense, so the accept
// path lists/dispatches all External ops (trusted-peer posture).
Self {
registry,
identity_provider,
session_source: None,
default_timeout: DEFAULT_TIMEOUT,
dispatcher: Dispatcher::new(registry, identity_provider, RemoteFilter::trusted()),
}
}
@@ -60,184 +59,84 @@ impl CallAdapter {
mut self,
source: Arc<dyn SessionOverlaySource + Send + Sync>,
) -> Self {
self.session_source = Some(source);
self.dispatcher = self.dispatcher.with_session_source(source);
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.default_timeout = timeout;
self.dispatcher = self.dispatcher.with_timeout(timeout);
self
}
pub fn registry(&self) -> &Arc<OperationRegistry> {
&self.registry
&self.dispatcher.registry
}
pub fn identity_provider(&self) -> &Arc<dyn IdentityProvider> {
&self.identity_provider
&self.dispatcher.identity_provider
}
pub fn default_timeout(&self) -> Duration {
self.default_timeout
self.dispatcher.default_timeout
}
fn strip_leading_slash(operation_id: &str) -> &str {
pub fn session_source(&self) -> Option<&Arc<dyn SessionOverlaySource + Send + Sync>> {
self.dispatcher.session_source.as_ref()
}
// --- Test-facing wrappers around the shared Dispatcher -----------------
// These exist so the adapter's existing tests keep compiling against the
// adapter type; they delegate to the Dispatcher's shared implementation.
// Gated to test builds — the production adapter delegates through
// `handle()` -> `Dispatcher::run_loop()` directly.
#[cfg(test)]
pub(crate) fn strip_leading_slash(operation_id: &str) -> &str {
operation_id.strip_prefix('/').unwrap_or(operation_id)
}
fn resolve_identity(
#[cfg(test)]
pub(crate) fn resolve_identity(
&self,
connection_identity: Option<Identity>,
payload: &Value,
) -> Option<Identity> {
let auth_token = payload.get("auth_token").and_then(|v| v.as_str());
match auth_token {
Some(token_str) => {
let token = AuthToken {
raw: token_str.as_bytes().to_vec(),
};
match self.identity_provider.resolve_from_token(&token) {
Some(identity) => Some(identity),
None => connection_identity,
}
}
None => connection_identity,
}
self.dispatcher
.resolve_identity(connection_identity, payload)
}
fn compose_root_env(
&self,
connection: &CallConnection,
context: &OperationContext,
) -> Arc<dyn OperationEnv + Send + Sync> {
let base: Arc<dyn OperationEnv + Send + Sync> =
Arc::new(LocalOperationEnv::new(Arc::clone(&self.registry)));
let session = self
.session_source
.as_ref()
.and_then(|s| s.overlay_for(context));
let connection_overlay = connection.overlay_env();
Arc::new(CompositeOperationEnv::new(
base,
Some(connection_overlay),
session,
))
}
fn build_root_context(
#[cfg(test)]
pub(crate) fn build_root_context(
&self,
request_id: String,
operation_name: &str,
identity: Option<Identity>,
connection: &CallConnection,
) -> OperationContext {
let registration = self.registry.registration(operation_name);
let (composition_authority, capabilities, scoped_env) = match registration {
Some(r) => (
r.composition_authority.clone(),
r.capabilities.clone(),
r.scoped_env
.clone()
.unwrap_or_else(ScopedOperationEnv::empty),
),
None => (
None,
alknet_core::types::Capabilities::new(),
ScopedOperationEnv::empty(),
),
};
let stub_env: Arc<dyn OperationEnv + Send + Sync> =
Arc::new(LocalOperationEnv::new(Arc::clone(&self.registry)));
let mut context = OperationContext {
request_id,
parent_request_id: None,
identity: identity.clone(),
handler_identity: composition_authority,
capabilities,
metadata: HashMap::new(),
deadline: Some(Instant::now() + self.default_timeout),
scoped_env,
env: stub_env,
abort_policy: AbortPolicy::default(),
internal: false,
};
context.env = self.compose_root_env(connection, &context);
context
self.dispatcher
.build_root_context(request_id, operation_name, identity, connection)
}
async fn dispatch_requested(
#[cfg(test)]
pub(crate) async fn dispatch_requested(
&self,
connection: &Arc<CallConnection>,
request_id: String,
payload: Value,
) -> ResponseEnvelope {
let operation_id = payload
.get("operationId")
.and_then(|v| v.as_str())
.unwrap_or("");
let operation_name = Self::strip_leading_slash(operation_id).to_string();
let connection_identity = connection.connection().identity().cloned();
let identity = self.resolve_identity(connection_identity, &payload);
let input = payload.get("input").cloned().unwrap_or(Value::Null);
let context =
self.build_root_context(request_id.clone(), &operation_name, identity, connection);
self.registry.invoke(&operation_name, input, context).await
self.dispatcher
.dispatch_requested(connection, request_id, payload)
.await
}
async fn handle_stream(
#[cfg(test)]
pub(crate) async fn handle_stream(
&self,
connection: Arc<CallConnection>,
send: alknet_core::types::SendStream,
recv: alknet_core::types::RecvStream,
) {
let mut reader = FrameFramedReader::new(recv);
let mut writer = FrameFramedWriter::new(send);
loop {
let envelope = match reader.read_frame().await {
Ok(env) => env,
Err(super::wire::FrameError::ConnectionClosed) => break,
Err(err) => {
warn!(error = %err, "stream frame read error; closing stream");
break;
}
};
match envelope.r#type.as_str() {
EVENT_REQUESTED => {
let request_id = envelope.id.clone();
let payload = envelope.payload.clone();
let response = self
.dispatch_requested(&connection, request_id.clone(), payload)
.await;
let event: EventEnvelope = response.into();
if let Err(err) = writer.write_frame(&event).await {
warn!(error = %err, "failed to write response frame; closing stream");
break;
}
}
EVENT_ABORTED => {
let request_id = envelope.id.clone();
let mut pending = connection.pending().lock();
let mut cascade = AbortCascade::new(&mut pending);
let aborted = cascade.cascade_abort(&request_id, AbortPolicy::AbortDependents);
pending.handle_aborted(&request_id);
if !aborted.is_empty() {
debug!(count = aborted.len(), "abort cascade evicted descendants");
}
}
other => {
debug!(event_type = %other, id = %envelope.id, "ignoring non-requested/non-aborted event on inbound stream");
}
}
}
self.dispatcher.handle_stream(connection, send, recv).await;
}
}
@@ -253,63 +152,7 @@ impl ProtocolHandler for CallAdapter {
}
let call_connection = Arc::new(CallConnection::new(connection));
let pending = Arc::clone(call_connection.pending());
let sweeper_pending = Arc::clone(&pending);
let sweeper_handle: JoinHandle<()> = tokio::spawn(async move {
let mut interval = tokio::time::interval(SWEEPER_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
let evicted = sweeper_pending.lock().evict_expired();
if !evicted.is_empty() {
debug!(
count = evicted.len(),
"sweeper evicted expired pending entries"
);
}
}
});
loop {
match call_connection.connection().accept_bi().await {
Ok((send, recv)) => {
let conn = Arc::clone(&call_connection);
let adapter_registry = Arc::clone(&self.registry);
let adapter_identity = Arc::clone(&self.identity_provider);
let adapter_session = self.session_source.clone();
let adapter_timeout = self.default_timeout;
tokio::spawn(async move {
let adapter = CallAdapter {
registry: adapter_registry,
identity_provider: adapter_identity,
session_source: adapter_session,
default_timeout: adapter_timeout,
};
adapter.handle_stream(conn, send, recv).await;
});
}
Err(StreamError::ConnectionClosed) => break,
Err(StreamError::StreamClosed) => break,
Err(StreamError::Timeout) => break,
Err(err) => {
warn!(error = %err, "accept_bi error; stopping accept loop");
break;
}
}
}
let failed = pending
.lock()
.fail_all(CallError::internal("connection closed"));
if !failed.is_empty() {
debug!(
count = failed.len(),
"failed pending requests on connection close"
);
}
sweeper_handle.abort();
self.dispatcher.clone().run_loop(call_connection).await;
Ok(())
}
}
@@ -317,7 +160,11 @@ impl ProtocolHandler for CallAdapter {
#[cfg(test)]
mod tests {
use super::*;
use crate::protocol::wire::{EVENT_COMPLETED, EVENT_ERROR, EVENT_RESPONDED};
use crate::protocol::wire::{
CallError, EventEnvelope, EVENT_COMPLETED, EVENT_ERROR, EVENT_RESPONDED,
};
use crate::registry::context::{AbortPolicy, ScopedOperationEnv};
use crate::registry::env::OperationEnv;
use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance};
use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility};
use alknet_core::auth::AuthToken;
@@ -471,7 +318,7 @@ mod tests {
.with_timeout(Duration::from_secs(60));
assert_eq!(adapter.default_timeout(), Duration::from_secs(60));
assert!(Arc::ptr_eq(adapter.registry(), &registry));
assert!(adapter.session_source.is_none());
assert!(adapter.session_source().is_none());
}
#[test]

@@ -37,6 +37,16 @@ pub struct CallConnection {
pending: Arc<Mutex<PendingRequestMap>>,
}
impl Clone for CallConnection {
fn clone(&self) -> Self {
Self {
connection: Arc::clone(&self.connection),
imported_operations: Arc::clone(&self.imported_operations),
pending: Arc::clone(&self.pending),
}
}
}
impl CallConnection {
pub fn new(connection: Connection) -> Self {
Self {

@@ -0,0 +1,351 @@
//! Shared dispatch loop for `alknet/call` connections.
//!
//! Both [`CallAdapter`]'s accept path and [`crate::client::CallClient`]'s
//! connect path produce a [`CallConnection`] and hand it to the same dispatch
//! loop here (ADR-017 §1): the loop reads `EventEnvelope` frames off accepted
//! bidirectional streams, dispatches `call.requested` events against the
//! operation registry (with optional peer-scoped filtering per ADR-028), and
//! writes the response back on the same stream. The connection-establishment
//! half differs (accept vs dial); the dispatch half is shared.
//!
//! See `docs/architecture/crates/call/call-protocol.md` and
//! `docs/architecture/crates/call/client-and-adapters.md` for the spec.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use alknet_core::auth::{AuthToken, Identity, IdentityProvider};
use alknet_core::types::StreamError;
use serde_json::Value;
use tokio::task::JoinHandle;
use tracing::{debug, warn};
use super::abort::AbortCascade;
use super::connection::CallConnection;
use super::wire::{
CallError, EventEnvelope, FrameFramedReader, FrameFramedWriter, ResponseEnvelope,
EVENT_ABORTED, EVENT_REQUESTED,
};
use crate::protocol::adapter::SessionOverlaySource;
use crate::registry::context::{AbortPolicy, OperationContext, ScopedOperationEnv};
use crate::registry::env::{CompositeOperationEnv, LocalOperationEnv, OperationEnv};
use crate::registry::registration::OperationRegistry;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
const SWEEPER_INTERVAL: Duration = Duration::from_secs(10);
/// Peer-scoped registry filter state (ADR-028). When `trusted_peer` is false
/// (the default-deny mode for a `CallClient`), incoming dispatch hides ops
/// whose `HandlerRegistration.remote_safe` is false, and `services/list` hides
/// them too. When `trusted_peer` is true (the explicit opt-in for trusted
/// peers), the filter is bypassed: all `External` ops dispatch and list.
///
/// For the `CallAdapter` (local accept path), `trusted_peer` is `true` by
/// convention — a direct QUIC client is not a filtered `CallClient` peer in
/// the ADR-028 sense; the accept path keeps listing all `External` ops.
#[derive(Clone, Copy)]
pub struct RemoteFilter {
pub trusted_peer: bool,
}
impl RemoteFilter {
/// Default-deny mode: only `remote_safe: true` ops dispatch/list.
pub fn default_deny() -> Self {
Self {
trusted_peer: false,
}
}
/// Trusted-peer mode: all `External` ops dispatch/list regardless of
/// `remote_safe`.
pub fn trusted() -> Self {
Self { trusted_peer: true }
}
/// Returns whether an op with the given `remote_safe` flag is dispatchable
/// (and listable) to the remote peer.
pub fn allows(&self, remote_safe: bool) -> bool {
self.trusted_peer || remote_safe
}
}
/// Shared dispatcher for an established `CallConnection`. Constructed by
/// both `CallAdapter` (accept path) and `CallClient` (connect path) and used
/// to run the dispatch loop. Holds no per-connection state; the
/// `CallConnection` is passed into `run_loop`.
pub struct Dispatcher {
pub registry: Arc<OperationRegistry>,
pub identity_provider: Arc<dyn IdentityProvider>,
pub session_source: Option<Arc<dyn SessionOverlaySource + Send + Sync>>,
pub default_timeout: Duration,
pub remote_filter: RemoteFilter,
}
impl Dispatcher {
pub fn new(
registry: Arc<OperationRegistry>,
identity_provider: Arc<dyn IdentityProvider>,
remote_filter: RemoteFilter,
) -> Self {
Self {
registry,
identity_provider,
session_source: None,
default_timeout: DEFAULT_TIMEOUT,
remote_filter,
}
}
pub fn with_session_source(
mut self,
source: Arc<dyn SessionOverlaySource + Send + Sync>,
) -> Self {
self.session_source = Some(source);
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.default_timeout = timeout;
self
}
fn strip_leading_slash(operation_id: &str) -> &str {
operation_id.strip_prefix('/').unwrap_or(operation_id)
}
pub(crate) fn resolve_identity(
&self,
connection_identity: Option<Identity>,
payload: &Value,
) -> Option<Identity> {
let auth_token = payload.get("auth_token").and_then(|v| v.as_str());
match auth_token {
Some(token_str) => {
let token = AuthToken {
raw: token_str.as_bytes().to_vec(),
};
match self.identity_provider.resolve_from_token(&token) {
Some(identity) => Some(identity),
None => connection_identity,
}
}
None => connection_identity,
}
}
pub(crate) fn compose_root_env(
&self,
connection: &CallConnection,
context: &OperationContext,
) -> Arc<dyn OperationEnv + Send + Sync> {
let base: Arc<dyn OperationEnv + Send + Sync> =
Arc::new(LocalOperationEnv::new(Arc::clone(&self.registry)));
let session = self
.session_source
.as_ref()
.and_then(|s| s.overlay_for(context));
let connection_overlay = connection.overlay_env();
Arc::new(CompositeOperationEnv::new(
base,
Some(connection_overlay),
session,
))
}
pub(crate) fn build_root_context(
&self,
request_id: String,
operation_name: &str,
identity: Option<Identity>,
connection: &CallConnection,
) -> OperationContext {
let registration = self.registry.registration(operation_name);
let (composition_authority, capabilities, scoped_env) = match registration {
Some(r) => (
r.composition_authority.clone(),
r.capabilities.clone(),
r.scoped_env
.clone()
.unwrap_or_else(ScopedOperationEnv::empty),
),
None => (
None,
alknet_core::types::Capabilities::new(),
ScopedOperationEnv::empty(),
),
};
let stub_env: Arc<dyn OperationEnv + Send + Sync> =
Arc::new(LocalOperationEnv::new(Arc::clone(&self.registry)));
let mut context = OperationContext {
request_id,
parent_request_id: None,
identity: identity.clone(),
handler_identity: composition_authority,
capabilities,
metadata: HashMap::new(),
deadline: Some(Instant::now() + self.default_timeout),
scoped_env,
env: stub_env,
abort_policy: AbortPolicy::default(),
internal: false,
};
context.env = self.compose_root_env(connection, &context);
context
}
pub(crate) async fn dispatch_requested(
&self,
connection: &Arc<CallConnection>,
request_id: String,
payload: Value,
) -> ResponseEnvelope {
let operation_id = payload
.get("operationId")
.and_then(|v| v.as_str())
.unwrap_or("");
let operation_name = Self::strip_leading_slash(operation_id).to_string();
// Peer-scoped default-deny filter (ADR-028). When the caller is a
// remote peer (default-deny mode), an op marked `remote_safe: false`
// is hidden from dispatch — return NOT_FOUND, same posture as
// `Visibility::Internal` per ADR-015. Critically, this returns *before*
// any capability material reaches the handler, so a non-remote-safe
// op's `Capabilities` are never populated for a remote peer's call
// (ADR-028 Context — the security argument for default-deny).
if let Some(registration) = self.registry.registration(&operation_name) {
if !self.remote_filter.allows(registration.remote_safe) {
return ResponseEnvelope::not_found(request_id, &operation_name);
}
}
let connection_identity = connection.connection().identity().cloned();
let identity = self.resolve_identity(connection_identity, &payload);
let input = payload.get("input").cloned().unwrap_or(Value::Null);
let context =
self.build_root_context(request_id.clone(), &operation_name, identity, connection);
self.registry.invoke(&operation_name, input, context).await
}
pub(crate) async fn handle_stream(
&self,
connection: Arc<CallConnection>,
send: alknet_core::types::SendStream,
recv: alknet_core::types::RecvStream,
) {
let mut reader = FrameFramedReader::new(recv);
let mut writer = FrameFramedWriter::new(send);
loop {
let envelope = match reader.read_frame().await {
Ok(env) => env,
Err(super::wire::FrameError::ConnectionClosed) => break,
Err(err) => {
warn!(error = %err, "stream frame read error; closing stream");
break;
}
};
match envelope.r#type.as_str() {
EVENT_REQUESTED => {
let request_id = envelope.id.clone();
let payload = envelope.payload.clone();
let response = self
.dispatch_requested(&connection, request_id.clone(), payload)
.await;
let event: EventEnvelope = response.into();
if let Err(err) = writer.write_frame(&event).await {
warn!(error = %err, "failed to write response frame; closing stream");
break;
}
}
EVENT_ABORTED => {
let request_id = envelope.id.clone();
let mut pending = connection.pending().lock();
let mut cascade = AbortCascade::new(&mut pending);
let aborted = cascade.cascade_abort(&request_id, AbortPolicy::AbortDependents);
pending.handle_aborted(&request_id);
if !aborted.is_empty() {
debug!(count = aborted.len(), "abort cascade evicted descendants");
}
}
other => {
debug!(event_type = %other, id = %envelope.id, "ignoring non-requested/non-aborted event on inbound stream");
}
}
}
}
/// Run the shared dispatch loop over an established `CallConnection`:
/// spawn the pending-entry sweeper, accept bidirectional streams until the
/// connection closes, dispatch each stream via `handle_stream`, and fail
/// outstanding pending requests on close. Returns when the accept loop
/// stops: on `ConnectionClosed`, `StreamClosed`, `Timeout`, or any other
/// `accept_bi` error (the latter is logged at `warn` before breaking).
pub async fn run_loop(self, connection: Arc<CallConnection>) {
let pending = Arc::clone(connection.pending());
let sweeper_pending = Arc::clone(&pending);
let sweeper_handle: JoinHandle<()> = tokio::spawn(async move {
let mut interval = tokio::time::interval(SWEEPER_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
let evicted = sweeper_pending.lock().evict_expired();
if !evicted.is_empty() {
debug!(
count = evicted.len(),
"sweeper evicted expired pending entries"
);
}
}
});
loop {
match connection.connection().accept_bi().await {
Ok((send, recv)) => {
let conn = Arc::clone(&connection);
let dispatcher = self.clone();
tokio::spawn(async move {
dispatcher.handle_stream(conn, send, recv).await;
});
}
Err(StreamError::ConnectionClosed) => break,
Err(StreamError::StreamClosed) => break,
Err(StreamError::Timeout) => break,
Err(err) => {
warn!(error = %err, "accept_bi error; stopping accept loop");
break;
}
}
}
let failed = pending
.lock()
.fail_all(CallError::internal("connection closed"));
if !failed.is_empty() {
debug!(
count = failed.len(),
"failed pending requests on connection close"
);
}
sweeper_handle.abort();
}
}
impl Clone for Dispatcher {
fn clone(&self) -> Self {
Self {
registry: Arc::clone(&self.registry),
identity_provider: Arc::clone(&self.identity_provider),
session_source: self.session_source.clone(),
default_timeout: self.default_timeout,
remote_filter: self.remote_filter,
}
}
}

@@ -7,5 +7,6 @@
pub mod abort;
pub mod adapter;
pub mod connection;
pub mod dispatch;
pub mod pending;
pub mod wire;