| id | name | status | depends_on | scope | risk | impact | level |
|---|---|---|---|---|---|---|---|
| call/protocol/call-adapter | Implement CallAdapter (ProtocolHandler for alknet/call) with stream handling, identity resolution, and root context construction | completed | | broad | high | component | implementation |
Description
Implement CallAdapter in src/protocol/adapter.rs. This is the
ProtocolHandler implementation for ALPN alknet/call — the merge point of the
registry and protocol strands. It ties everything together: stream handling,
identity resolution, root context construction, env composition, dispatch.
CallAdapter struct
pub struct CallAdapter {
    registry: Arc<OperationRegistry>, // Layer 0: curated, immutable
    identity_provider: Arc<dyn IdentityProvider>,
    session_source: Option<Arc<dyn SessionOverlaySource + Send + Sync>>, // Layer 1
    default_timeout: Duration, // 30s default
}

impl CallAdapter {
    pub fn new(registry: Arc<OperationRegistry>, identity_provider: Arc<dyn IdentityProvider>) -> Self {
        Self {
            registry,
            identity_provider,
            session_source: None,
            default_timeout: Duration::from_secs(30),
        }
    }

    pub fn with_session_source(mut self, source: Arc<dyn SessionOverlaySource + Send + Sync>) -> Self {
        self.session_source = Some(source);
        self
    }

    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.default_timeout = timeout;
        self
    }
}
SessionOverlaySource trait
pub trait SessionOverlaySource: Send + Sync {
    fn overlay_for(&self, context: &OperationContext) -> Option<Arc<dyn OperationEnv + Send + Sync>>;
}
Defined in alknet-call because CallAdapter must name the type — alknet-call cannot depend on alknet-agent (agent depends on call, not reverse). The agent crate implements this trait; alknet-call defines it. Same pattern as IdentityProvider (ADR-004).
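The dependency direction described above can be sketched as follows. This is a minimal illustration, not the actual agent implementation: the OperationContext fields, the describe() method on OperationEnv, and the AgentSessions and SessionEnv types are hypothetical stand-ins, since the real definitions are not shown in this task.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Stand-ins for alknet-call types. Field and method names are assumptions.
pub struct OperationContext {
    pub request_id: String,
    pub session_id: Option<String>, // hypothetical field, for illustration only
}

pub trait OperationEnv {
    fn describe(&self) -> String; // hypothetical method
}

// The trait as defined in alknet-call.
pub trait SessionOverlaySource: Send + Sync {
    fn overlay_for(&self, context: &OperationContext) -> Option<Arc<dyn OperationEnv + Send + Sync>>;
}

// Everything below would live in the agent crate, which depends on alknet-call.
struct SessionEnv {
    session_id: String,
}

impl OperationEnv for SessionEnv {
    fn describe(&self) -> String {
        format!("session overlay for {}", self.session_id)
    }
}

pub struct AgentSessions {
    active: HashMap<String, Arc<SessionEnv>>,
}

impl AgentSessions {
    pub fn with_session(id: &str) -> Self {
        let mut active = HashMap::new();
        active.insert(id.to_string(), Arc::new(SessionEnv { session_id: id.to_string() }));
        Self { active }
    }
}

impl SessionOverlaySource for AgentSessions {
    fn overlay_for(&self, context: &OperationContext) -> Option<Arc<dyn OperationEnv + Send + Sync>> {
        // No session on the request, or no active session with that ID: no overlay.
        let id = context.session_id.as_ref()?;
        let env = self.active.get(id)?;
        Some(env.clone() as Arc<dyn OperationEnv + Send + Sync>)
    }
}
```

CallAdapter only ever sees the trait object, so alknet-call compiles without knowing that AgentSessions exists.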
ProtocolHandler impl
#[async_trait]
impl ProtocolHandler for CallAdapter {
    fn alpn(&self) -> &'static [u8] { b"alknet/call" }

    async fn handle(&self, connection: Connection, auth: &AuthContext) -> Result<(), HandlerError> {
        // 1. Create CallConnection from the Connection
        // 2. Spawn a task that continuously calls connection.accept_bi()
        // 3. For each accepted stream, read EventEnvelope frames (FrameFramedReader)
        // 4. Dispatch call.requested events to the operation registry
        // 5. Write response EventEnvelope frames (FrameFramedWriter)
        // 6. Manage PendingRequestMap for outgoing calls
        // 7. On connection close: fail all pending, return Ok or Err(ConnectionClosed)
    }
}
Stream handling
The adapter:
- Spawns a task that continuously calls connection.accept_bi() to receive incoming streams
- For each accepted stream, reads EventEnvelope frames using FrameFramedReader
- Dispatches call.requested events to the operation registry
- Writes response EventEnvelope frames using FrameFramedWriter
- Manages PendingRequestMap for outgoing calls initiated by the server
For outgoing calls (server → client), the adapter:
- Opens a bidirectional stream with connection.open_bi()
- Sends call.requested on that stream
- Adds the request ID to the PendingRequestMap
- Reads responses from any stream, correlates by ID
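The correlate-by-ID step can be sketched with standard-library channels. This is a simplified, single-threaded stand-in, not the crate's PendingRequestMap: the Response type and the register, resolve, and len methods are hypothetical names chosen for illustration, and the real map presumably uses async oneshot and mpsc channels.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified stand-in for a response event read from any stream.
#[derive(Debug, Clone, PartialEq)]
pub struct Response {
    pub id: String,
    pub output: String,
}

/// Hypothetical pending map: request ID -> the caller waiting on it.
#[derive(Default)]
pub struct PendingRequestMap {
    waiting: HashMap<String, Sender<Response>>,
}

impl PendingRequestMap {
    /// Called after sending call.requested: remember who waits for this ID.
    pub fn register(&mut self, request_id: &str) -> Receiver<Response> {
        let (tx, rx) = channel();
        self.waiting.insert(request_id.to_string(), tx);
        rx
    }

    /// Called for every response frame, whichever stream it arrived on.
    /// Returns false when no caller is waiting for that ID.
    pub fn resolve(&mut self, response: Response) -> bool {
        match self.waiting.remove(&response.id) {
            Some(tx) => tx.send(response).is_ok(),
            None => false,
        }
    }

    /// Number of requests still awaiting a response.
    pub fn len(&self) -> usize {
        self.waiting.len()
    }
}
```

Because lookup is keyed only on the request ID, the response does not have to arrive on the stream that carried the request.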
Identity resolution (per-request)
The CallAdapter resolves identity per-request, not per-connection:
- The endpoint provides AuthContext with whatever identity it resolved at the TLS layer (may be None)
- When a call.requested event arrives, the CallAdapter constructs an OperationContext with the connection-level AuthContext.identity
- If the call.requested payload includes an auth_token field, the CallAdapter resolves it using IdentityProvider::resolve_from_token(). If resolution succeeds, the resulting Identity replaces the connection-level identity in the OperationContext. If resolution fails, the request proceeds with the connection-level identity (which may be None)
- The OperationContext.identity is passed to the OperationRegistry for ACL checking
- If identity is None and the operation's AccessControl has restrictions, the registry returns FORBIDDEN with message "authentication required"
Key point: Identity is resolved per-request. This allows a single connection to upgrade authentication mid-session and allows different operations on the same connection to have different identity levels.
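A minimal sketch of the fallback rule follows. The Identity shape, the Option-returning signature of resolve_from_token, and the effective_identity, check_acl, and FixedToken names are assumptions made for illustration; the real provider is likely async and returns a richer error. check_acl models only the unauthenticated case from the list above, not full ACL evaluation.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Identity(pub String);

/// Stand-in for the real provider trait; the signature is an assumption.
pub trait IdentityProvider {
    fn resolve_from_token(&self, token: &str) -> Option<Identity>;
}

/// Test provider that accepts exactly one token (hypothetical).
pub struct FixedToken {
    pub token: String,
    pub identity: Identity,
}

impl IdentityProvider for FixedToken {
    fn resolve_from_token(&self, token: &str) -> Option<Identity> {
        (token == self.token).then(|| self.identity.clone())
    }
}

/// A successfully resolved auth_token wins; otherwise fall back to the
/// connection-level identity, which may itself be None.
pub fn effective_identity(
    provider: &dyn IdentityProvider,
    connection_identity: Option<Identity>,
    auth_token: Option<&str>,
) -> Option<Identity> {
    auth_token
        .and_then(|token| provider.resolve_from_token(token))
        .or(connection_identity)
}

/// Only the "no identity on a restricted operation" case is modelled here.
pub fn check_acl(
    identity: Option<&Identity>,
    restricted: bool,
) -> Result<(), (&'static str, &'static str)> {
    match (identity, restricted) {
        (None, true) => Err(("FORBIDDEN", "authentication required")),
        _ => Ok(()),
    }
}
```

Note that a bad token is not an error in itself: the request simply proceeds with whatever the connection already had, and the ACL check decides the outcome.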
Root OperationContext construction
When a call.requested arrives from the wire, the CallAdapter constructs the
root OperationContext — the entry point of the call tree. This sets
internal: false, meaning ACL runs against the caller's identity, not a
handler's composition authority (ADR-015, ADR-022).
fn build_root_context(
    &self,
    request_id: String,
    operation_name: &str,
    identity: Option<Identity>,
    /* connection, session */
) -> OperationContext {
    let registration = self.registry.registration(operation_name);
    OperationContext {
        request_id,
        parent_request_id: None, // wire request: top of call tree
        identity: identity.clone(), // caller's identity (inbound)
        handler_identity: registration.composition_authority.clone(),
        capabilities: registration.capabilities.clone(),
        metadata: HashMap::new(),
        deadline: Some(Instant::now() + self.default_timeout),
        scoped_env: registration.scoped_env.clone()
            .unwrap_or_else(ScopedOperationEnv::empty),
        env: self.compose_root_env(/* connection, session */),
        abort_policy: AbortPolicy::default(), // abort-dependents
        internal: false, // external call: ACL against caller identity
    }
}
compose_root_env
The per-call env composition (ADR-024) builds a CompositeOperationEnv from:
- Layer 0: LocalOperationEnv (curated registry)
- Layer 1: session overlay (if active, from session_source.overlay_for())
- Layer 2: connection overlay (from CallConnection.overlay_env())
fn compose_root_env(&self, connection: &CallConnection, context: &OperationContext) -> Arc<dyn OperationEnv + Send + Sync> {
    let base = Arc::new(LocalOperationEnv { registry: self.registry.clone() });
    let session = self.session_source.as_ref()
        .and_then(|s| s.overlay_for(context));
    let connection_overlay = connection.overlay_env();
    Arc::new(CompositeOperationEnv { session, connection: Some(connection_overlay), base })
}
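How a composite env might resolve a name across its layers can be sketched as below. This task does not state the lookup order, so the sketch assumes that higher-numbered layers shadow lower ones (connection, then session, then base); ADR-024 is the authority on the real rule. The lookup method, MapEnv, and the env helper are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::Arc;

pub trait OperationEnv {
    fn lookup(&self, name: &str) -> Option<String>; // hypothetical method
}

/// Trivial env backed by a map, used for every layer in this sketch.
pub struct MapEnv(pub HashMap<String, String>);

impl OperationEnv for MapEnv {
    fn lookup(&self, name: &str) -> Option<String> {
        self.0.get(name).cloned()
    }
}

pub struct CompositeOperationEnv {
    pub session: Option<Arc<dyn OperationEnv + Send + Sync>>,    // Layer 1
    pub connection: Option<Arc<dyn OperationEnv + Send + Sync>>, // Layer 2
    pub base: Arc<dyn OperationEnv + Send + Sync>,               // Layer 0
}

impl OperationEnv for CompositeOperationEnv {
    fn lookup(&self, name: &str) -> Option<String> {
        // ASSUMED precedence: Layer 2, then Layer 1, then Layer 0.
        self.connection
            .as_ref()
            .and_then(|env| env.lookup(name))
            .or_else(|| self.session.as_ref().and_then(|env| env.lookup(name)))
            .or_else(|| self.base.lookup(name))
    }
}

/// Helper that builds a layer from (operation name, label) pairs.
pub fn env(entries: &[(&str, &str)]) -> Arc<dyn OperationEnv + Send + Sync> {
    Arc::new(MapEnv(
        entries.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect(),
    ))
}
```

Absent overlays cost nothing in this shape: a None session is skipped and lookup falls through to the base registry.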
operationId normalization
The call.requested payload's operationId has a leading slash (/fs/readFile).
The CallAdapter strips it before registry lookup (fs/readFile). This is a
single rule applied consistently — the registry stores names without leading
slash, the wire format adds it.
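The rule fits in one function. The name normalize_operation_id is hypothetical, and passing through an ID that has no leading slash (rather than rejecting it) is an assumption; the task does not say how malformed IDs are handled.

```rust
/// Strips a single leading slash from a wire operationId, if present.
/// IDs without a leading slash are returned unchanged (assumed behaviour).
pub fn normalize_operation_id(wire_id: &str) -> &str {
    wire_id.strip_prefix('/').unwrap_or(wire_id)
}
```

Only the first slash is removed, so the path separators inside the name (fs/readFile) survive.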
ResponseEnvelope → EventEnvelope
The CallAdapter converts ResponseEnvelope (from local dispatch) to
EventEnvelope for the wire:
| ResponseEnvelope | EventEnvelope |
|---|---|
| Ok(value) | { type: "call.responded", id: request_id, payload: { output: value } } |
| Err(call_error) | { type: "call.error", id: request_id, payload: <serialized CallError> } |
For subscriptions, each call.responded is a separate EventEnvelope with the
same id; call.completed is { type: "call.completed", id, payload: {} }.
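The mapping can be sketched with plain enums instead of a serialization library. The type shapes here are simplified assumptions: output values are reduced to String, the payload is a Rust enum rather than JSON, and to_event, completed, and subscription_events are hypothetical helper names.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct CallError {
    pub code: String,
    pub message: String,
    pub retryable: bool,
}

/// Simplified: the real output is presumably a structured value, not a String.
pub type ResponseEnvelope = Result<String, CallError>;

#[derive(Debug, Clone, PartialEq)]
pub enum Payload {
    Output(String),   // { output: value }
    Error(CallError), // <serialized CallError>
    Empty,            // {}
}

#[derive(Debug, Clone, PartialEq)]
pub struct EventEnvelope {
    pub event_type: &'static str,
    pub id: String,
    pub payload: Payload,
}

/// Ok -> call.responded, Err -> call.error, both tagged with the request ID.
pub fn to_event(request_id: &str, response: ResponseEnvelope) -> EventEnvelope {
    let (event_type, payload) = match response {
        Ok(value) => ("call.responded", Payload::Output(value)),
        Err(error) => ("call.error", Payload::Error(error)),
    };
    EventEnvelope { event_type, id: request_id.to_string(), payload }
}

/// Terminal event for a subscription.
pub fn completed(request_id: &str) -> EventEnvelope {
    EventEnvelope { event_type: "call.completed", id: request_id.to_string(), payload: Payload::Empty }
}

/// A subscription: one call.responded per value, then call.completed, same id.
pub fn subscription_events(request_id: &str, values: Vec<String>) -> Vec<EventEnvelope> {
    let mut events: Vec<EventEnvelope> =
        values.into_iter().map(|v| to_event(request_id, Ok(v))).collect();
    events.push(completed(request_id));
    events
}
```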
Timeout handling
- Default timeout for wire calls is 30 seconds (default_timeout)
- build_root_context sets OperationContext.deadline to now + default_timeout
- Composed calls inherit the parent's deadline (children do NOT get a fresh 30s)
- A composed call that exceeds the deadline is cancelled and returns CallError { code: "TIMEOUT", retryable: true }
- Subscriptions default to no deadline (deadline: None, i.e. unbounded); the client can specify a timeout in the call.requested payload
- The PendingRequestMap sweeper runs every 10 seconds and removes expired wire entries
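The deadline rules above can be sketched with std::time alone. The function names are hypothetical, the pending map is reduced to request ID to deadline, and treating a call as expired when now >= deadline (rather than strictly after) is an assumption.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Wire call: deadline is fixed once, at root context construction.
pub fn root_deadline(now: Instant, default_timeout: Duration) -> Option<Instant> {
    Some(now + default_timeout)
}

/// Composed call: inherits the parent's deadline unchanged (no fresh budget).
pub fn child_deadline(parent: Option<Instant>) -> Option<Instant> {
    parent
}

/// None means unbounded (the subscription default), so it never expires.
pub fn is_expired(deadline: Option<Instant>, now: Instant) -> bool {
    matches!(deadline, Some(d) if now >= d)
}

/// One sweeper pass: removes expired entries and returns their IDs, sorted.
pub fn sweep(pending: &mut HashMap<String, Option<Instant>>, now: Instant) -> Vec<String> {
    let mut expired: Vec<String> = pending
        .iter()
        .filter(|(_, deadline)| is_expired(**deadline, now))
        .map(|(id, _)| id.clone())
        .collect();
    expired.sort();
    for id in &expired {
        pending.remove(id);
    }
    expired
}
```

For example, a child spawned 20 seconds into a 30-second wire call has 10 seconds left, not 30, because child_deadline returns the same Instant the root was given.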
Error handling in handle()
- If a handler panics, the stream is closed and the PendingRequestMap entry is cleaned up by the next sweeper pass. Other streams and the connection are unaffected.
- Connection drop: all pending requests failed with call.error code INTERNAL and message "connection closed". All subscription channels closed. handle() returns Ok(()) (clean) or Err(ConnectionClosed).
- Stream reset: FrameFramedReader returns an error. If subscription, remove PendingRequestMap entry, close mpsc. If call, resolve oneshot with error. No call.aborted sent, since the stream is gone.
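The connection-drop path can be sketched as a drain over the pending map. Standard-library channels stand in for the oneshot and mpsc channels mentioned above, and the Pending enum and fail_all function are hypothetical names, not the crate's API.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;

#[derive(Debug, Clone, PartialEq)]
pub struct CallError {
    pub code: &'static str,
    pub message: &'static str,
}

/// What is waiting on a given request ID.
pub enum Pending {
    Call(Sender<Result<String, CallError>>), // resolved exactly once
    Subscription(Sender<String>),            // stream of values
}

/// On connection drop: fail every pending call, close every subscription.
/// Returns how many entries were cleaned up; the map is left empty.
pub fn fail_all(pending: &mut HashMap<String, Pending>) -> usize {
    let mut cleaned = 0;
    for (_id, entry) in pending.drain() {
        match entry {
            Pending::Call(tx) => {
                // The waiter may already be gone; a failed send is ignored.
                let _ = tx.send(Err(CallError { code: "INTERNAL", message: "connection closed" }));
            }
            // Dropping the sender closes the channel for the subscriber.
            Pending::Subscription(tx) => drop(tx),
        }
        cleaned += 1;
    }
    cleaned
}
```

Draining rather than iterating guarantees that no entry outlives the connection, so the sweeper has nothing left to collect for it.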
Acceptance Criteria
- CallAdapter struct with registry, identity_provider, session_source, default_timeout
- CallAdapter::new(), with_session_source(), with_timeout() constructors
- SessionOverlaySource trait defined with overlay_for() method
- ProtocolHandler::alpn() returns b"alknet/call"
- handle() accepts streams, reads EventEnvelope frames, dispatches
- handle() spawns task for continuous accept_bi()
- Outgoing calls: open_bi, send call.requested, add to PendingRequestMap
- Identity resolution: AuthContext.identity used, auth_token overrides per-request
- auth_token resolution failure → proceed with connection-level identity
- build_root_context sets internal: false, deadline, capabilities from registration
- compose_root_env builds CompositeOperationEnv (base + session + connection)
- operationId leading slash stripped before registry lookup
- ResponseEnvelope → EventEnvelope conversion (Ok → responded, Err → error)
- Subscriptions: multiple call.responded with same id, then call.completed
- Timeout: 30s default, composed calls inherit parent deadline
- Handler panic: stream closed, PendingRequestMap cleaned up, others unaffected
- Connection drop: fail all pending with INTERNAL, return Ok or Err
- Unit test: CallAdapter alpn returns b"alknet/call"
- Integration test: call.requested → dispatch → call.responded round-trip
- Integration test: auth_token overrides connection-level identity
- Integration test: Internal op called from wire → NOT_FOUND
- Integration test: ACL denied → FORBIDDEN
- cargo test -p alknet-call succeeds
- cargo clippy -p alknet-call succeeds with no warnings
References
- docs/architecture/crates/call/call-protocol.md — CallAdapter, stream handling, root context
- docs/architecture/crates/call/operation-registry.md — OperationContext construction
- docs/architecture/decisions/015-privilege-model-and-authority-context.md — ADR-015 (internal: false for wire)
- docs/architecture/decisions/024-operation-registry-layering.md — ADR-024 (env composition)
- docs/architecture/decisions/012-call-protocol-stream-model.md — ADR-012
Notes
This is the merge point of the registry and protocol strands, and the highest-risk task in the call crate. It ties together stream handling, identity resolution, root context construction, env composition, and dispatch. The per-request identity resolution (auth_token overrides connection-level) is important: a single connection can upgrade auth mid-session. compose_root_env builds the CompositeOperationEnv per call from the active layers. operationId on the wire has a leading slash; strip it before lookup.
Summary
Implemented CallAdapter (ProtocolHandler for ALPN alknet/call) in
protocol/adapter.rs: stream handling via accept_bi loop, per-request identity
resolution (auth_token overrides connection identity, falls back on failure),
root context construction (internal: false, deadline, capabilities + scoped_env
from registration bundle), env composition (CompositeOperationEnv with Layer 0
base + Layer 2 connection overlay + optional Layer 1 session overlay),
operationId leading slash stripped, ResponseEnvelope→EventEnvelope conversion,
PendingRequestMap sweeper every 10s, connection drop fails all pending.
SessionOverlaySource trait. Added pending() accessor to CallConnection.
22 unit tests (142 total in call crate). Clippy clean. Merged to develop.