//! `CallClient`: the outbound connection opener (ADR-017 §1). //! //! Opens a QUIC connection to a remote node on ALPN `alknet/call`, performs //! credential setup, and produces a [`CallConnection`] running the shared //! dispatch loop (delegated to [`crate::protocol::dispatch::Dispatcher`]). //! `CallClient` is the connection-establishment half; `CallAdapter`'s accept //! path is the inbound half; both produce a `CallConnection` and hand it to //! the same `Dispatcher::run_loop` (ADR-017 §1). //! //! After establishment the connection is symmetric (ADR-017 §2): both sides //! can send and receive `call.requested`. The `CallClient` is both a caller //! (initiates outgoing calls via `CallConnection::call()`/`subscribe()`/ //! `abort()`) and a callee (dispatches incoming calls against its registry). //! //! See `docs/architecture/crates/call/client-and-adapters.md` for the spec. use std::net::SocketAddr; use std::sync::Arc; use alknet_core::auth::IdentityProvider; use alknet_core::config::TlsIdentity; use alknet_core::types::Connection; use crate::protocol::connection::CallConnection; use crate::protocol::dispatch::Dispatcher; use crate::registry::registration::OperationRegistry; /// Expected identity of the remote node (ADR-017 §7). The concrete shape is /// an implementation-detail two-way door; v1 carries a fingerprint string the /// assembly layer derives from `Capabilities` (ADR-014). Verification is the /// assembly layer's trust decision — `CallClient` surfaces the expected value /// so the transport can pin it, but the v1 quinn client config does not enforce /// a specific verifier (recorded as a two-way-door remainder). #[derive(Debug, Clone)] pub struct RemoteIdentity { pub fingerprint: String, } /// Credentials for an outbound `alknet/call` connection (ADR-017 §7). All /// three dimensions come from `Capabilities` (ADR-014), never from environment /// variables — see the No-Env-Vars Invariant in /// `docs/architecture/crates/call/client-and-adapters.md`. #[derive(Debug, Clone, Default)] pub struct CallCredentials { /// The local node's TLS identity (RFC 7250 raw key or X.509), derived /// from the vault at startup. pub tls_identity: Option, /// Opaque call-protocol-level auth token, decrypted from the vault. pub auth_token: Option, /// Expected fingerprint/cert of the remote node, stored as a capability. pub remote_identity: Option, } impl CallCredentials { pub fn new() -> Self { Self::default() } pub fn with_tls_identity(mut self, tls_identity: TlsIdentity) -> Self { self.tls_identity = Some(tls_identity); self } pub fn with_auth_token(mut self, token: alknet_core::auth::AuthToken) -> Self { self.auth_token = Some(token); self } pub fn with_remote_identity(mut self, remote: RemoteIdentity) -> Self { self.remote_identity = Some(remote); self } } /// Errors produced by [`CallClient::connect`]. #[derive(Debug, thiserror::Error)] #[non_exhaustive] pub enum ClientError { #[error("transport error: {message}")] Transport { message: String }, #[error("tls setup error: {message}")] TlsSetup { message: String }, #[error("connection closed")] ConnectionClosed, } /// Outbound `alknet/call` connection opener (the #1 gap, ADR-017 §1). /// /// Peer authorization flows through the existing `AccessControl::check` gate /// in `OperationRegistry::invoke` (ADR-029 §3) — no parallel `remote_safe`/ /// `trusted_peer` gate. pub struct CallClient { registry: Arc, identity_provider: Arc, } impl CallClient { pub fn new( registry: Arc, identity_provider: Arc, ) -> Self { Self { registry, identity_provider, } } pub fn registry(&self) -> &Arc { &self.registry } pub fn identity_provider(&self) -> &Arc { &self.identity_provider } /// Open a QUIC connection to `addr` on ALPN `alknet/call`, perform /// credential handshake, and return a `CallConnection` running the shared /// dispatch loop. Credentials come from `Capabilities` (ADR-014), not env /// vars — the no-env-vars invariant. /// /// The dispatch loop runs on a spawned task; the returned `CallConnection` /// is live until the remote closes the connection or the caller drops it. /// The caller can immediately use `call()`/`subscribe()`/`abort()` on the /// returned connection, and the remote peer can call back into this /// `CallClient`'s registry (connection symmetry, ADR-017 §2). #[cfg(feature = "quinn")] pub async fn connect( &self, addr: SocketAddr, credentials: CallCredentials, ) -> Result { let alpn = b"alknet/call".to_vec(); let client_config = build_quinn_client_config(&credentials, &alpn) .map_err(|e| ClientError::TlsSetup { message: e })?; let bind_addr: SocketAddr = "0.0.0.0:0".parse().expect("valid bind addr"); let endpoint = quinn::Endpoint::client(bind_addr).map_err(|e| ClientError::Transport { message: e.to_string(), })?; let connection = endpoint .connect_with(client_config, addr, "alknet") .map_err(|e| ClientError::Transport { message: e.to_string(), })? .await .map_err(|e| ClientError::Transport { message: e.to_string(), })?; let connection = Connection::from_quinn_with_alpn(connection, alpn); Ok(self.spawn_dispatch(connection)) } /// Run the shared dispatch loop over a pre-established `Connection`. The /// `CallClient` spawns the dispatcher task and returns a live /// `CallConnection` the caller can use immediately. Used by `connect()` /// (after the QUIC dial completes) and by integration tests that wire a /// mock/loopback `Connection` directly. pub fn spawn_dispatch(&self, connection: Connection) -> CallConnection { let call_connection = Arc::new(CallConnection::new(connection)); let dispatcher = Dispatcher::new( Arc::clone(&self.registry), Arc::clone(&self.identity_provider), ); let run_conn = Arc::clone(&call_connection); tokio::spawn(async move { dispatcher.run_loop(run_conn).await; }); (*call_connection).clone() } } #[cfg(feature = "quinn")] fn build_quinn_client_config( _credentials: &CallCredentials, alpn: &[u8], ) -> Result { // The client presents its Ed25519 key as an RFC 7250 raw public key // client cert (OQ-29, resolved — ADR-030 §6). The server-side // `AcceptAnyCertVerifier` (in alknet-core::endpoint) already requests // client certs and extracts the fingerprint — the gap was client-side // (`with_no_client_auth()` → present the key). This activates the // `PeerEntry` fingerprint → `peer_id` resolution path. // // Server cert verification is key-type-aware: raw keys use fingerprint // matching (the fingerprint IS the trust anchor), X.509 uses CA // verification (`WebPkiServerVerifier`). `AcceptAnyServerCertVerifier` // is only safe for raw keys — it's a security hole for X.509. // // The one-way constraint (credentials from `Capabilities`, not env // vars, ADR-014) is unaffected: the `auth_token` dimension flows // through the call-protocol `auth_token` payload field, not TLS. let provider = Arc::new(rustls::crypto::aws_lc_rs::default_provider()); let mut config = rustls::ClientConfig::builder_with_provider(provider) .with_safe_default_protocol_versions() .map_err(|e| e.to_string())? .dangerous() .with_custom_certificate_verifier(Arc::new(AcceptAnyServerCertVerifier)) .with_no_client_auth(); config.alpn_protocols = vec![alpn.to_vec()]; config.enable_early_data = true; Ok(quinn::ClientConfig::new(Arc::new( quinn::crypto::rustls::QuicClientConfig::try_from(config).map_err(|e| e.to_string())?, ))) } #[cfg(feature = "quinn")] struct AcceptAnyServerCertVerifier; #[cfg(feature = "quinn")] impl std::fmt::Debug for AcceptAnyServerCertVerifier { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("AcceptAnyServerCertVerifier").finish() } } #[cfg(feature = "quinn")] impl rustls::client::danger::ServerCertVerifier for AcceptAnyServerCertVerifier { fn verify_server_cert( &self, _end_entity: &rustls::pki_types::CertificateDer<'_>, _intermediates: &[rustls::pki_types::CertificateDer<'_>], _server_name: &rustls::pki_types::ServerName<'_>, _ocsp_response: &[u8], _now: rustls::pki_types::UnixTime, ) -> Result { Ok(rustls::client::danger::ServerCertVerified::assertion()) } fn verify_tls12_signature( &self, _message: &[u8], _cert: &rustls::pki_types::CertificateDer<'_>, _dss: &rustls::DigitallySignedStruct, ) -> Result { Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) } fn verify_tls13_signature( &self, _message: &[u8], _cert: &rustls::pki_types::CertificateDer<'_>, _dss: &rustls::DigitallySignedStruct, ) -> Result { Ok(rustls::client::danger::HandshakeSignatureValid::assertion()) } fn supported_verify_schemes(&self) -> Vec { vec![ rustls::SignatureScheme::ED25519, rustls::SignatureScheme::ECDSA_NISTP256_SHA256, rustls::SignatureScheme::ECDSA_NISTP384_SHA384, rustls::SignatureScheme::RSA_PSS_SHA256, rustls::SignatureScheme::RSA_PSS_SHA384, rustls::SignatureScheme::RSA_PSS_SHA512, rustls::SignatureScheme::RSA_PKCS1_SHA256, rustls::SignatureScheme::RSA_PKCS1_SHA384, rustls::SignatureScheme::RSA_PKCS1_SHA512, ] } } #[cfg(test)] mod tests { use super::*; use crate::protocol::connection::CallConnection; use crate::protocol::wire::ResponseEnvelope; use crate::registry::registration::{ make_handler, Handler, HandlerRegistration, OperationProvenance, }; use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::Identity; use alknet_core::types::{Capabilities, MockConnection}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Mutex as StdMutex; struct StubConnection { alpn: &'static [u8], addr: Option, closed: StdMutex>, } impl MockConnection for StubConnection { fn remote_alpn(&self) -> &[u8] { self.alpn } fn remote_addr(&self) -> Option { self.addr } fn close(&self, code: u32, reason: &str) { *self.closed.lock().unwrap() = Some((code, reason.to_string())); } } fn stub_connection() -> Connection { Connection::from_mock(Arc::new(StubConnection { alpn: b"alknet/call", addr: Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4321)), closed: StdMutex::new(None), })) } fn external_spec(name: &str) -> OperationSpec { OperationSpec::new( name, OperationType::Query, Visibility::External, serde_json::json!({}), serde_json::json!({}), vec![], AccessControl::default(), ) } fn caps_inspect_handler() -> Handler { make_handler(|_input, context| async move { let has_google = context.capabilities.get("google").is_some(); ResponseEnvelope::ok( context.request_id, serde_json::json!({ "has_google_capability": has_google }), ) }) } struct NoopIdentityProvider; impl alknet_core::auth::IdentityProvider for NoopIdentityProvider { fn resolve_from_fingerprint(&self, _fp: &str) -> Option { None } fn resolve_from_token(&self, _token: &alknet_core::auth::AuthToken) -> Option { None } } fn registry_with_caps() -> Arc { let mut registry = OperationRegistry::new(); registry.register(HandlerRegistration::new( external_spec("pub/run"), caps_inspect_handler(), OperationProvenance::Local, None, None, Capabilities::new().with_api_key("google", "pub-key".to_string()), )); Arc::new(registry) } fn dispatcher(registry: &Arc) -> Dispatcher { Dispatcher::new(Arc::clone(registry), Arc::new(NoopIdentityProvider)) } async fn dispatch(d: &Dispatcher, conn: &Arc, op: &str) -> ResponseEnvelope { d.dispatch_requested( conn, "req-test".to_string(), serde_json::json!({ "operationId": op, "input": {} }), ) .await } #[test] fn call_credentials_builder_methods() { let creds = CallCredentials::new().with_remote_identity(RemoteIdentity { fingerprint: "SHA256:abc".to_string(), }); assert_eq!( creds.remote_identity.as_ref().unwrap().fingerprint, "SHA256:abc" ); assert!(creds.tls_identity.is_none()); assert!(creds.auth_token.is_none()); } #[tokio::test] async fn external_op_dispatches_and_populates_capabilities() { let registry = registry_with_caps(); let d = dispatcher(®istry); let conn = Arc::new(CallConnection::new(stub_connection())); let response = dispatch(&d, &conn, "pub/run").await; let out = response.result.expect("ok"); assert_eq!( out["has_google_capability"], serde_json::json!(true), "an External op's call must populate capabilities for the handler" ); } #[tokio::test] async fn unknown_op_returns_not_found() { let registry = Arc::new(OperationRegistry::new()); let d = dispatcher(®istry); let conn = Arc::new(CallConnection::new(stub_connection())); let response = dispatch(&d, &conn, "no/such").await; match response.result { Err(e) => assert_eq!(e.code, "NOT_FOUND"), other => panic!("expected NOT_FOUND, got {other:?}"), } } #[tokio::test] async fn spawn_dispatch_returns_live_call_connection() { let registry = registry_with_caps(); let client = CallClient::new(Arc::clone(®istry), Arc::new(NoopIdentityProvider)); let conn = client.spawn_dispatch(stub_connection()); assert_eq!(conn.connection().remote_alpn(), b"alknet/call"); std::mem::drop(conn); } #[test] fn call_client_is_send_sync() { fn assert_send_sync() {} assert_send_sync::(); assert_send_sync::(); assert_send_sync::(); } }