6.9 KiB
id, name, status, depends_on, scope, risk, impact, level
| id | name | status | depends_on | scope | risk | impact | level | ||
|---|---|---|---|---|---|---|---|---|---|
| call/protocol/call-connection | Implement CallConnection with imported-ops overlay (Layer 2) and call/subscribe/abort methods | completed |
|
moderate | medium | component | implementation |
Description
Implement CallConnection in src/protocol/connection.rs. This represents an
established alknet/call connection, regardless of which side opened it
(ADR-017). It holds the connection's imported-ops overlay (Layer 2, ADR-024).
CallConnection
pub struct CallConnection {
connection: Connection,
imported_operations: Arc<RwLock<HashMap<String, HandlerRegistration>>>,
}
An established alknet/call connection (either direction — accepted or opened).
Holds the Layer 2 overlay (imported ops from from_call discovery).
Layer 2 registration API
impl CallConnection {
/// Register an imported operation into this connection's overlay (Layer 2, ADR-024).
/// Called by from_call after discovery.
pub fn register_imported(&self, registration: HandlerRegistration) {
let name = registration.spec.name.clone();
self.imported_operations.write().insert(name, registration);
}
/// Register multiple imported operations (bulk variant for from_call).
pub fn register_imported_all(&self, registrations: Vec<HandlerRegistration>) {
let mut overlay = self.imported_operations.write();
for reg in registrations {
overlay.insert(reg.spec.name.clone(), reg);
}
}
}
Layer 0 (curated) is built via OperationRegistryBuilder at startup. Layer 2
(per-connection) registration uses CallConnection::register_imported() at
runtime. When the connection drops, the overlay (and all imported ops) is
dropped — no explicit deregistration needed.
Overlay env
impl CallConnection {
/// Build an OperationEnv impl for this connection's overlay.
/// Used by the CallAdapter when composing the root OperationContext.env.
/// Returns an OperationEnv that dispatches to this connection's imported ops
/// (and reports contains only for ops in the overlay).
pub fn overlay_env(&self) -> Arc<dyn OperationEnv + Send + Sync>;
}
This is an OperationEnv impl that dispatches to the connection's imported ops.
The contains() method returns true only for ops in the overlay. The
invoke_with_policy() method looks up the op in the overlay and dispatches to
its handler.
This env is composed into the CompositeOperationEnv by the CallAdapter as the
connection layer (Layer 2).
Call methods (outgoing)
impl CallConnection {
/// Call an operation on the remote peer (sends call.requested).
pub async fn call(&self, operation_id: &str, input: Value) -> ResponseEnvelope;
/// Subscribe to a streaming operation on the remote peer.
pub async fn subscribe(&self, operation_id: &str, input: Value) -> impl Stream<Item = ResponseEnvelope>;
/// Abort an in-flight request (sends call.aborted, cascades per ADR-016).
pub async fn abort(&self, request_id: &str);
}
These methods:
- Open a bidirectional stream with
connection.open_bi() - Send
call.requestedon that stream (via FrameFramedWriter) - Add the request ID to the PendingRequestMap
- Read responses from any stream, correlate by ID (via PendingRequestMap)
call() resolves on the first call.responded. subscribe() yields each
call.responded until call.completed or call.aborted.
abort() sends call.aborted for the given request ID. The abort cascade
(ADR-016) is handled by the abort-cascade task.
Connection direction independence
Per ADR-017, connection direction is independent of call direction. Both
sides can call each other once connected. The CallConnection type is the same
whether the connection was accepted (server side) or opened (client side via
CallClient). The call/subscribe/abort methods work the same way.
from_call integration
The from_call adapter (ADR-017) discovers operations on a remote call
protocol endpoint via services/list and services/schema, then registers
them with register_imported() / register_imported_all(). This makes
cross-node composition transparent — a handler calling
env.invoke("worker", "exec", ...) doesn't know whether the operation is
local or remote.
The from_call adapter itself is not implemented in this task — it's a future
task. This task implements the CallConnection infrastructure that from_call
will use.
Acceptance Criteria
CallConnectionstruct with connection and imported_operations fieldsregister_imported()adds to the Layer 2 overlayregister_imported_all()bulk adds to the overlayoverlay_env()returns an OperationEnv dispatching to imported opsoverlay_env().contains()returns true only for ops in the overlaycall()sends call.requested, resolves on first call.respondedsubscribe()sends call.requested, yields call.responded until completed/abortedabort()sends call.aborted for the request ID- Outgoing calls open a stream, send request, add to PendingRequestMap
- Connection drop drops the overlay (no explicit deregistration)
- Unit test: register_imported adds to overlay, contains returns true
- Unit test: overlay_env dispatches to imported op
- Unit test: overlay_env contains returns false for non-imported op
cargo test -p alknet-callsucceedscargo clippy -p alknet-callsucceeds with no warnings
References
- docs/architecture/crates/call/call-protocol.md — CallConnection section
- docs/architecture/decisions/017-call-protocol-client-and-adapter-contract.md — ADR-017
- docs/architecture/decisions/024-operation-registry-layering.md — ADR-024 (Layer 2)
Notes
Connection direction is independent of call direction (ADR-017) — both sides can call each other. The Layer 2 overlay is per-connection: when the connection drops, the overlay drops (no deregistration needed). The overlay_env() is composed into CompositeOperationEnv by the CallAdapter as the connection layer. The from_call adapter itself is a future task — this implements the infrastructure it will use.
Summary
Implemented CallConnection in protocol/connection.rs with Layer 2 imported-ops
overlay (Arc<RwLock<HashMap>>), register_imported/register_imported_all,
overlay_env() returning an OperationEnv that dispatches to imported ops
(contains() returns true only for overlay ops), and call()/subscribe()/abort()
methods that open a stream, send call.requested, register in PendingRequestMap,
spawn a stream reader, and correlate responses by ID. Connection drop drops the
overlay. Exposed MockConnection + Connection::from_mock in alknet-core for
cross-crate testing. Added parking_lot dep. 9 new connection tests (115 total in
call crate). Clippy clean. Merged to develop.