Subscription ops discovered via services/list + services/schema now register a StreamingHandler (HandlerKind::Stream) that calls CallConnection::subscribe_with_payload and forwards the remote stream end-to-end (ADR-049 §8). Query/Mutation ops keep the existing make_forwarding_handler (HandlerKind::Once). - Add CallConnection::subscribe_with_payload(payload) mirroring call_with_payload so the forwarding handler can populate forwarded_for (ADR-032) + auth_token on the subscription payload. subscribe() now delegates to subscribe_with_payload. - Add make_streaming_forwarding_handler() in from_call.rs using make_streaming_handler + futures::stream::once(...).flatten() to await subscribe_with_payload then forward its stream. - Branch build_bundles on spec.op_type (already parsed by rebuild_spec_for). - Reuse build_forwarded_payload — no new payload-construction code. - composition_authority: None, scoped_env: None for FromCall streaming leaves (same as Query/Mutation FromCall leaves). - Abort cascade (ADR-016 §6) already wired via PendingRequestMap in subscribe_with_payload. Closes the gap where a from_call-imported Subscription truncated to the first value.
1024 lines
35 KiB
Rust
1024 lines
35 KiB
Rust
//! `CallConnection`: an established `alknet/call` connection (either
|
|
//! direction — accepted or opened). Holds the connection's Layer 2 overlay
|
|
//! (imported ops).
|
|
//!
|
|
//! See `docs/architecture/crates/call/call-protocol.md` for the full
|
|
//! specification.
|
|
|
|
use std::collections::HashMap;
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use std::task::{Context, Poll};
|
|
use std::time::{Duration, Instant};
|
|
|
|
use alknet_core::auth::Identity;
|
|
use alknet_core::types::Connection;
|
|
use futures::stream::Stream;
|
|
use parking_lot::{Mutex, RwLock};
|
|
use serde_json::Value;
|
|
use tokio::sync::mpsc;
|
|
|
|
use super::pending::PendingRequestMap;
|
|
use super::wire::{
|
|
CallError, EventEnvelope, FrameFramedReader, FrameFramedWriter, EVENT_ABORTED, EVENT_COMPLETED,
|
|
EVENT_ERROR, EVENT_RESPONDED,
|
|
};
|
|
use crate::protocol::wire::ResponseEnvelope;
|
|
use crate::registry::context::{generate_request_id, AbortPolicy, OperationContext, ScopedPeerEnv};
|
|
use crate::registry::env::OperationEnv;
|
|
use crate::registry::registration::{HandlerKind, HandlerRegistration};
|
|
use crate::registry::spec::AccessResult;
|
|
|
|
const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30);
|
|
|
|
pub struct CallConnection {
|
|
connection: Option<Arc<Connection>>,
|
|
stored_identity: Option<Identity>,
|
|
imported_operations: Arc<RwLock<HashMap<String, HandlerRegistration>>>,
|
|
pending: Arc<Mutex<PendingRequestMap>>,
|
|
}
|
|
|
|
impl Clone for CallConnection {
|
|
fn clone(&self) -> Self {
|
|
Self {
|
|
connection: self.connection.clone(),
|
|
stored_identity: self.stored_identity.clone(),
|
|
imported_operations: Arc::clone(&self.imported_operations),
|
|
pending: Arc::clone(&self.pending),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl CallConnection {
|
|
pub fn new(connection: Connection) -> Self {
|
|
Self {
|
|
connection: Some(Arc::new(connection)),
|
|
stored_identity: None,
|
|
imported_operations: Arc::new(RwLock::new(HashMap::new())),
|
|
pending: Arc::new(Mutex::new(PendingRequestMap::new())),
|
|
}
|
|
}
|
|
|
|
pub fn new_overlay_only(identity: Identity) -> Self {
|
|
Self {
|
|
connection: None,
|
|
stored_identity: Some(identity),
|
|
imported_operations: Arc::new(RwLock::new(HashMap::new())),
|
|
pending: Arc::new(Mutex::new(PendingRequestMap::new())),
|
|
}
|
|
}
|
|
|
|
pub fn connection(&self) -> Option<&Arc<Connection>> {
|
|
self.connection.as_ref()
|
|
}
|
|
|
|
pub fn identity(&self) -> Option<&Identity> {
|
|
match &self.connection {
|
|
Some(c) => c.identity(),
|
|
None => self.stored_identity.as_ref(),
|
|
}
|
|
}
|
|
|
|
pub fn pending(&self) -> &Arc<Mutex<PendingRequestMap>> {
|
|
&self.pending
|
|
}
|
|
|
|
pub fn register_imported(&self, registration: HandlerRegistration) {
|
|
let name = registration.spec.name.clone();
|
|
self.imported_operations.write().insert(name, registration);
|
|
}
|
|
|
|
pub fn register_imported_all(&self, registrations: Vec<HandlerRegistration>) {
|
|
let mut overlay = self.imported_operations.write();
|
|
for reg in registrations {
|
|
overlay.insert(reg.spec.name.clone(), reg);
|
|
}
|
|
}
|
|
|
|
pub fn overlay_env(&self) -> Arc<dyn OperationEnv + Send + Sync> {
|
|
Arc::new(OverlayOperationEnv {
|
|
overlay: Arc::clone(&self.imported_operations),
|
|
})
|
|
}
|
|
|
|
pub async fn call(&self, operation_id: &str, input: Value) -> ResponseEnvelope {
|
|
let payload = serde_json::json!({
|
|
"operationId": operation_id,
|
|
"input": input,
|
|
});
|
|
self.call_with_payload(payload).await
|
|
}
|
|
|
|
/// Invoke a remote op with a caller-constructed `call.requested` payload.
|
|
/// The payload MUST include `operationId` and `input`; the caller may add
|
|
/// `forwarded_for` (ADR-032) and `auth_token` (ADR-017 §7) for the hub
|
|
/// forwarding path used by `from_call`.
|
|
pub async fn call_with_payload(&self, payload: Value) -> ResponseEnvelope {
|
|
let request_id = generate_request_id();
|
|
|
|
let connection = match &self.connection {
|
|
Some(c) => c,
|
|
None => {
|
|
return ResponseEnvelope::error(
|
|
request_id,
|
|
CallError::internal("no underlying connection (overlay-only)"),
|
|
);
|
|
}
|
|
};
|
|
|
|
let (send, recv) = match connection.open_bi().await {
|
|
Ok(pair) => pair,
|
|
Err(err) => {
|
|
let call_error = CallError::internal(format!("failed to open stream: {err}"));
|
|
return ResponseEnvelope::error(request_id, call_error);
|
|
}
|
|
};
|
|
|
|
let receiver = {
|
|
let mut pending = self.pending.lock();
|
|
pending.register_call(
|
|
request_id.clone(),
|
|
Instant::now() + DEFAULT_CALL_TIMEOUT,
|
|
None,
|
|
)
|
|
};
|
|
|
|
if let Err(err) = self.write_request(send, &request_id, payload).await {
|
|
let call_error = CallError::internal(err);
|
|
self.pending
|
|
.lock()
|
|
.handle_error(&request_id, call_error.clone());
|
|
return ResponseEnvelope::error(request_id, call_error);
|
|
}
|
|
|
|
let pending = Arc::clone(&self.pending);
|
|
tokio::spawn(async move {
|
|
read_stream_until_closed(recv, &pending).await;
|
|
});
|
|
|
|
match receiver.await {
|
|
Ok(Ok(value)) => ResponseEnvelope::ok(request_id, value),
|
|
Ok(Err(error)) => ResponseEnvelope::error(request_id, error),
|
|
Err(_) => ResponseEnvelope::error(request_id, CallError::internal("request cancelled")),
|
|
}
|
|
}
|
|
|
|
pub async fn subscribe(
|
|
&self,
|
|
operation_id: &str,
|
|
input: Value,
|
|
) -> impl Stream<Item = ResponseEnvelope> {
|
|
let payload = serde_json::json!({
|
|
"operationId": operation_id,
|
|
"input": input,
|
|
});
|
|
self.subscribe_with_payload(payload).await
|
|
}
|
|
|
|
/// Subscribe to a remote op with a caller-constructed `call.requested`
|
|
/// payload. The payload MUST include `operationId` and `input`; the
|
|
/// caller may add `forwarded_for` (ADR-032) and `auth_token` (ADR-017 §7)
|
|
/// for the hub forwarding path used by `from_call`'s streaming forwarding
|
|
/// handler. Mirrors [`call_with_payload`](Self::call_with_payload) so the
|
|
/// forwarding handler can populate `forwarded_for` + `auth_token` on the
|
|
/// subscription payload (the plain [`subscribe`](Self::subscribe) builds
|
|
/// the payload internally and omits those fields).
|
|
pub async fn subscribe_with_payload(
|
|
&self,
|
|
payload: Value,
|
|
) -> impl Stream<Item = ResponseEnvelope> {
|
|
let request_id = generate_request_id();
|
|
|
|
let connection = match &self.connection {
|
|
Some(c) => c,
|
|
None => {
|
|
let call_error = CallError::internal("no underlying connection (overlay-only)");
|
|
return SubscriptionStream::closed(request_id, call_error);
|
|
}
|
|
};
|
|
|
|
let (send, recv) = match connection.open_bi().await {
|
|
Ok(pair) => pair,
|
|
Err(err) => {
|
|
let call_error = CallError::internal(format!("failed to open stream: {err}"));
|
|
return SubscriptionStream::closed(request_id, call_error);
|
|
}
|
|
};
|
|
|
|
let receiver = {
|
|
let mut pending = self.pending.lock();
|
|
pending.register_subscribe(request_id.clone(), None, None)
|
|
};
|
|
|
|
if let Err(err) = self.write_request(send, &request_id, payload).await {
|
|
let call_error = CallError::internal(err);
|
|
self.pending
|
|
.lock()
|
|
.handle_error(&request_id, call_error.clone());
|
|
return SubscriptionStream::closed(request_id, call_error);
|
|
}
|
|
|
|
let pending = Arc::clone(&self.pending);
|
|
tokio::spawn(async move {
|
|
read_stream_until_closed(recv, &pending).await;
|
|
});
|
|
|
|
SubscriptionStream::new(request_id, receiver)
|
|
}
|
|
|
|
pub async fn abort(&self, request_id: &str) {
|
|
let envelope = EventEnvelope::aborted(request_id);
|
|
if let Err(err) = self.write_envelope(&envelope).await {
|
|
tracing::warn!(error = %err, request_id, "failed to send call.aborted");
|
|
return;
|
|
}
|
|
self.pending.lock().handle_aborted(request_id);
|
|
}
|
|
|
|
async fn write_request(
|
|
&self,
|
|
send: alknet_core::types::SendStream,
|
|
request_id: &str,
|
|
payload: Value,
|
|
) -> Result<(), String> {
|
|
let envelope = EventEnvelope::requested(request_id, payload);
|
|
let mut writer = FrameFramedWriter::new(send);
|
|
writer
|
|
.write_frame(&envelope)
|
|
.await
|
|
.map_err(|e| format!("failed to write frame: {e}"))
|
|
}
|
|
|
|
async fn write_envelope(&self, envelope: &EventEnvelope) -> Result<(), String> {
|
|
let connection = self
|
|
.connection
|
|
.as_ref()
|
|
.ok_or_else(|| "no underlying connection (overlay-only)".to_string())?;
|
|
let (send, _recv) = connection
|
|
.open_bi()
|
|
.await
|
|
.map_err(|e| format!("failed to open stream: {e}"))?;
|
|
let mut writer = FrameFramedWriter::new(send);
|
|
writer
|
|
.write_frame(envelope)
|
|
.await
|
|
.map_err(|e| format!("failed to write frame: {e}"))
|
|
}
|
|
}
|
|
|
|
async fn read_stream_until_closed(
|
|
recv: alknet_core::types::RecvStream,
|
|
pending: &Arc<Mutex<PendingRequestMap>>,
|
|
) {
|
|
let mut reader = FrameFramedReader::new(recv);
|
|
while let Ok(envelope) = reader.read_frame().await {
|
|
dispatch_envelope(pending, envelope);
|
|
}
|
|
}
|
|
|
|
fn dispatch_envelope(pending: &Arc<Mutex<PendingRequestMap>>, envelope: EventEnvelope) {
|
|
let request_id = envelope.id.clone();
|
|
match envelope.r#type.as_str() {
|
|
EVENT_RESPONDED => {
|
|
let output = envelope
|
|
.payload
|
|
.get("output")
|
|
.cloned()
|
|
.unwrap_or(Value::Null);
|
|
pending.lock().handle_responded(&request_id, output);
|
|
}
|
|
EVENT_COMPLETED => {
|
|
pending.lock().handle_completed(&request_id);
|
|
}
|
|
EVENT_ABORTED => {
|
|
pending.lock().handle_aborted(&request_id);
|
|
}
|
|
EVENT_ERROR => {
|
|
if let Ok(error) = serde_json::from_value::<CallError>(envelope.payload) {
|
|
pending.lock().handle_error(&request_id, error);
|
|
}
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
struct OverlayOperationEnv {
|
|
overlay: Arc<RwLock<HashMap<String, HandlerRegistration>>>,
|
|
}
|
|
|
|
#[async_trait::async_trait]
|
|
impl OperationEnv for OverlayOperationEnv {
|
|
async fn invoke_with_policy(
|
|
&self,
|
|
namespace: &str,
|
|
operation: &str,
|
|
input: Value,
|
|
parent: &OperationContext,
|
|
policy: AbortPolicy,
|
|
) -> ResponseEnvelope {
|
|
let name = format!("{namespace}/{operation}");
|
|
|
|
if !parent.scoped_env.allows(&name) {
|
|
return ResponseEnvelope::not_found(parent.request_id.clone(), &name);
|
|
}
|
|
|
|
let handler: HandlerKind;
|
|
let composition_authority;
|
|
let scoped_env;
|
|
let access_control;
|
|
{
|
|
let overlay = self.overlay.read();
|
|
let Some(registration) = overlay.get(&name) else {
|
|
return ResponseEnvelope::not_found(parent.request_id.clone(), &name);
|
|
};
|
|
handler = registration.handler.clone();
|
|
composition_authority = registration.composition_authority.clone();
|
|
scoped_env = registration
|
|
.scoped_env
|
|
.clone()
|
|
.unwrap_or_else(ScopedPeerEnv::empty);
|
|
access_control = registration.spec.access_control.clone();
|
|
}
|
|
|
|
let caller_identity = if parent.internal {
|
|
parent
|
|
.handler_identity
|
|
.as_ref()
|
|
.and_then(|ca| ca.as_identity())
|
|
} else {
|
|
parent.identity.clone()
|
|
};
|
|
if let AccessResult::Forbidden(message) = access_control.check(caller_identity.as_ref()) {
|
|
return ResponseEnvelope::forbidden(parent.request_id.clone(), message);
|
|
}
|
|
|
|
let context = OperationContext {
|
|
request_id: generate_request_id(),
|
|
parent_request_id: Some(parent.request_id.clone()),
|
|
identity: parent
|
|
.handler_identity
|
|
.as_ref()
|
|
.and_then(|ca| ca.as_identity()),
|
|
handler_identity: composition_authority,
|
|
forwarded_for: None,
|
|
capabilities: parent.capabilities.clone(),
|
|
metadata: HashMap::new(),
|
|
abort_policy: policy,
|
|
deadline: parent.deadline,
|
|
scoped_env,
|
|
env: parent.env.clone(),
|
|
internal: true,
|
|
};
|
|
|
|
match handler {
|
|
HandlerKind::Once(h) => h(input, context).await,
|
|
HandlerKind::Stream(_) => ResponseEnvelope::error(
|
|
parent.request_id.clone(),
|
|
CallError::invalid_operation_type(
|
|
"OperationEnv::invoke() called on a Subscription op; composition is request/response-only",
|
|
),
|
|
),
|
|
}
|
|
}
|
|
|
|
fn contains(&self, name: &str) -> bool {
|
|
self.overlay.read().contains_key(name)
|
|
}
|
|
}
|
|
|
|
pub struct SubscriptionStream {
|
|
request_id: String,
|
|
receiver: mpsc::Receiver<Result<Value, CallError>>,
|
|
done: bool,
|
|
}
|
|
|
|
impl SubscriptionStream {
|
|
fn new(request_id: String, receiver: mpsc::Receiver<Result<Value, CallError>>) -> Self {
|
|
Self {
|
|
request_id,
|
|
receiver,
|
|
done: false,
|
|
}
|
|
}
|
|
|
|
fn closed(request_id: String, error: CallError) -> Self {
|
|
let (tx, rx) = mpsc::channel(1);
|
|
let _ = tx.try_send(Err(error));
|
|
Self {
|
|
request_id,
|
|
receiver: rx,
|
|
done: false,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Stream for SubscriptionStream {
|
|
type Item = ResponseEnvelope;
|
|
|
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
if self.done {
|
|
return Poll::Ready(None);
|
|
}
|
|
let this = self.get_mut();
|
|
match this.receiver.poll_recv(cx) {
|
|
Poll::Ready(None) => {
|
|
this.done = true;
|
|
Poll::Ready(None)
|
|
}
|
|
Poll::Ready(Some(Ok(value))) => {
|
|
Poll::Ready(Some(ResponseEnvelope::ok(this.request_id.clone(), value)))
|
|
}
|
|
Poll::Ready(Some(Err(error))) => {
|
|
this.done = true;
|
|
Poll::Ready(Some(ResponseEnvelope::error(
|
|
this.request_id.clone(),
|
|
error,
|
|
)))
|
|
}
|
|
Poll::Pending => Poll::Pending,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use crate::registry::context::CompositionAuthority;
|
|
use crate::registry::registration::{make_handler, Handler, HandlerKind, OperationProvenance};
|
|
use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility};
|
|
use alknet_core::types::{Capabilities, MockConnection};
|
|
use std::collections::HashMap;
|
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
|
use std::sync::Mutex as StdMutex;
|
|
use std::time::{Duration, Instant};
|
|
|
|
struct StubConnection {
|
|
alpn: &'static [u8],
|
|
addr: Option<SocketAddr>,
|
|
closed: StdMutex<Option<(u32, String)>>,
|
|
}
|
|
|
|
impl MockConnection for StubConnection {
|
|
fn remote_alpn(&self) -> &[u8] {
|
|
self.alpn
|
|
}
|
|
fn remote_addr(&self) -> Option<SocketAddr> {
|
|
self.addr
|
|
}
|
|
fn close(&self, code: u32, reason: &str) {
|
|
*self.closed.lock().unwrap() = Some((code, reason.to_string()));
|
|
}
|
|
}
|
|
|
|
fn stub_connection() -> Connection {
|
|
Connection::from_mock(Arc::new(StubConnection {
|
|
alpn: b"alknet/call",
|
|
addr: Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4321)),
|
|
closed: StdMutex::new(None),
|
|
}))
|
|
}
|
|
|
|
fn external_spec(name: &str) -> OperationSpec {
|
|
OperationSpec::new(
|
|
name,
|
|
OperationType::Query,
|
|
Visibility::External,
|
|
serde_json::json!({}),
|
|
serde_json::json!({}),
|
|
vec![],
|
|
AccessControl::default(),
|
|
)
|
|
}
|
|
|
|
fn echo_handler() -> Handler {
|
|
make_handler(
|
|
|input, context| async move { ResponseEnvelope::ok(context.request_id, input) },
|
|
)
|
|
}
|
|
|
|
fn imported_registration(name: &str) -> HandlerRegistration {
|
|
HandlerRegistration::new(
|
|
external_spec(name),
|
|
HandlerKind::Once(echo_handler()),
|
|
OperationProvenance::FromCall,
|
|
None,
|
|
None,
|
|
Capabilities::new(),
|
|
)
|
|
}
|
|
|
|
fn root_context(
|
|
request_id: &str,
|
|
scoped_env: ScopedPeerEnv,
|
|
env: Arc<dyn OperationEnv + Send + Sync>,
|
|
) -> OperationContext {
|
|
OperationContext {
|
|
request_id: request_id.to_string(),
|
|
parent_request_id: None,
|
|
identity: None,
|
|
handler_identity: Some(CompositionAuthority::new("agent", ["fs:read".to_string()])),
|
|
forwarded_for: None,
|
|
capabilities: Capabilities::new(),
|
|
metadata: HashMap::new(),
|
|
scoped_env,
|
|
env,
|
|
abort_policy: AbortPolicy::default(),
|
|
deadline: Some(Instant::now() + Duration::from_secs(30)),
|
|
internal: true,
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn register_imported_adds_to_overlay_and_contains_returns_true() {
|
|
let conn = CallConnection::new(stub_connection());
|
|
let env = conn.overlay_env();
|
|
|
|
assert!(!env.contains("worker/exec"));
|
|
|
|
conn.register_imported(imported_registration("worker/exec"));
|
|
|
|
assert!(env.contains("worker/exec"));
|
|
assert!(!env.contains("worker/missing"));
|
|
}
|
|
|
|
#[test]
|
|
fn register_imported_all_bulk_adds_to_overlay() {
|
|
let conn = CallConnection::new(stub_connection());
|
|
let env = conn.overlay_env();
|
|
|
|
conn.register_imported_all(vec![
|
|
imported_registration("worker/exec"),
|
|
imported_registration("worker/status"),
|
|
imported_registration("fs/readFile"),
|
|
]);
|
|
|
|
assert!(env.contains("worker/exec"));
|
|
assert!(env.contains("worker/status"));
|
|
assert!(env.contains("fs/readFile"));
|
|
assert!(!env.contains("worker/missing"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn overlay_env_dispatches_to_imported_op() {
|
|
let conn = CallConnection::new(stub_connection());
|
|
conn.register_imported(imported_registration("worker/exec"));
|
|
let env = conn.overlay_env();
|
|
|
|
let scoped = ScopedPeerEnv::new(["worker/exec"]);
|
|
let ctx = root_context("root-1", scoped, env.clone());
|
|
|
|
let response = env
|
|
.invoke("worker", "exec", serde_json::json!({"hi": 1}), &ctx)
|
|
.await;
|
|
|
|
assert!(response.result.is_ok());
|
|
assert_eq!(response.result.unwrap(), serde_json::json!({"hi": 1}));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn overlay_env_contains_returns_false_for_non_imported_op() {
|
|
let conn = CallConnection::new(stub_connection());
|
|
conn.register_imported(imported_registration("worker/exec"));
|
|
let env = conn.overlay_env();
|
|
|
|
assert!(!env.contains("worker/missing"));
|
|
|
|
let scoped = ScopedPeerEnv::new(["worker/missing"]);
|
|
let ctx = root_context("root-2", scoped, env.clone());
|
|
|
|
let response = env
|
|
.invoke("worker", "missing", serde_json::json!({}), &ctx)
|
|
.await;
|
|
|
|
match response.result {
|
|
Err(e) => assert_eq!(e.code, "NOT_FOUND"),
|
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn overlay_env_reachability_check_returns_not_found_for_disallowed_op() {
|
|
let conn = CallConnection::new(stub_connection());
|
|
conn.register_imported(imported_registration("worker/exec"));
|
|
let env = conn.overlay_env();
|
|
|
|
let scoped = ScopedPeerEnv::empty();
|
|
let ctx = root_context("root-3", scoped, env.clone());
|
|
|
|
let response = env
|
|
.invoke("worker", "exec", serde_json::json!({}), &ctx)
|
|
.await;
|
|
|
|
match response.result {
|
|
Err(e) => assert_eq!(e.code, "NOT_FOUND"),
|
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn overlay_env_dispatched_child_has_internal_true_and_parent_set() {
|
|
let conn = CallConnection::new(stub_connection());
|
|
let inspect_handler = make_handler(|_input, context| async move {
|
|
let internal = context.is_internal();
|
|
let parent_set = context.parent_request_id.is_some();
|
|
ResponseEnvelope::ok(
|
|
context.request_id,
|
|
serde_json::json!({
|
|
"internal": internal,
|
|
"parent_set": parent_set,
|
|
}),
|
|
)
|
|
});
|
|
conn.register_imported(HandlerRegistration::new(
|
|
external_spec("worker/exec"),
|
|
HandlerKind::Once(inspect_handler),
|
|
OperationProvenance::FromCall,
|
|
None,
|
|
None,
|
|
Capabilities::new(),
|
|
));
|
|
let env = conn.overlay_env();
|
|
|
|
let scoped = ScopedPeerEnv::new(["worker/exec"]);
|
|
let ctx = root_context("root-4", scoped, env.clone());
|
|
|
|
let response = env
|
|
.invoke("worker", "exec", serde_json::json!({}), &ctx)
|
|
.await;
|
|
let out = response.result.expect("ok");
|
|
assert_eq!(out["internal"], Value::Bool(true));
|
|
assert_eq!(out["parent_set"], Value::Bool(true));
|
|
}
|
|
|
|
#[test]
|
|
fn connection_accessor_returns_underlying_connection() {
|
|
let conn = CallConnection::new(stub_connection());
|
|
assert_eq!(
|
|
conn.connection()
|
|
.expect("quic connection present")
|
|
.remote_alpn(),
|
|
b"alknet/call"
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn empty_overlay_contains_nothing() {
|
|
let conn = CallConnection::new(stub_connection());
|
|
let env = conn.overlay_env();
|
|
assert!(!env.contains("anything"));
|
|
assert!(!env.contains(""));
|
|
}
|
|
|
|
#[test]
|
|
fn overlay_drops_with_connection() {
|
|
let captured: Arc<RwLock<HashMap<String, HandlerRegistration>>> =
|
|
Arc::new(RwLock::new(HashMap::new()));
|
|
{
|
|
let conn = CallConnection::new(stub_connection());
|
|
conn.register_imported(imported_registration("worker/exec"));
|
|
assert!(conn.overlay_env().contains("worker/exec"));
|
|
std::mem::swap(
|
|
&mut *captured.write(),
|
|
&mut *conn.imported_operations.write(),
|
|
);
|
|
}
|
|
assert!(captured.read().contains_key("worker/exec"));
|
|
}
|
|
|
|
// --- dispatch_envelope -------------------------------------------------
|
|
|
|
fn empty_pending() -> Arc<Mutex<PendingRequestMap>> {
|
|
Arc::new(Mutex::new(PendingRequestMap::new()))
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn dispatch_envelope_responded_resolves_call_receiver() {
|
|
let pending = empty_pending();
|
|
let rx = pending.lock().register_call(
|
|
"req-1".to_string(),
|
|
Instant::now() + Duration::from_secs(30),
|
|
None,
|
|
);
|
|
let envelope = EventEnvelope::responded("req-1", serde_json::json!({"v": 42}));
|
|
dispatch_envelope(&pending, envelope);
|
|
assert!(!pending.lock().contains("req-1"));
|
|
let result = tokio::time::timeout(Duration::from_millis(100), rx).await;
|
|
match result {
|
|
Ok(Ok(Ok(value))) => assert_eq!(value, serde_json::json!({"v": 42})),
|
|
other => panic!("expected Ok({{v:42}}), got {other:?}"),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn dispatch_envelope_responded_pushes_to_subscribe_channel() {
|
|
let pending = empty_pending();
|
|
let mut rx = pending
|
|
.lock()
|
|
.register_subscribe("sub-1".to_string(), None, None);
|
|
dispatch_envelope(
|
|
&pending,
|
|
EventEnvelope::responded("sub-1", serde_json::json!("first")),
|
|
);
|
|
dispatch_envelope(
|
|
&pending,
|
|
EventEnvelope::responded("sub-1", serde_json::json!("second")),
|
|
);
|
|
assert!(pending.lock().contains("sub-1"));
|
|
let a = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await;
|
|
let b = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await;
|
|
match (a, b) {
|
|
(Ok(Some(Ok(x))), Ok(Some(Ok(y)))) => {
|
|
assert_eq!(x, serde_json::json!("first"));
|
|
assert_eq!(y, serde_json::json!("second"));
|
|
}
|
|
other => panic!("expected two Ok values, got {other:?}"),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn dispatch_envelope_completed_removes_entry() {
|
|
let pending = empty_pending();
|
|
let _rx = pending
|
|
.lock()
|
|
.register_subscribe("sub-2".to_string(), None, None);
|
|
assert!(pending.lock().contains("sub-2"));
|
|
dispatch_envelope(&pending, EventEnvelope::completed("sub-2"));
|
|
assert!(!pending.lock().contains("sub-2"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn dispatch_envelope_aborted_removes_entry() {
|
|
let pending = empty_pending();
|
|
let _rx = pending.lock().register_call(
|
|
"req-2".to_string(),
|
|
Instant::now() + Duration::from_secs(30),
|
|
None,
|
|
);
|
|
assert!(pending.lock().contains("req-2"));
|
|
dispatch_envelope(&pending, EventEnvelope::aborted("req-2"));
|
|
assert!(!pending.lock().contains("req-2"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn dispatch_envelope_error_resolves_call_with_error() {
|
|
let pending = empty_pending();
|
|
let rx = pending.lock().register_call(
|
|
"req-3".to_string(),
|
|
Instant::now() + Duration::from_secs(30),
|
|
None,
|
|
);
|
|
let err = CallError::new("FILE_NOT_FOUND", "missing", false);
|
|
dispatch_envelope(&pending, EventEnvelope::error("req-3", &err));
|
|
assert!(!pending.lock().contains("req-3"));
|
|
let result = tokio::time::timeout(Duration::from_millis(100), rx).await;
|
|
match result {
|
|
Ok(Ok(Err(e))) => {
|
|
assert_eq!(e.code, "FILE_NOT_FOUND");
|
|
assert!(!e.retryable);
|
|
}
|
|
other => panic!("expected Err(FILE_NOT_FOUND), got {other:?}"),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn dispatch_envelope_error_pushes_error_to_subscribe_channel() {
|
|
let pending = empty_pending();
|
|
let mut rx = pending
|
|
.lock()
|
|
.register_subscribe("sub-3".to_string(), None, None);
|
|
let err = CallError::new("RATE_LIMITED", "slow down", true);
|
|
dispatch_envelope(&pending, EventEnvelope::error("sub-3", &err));
|
|
assert!(!pending.lock().contains("sub-3"));
|
|
let result = tokio::time::timeout(Duration::from_millis(100), rx.recv()).await;
|
|
match result {
|
|
Ok(Some(Err(e))) => {
|
|
assert_eq!(e.code, "RATE_LIMITED");
|
|
assert!(e.retryable);
|
|
}
|
|
other => panic!("expected Err(RATE_LIMITED), got {other:?}"),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn dispatch_envelope_error_with_invalid_payload_is_no_op() {
|
|
let pending = empty_pending();
|
|
let _rx = pending.lock().register_call(
|
|
"req-4".to_string(),
|
|
Instant::now() + Duration::from_secs(30),
|
|
None,
|
|
);
|
|
let malformed =
|
|
EventEnvelope::new(EVENT_ERROR, "req-4", serde_json::json!("not-an-object"));
|
|
dispatch_envelope(&pending, malformed);
|
|
assert!(pending.lock().contains("req-4"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn dispatch_envelope_unknown_event_type_is_no_op() {
|
|
let pending = empty_pending();
|
|
let _rx = pending.lock().register_call(
|
|
"req-5".to_string(),
|
|
Instant::now() + Duration::from_secs(30),
|
|
None,
|
|
);
|
|
let unknown = EventEnvelope::new("call.mystery", "req-5", serde_json::json!({}));
|
|
dispatch_envelope(&pending, unknown);
|
|
assert!(pending.lock().contains("req-5"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn dispatch_envelope_unknown_request_id_is_no_op() {
|
|
let pending = empty_pending();
|
|
dispatch_envelope(
|
|
&pending,
|
|
EventEnvelope::responded("ghost", serde_json::json!(1)),
|
|
);
|
|
dispatch_envelope(&pending, EventEnvelope::completed("ghost"));
|
|
dispatch_envelope(&pending, EventEnvelope::aborted("ghost"));
|
|
assert!(pending.lock().is_empty());
|
|
}
|
|
|
|
// --- SubscriptionStream ------------------------------------------------
|
|
|
|
#[tokio::test]
|
|
async fn subscription_stream_closed_yields_one_error_then_ends() {
|
|
use futures::stream::StreamExt;
|
|
let err = CallError::internal("stream closed before send");
|
|
let mut stream = SubscriptionStream::closed("req-x".to_string(), err);
|
|
let first = stream.next().await;
|
|
match first {
|
|
Some(env) => {
|
|
assert_eq!(env.request_id, "req-x");
|
|
assert!(env.result.is_err());
|
|
assert_eq!(env.result.unwrap_err().code, "INTERNAL");
|
|
}
|
|
other => panic!("expected one error envelope, got {other:?}"),
|
|
}
|
|
let second = stream.next().await;
|
|
assert!(second.is_none(), "stream must terminate after the error");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn subscription_stream_emits_ok_values_then_completes() {
|
|
use futures::stream::StreamExt;
|
|
let (tx, rx) = mpsc::channel(8);
|
|
let mut stream = SubscriptionStream::new("req-y".to_string(), rx);
|
|
tx.try_send(Ok(serde_json::json!(1))).unwrap();
|
|
tx.try_send(Ok(serde_json::json!(2))).unwrap();
|
|
drop(tx);
|
|
|
|
let a = stream.next().await.unwrap();
|
|
assert_eq!(a.request_id, "req-y");
|
|
assert_eq!(a.result.unwrap(), serde_json::json!(1));
|
|
let b = stream.next().await.unwrap();
|
|
assert_eq!(b.result.unwrap(), serde_json::json!(2));
|
|
assert!(
|
|
stream.next().await.is_none(),
|
|
"stream ends after channel closes"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn subscription_stream_emits_error_then_terminates() {
|
|
use futures::stream::StreamExt;
|
|
let (tx, rx) = mpsc::channel(8);
|
|
let mut stream = SubscriptionStream::new("req-z".to_string(), rx);
|
|
tx.try_send(Ok(serde_json::json!("ok"))).unwrap();
|
|
tx.try_send(Err(CallError::timeout("timed out"))).unwrap();
|
|
drop(tx);
|
|
|
|
let first = stream.next().await.unwrap();
|
|
assert_eq!(first.result.unwrap(), serde_json::json!("ok"));
|
|
let second = stream.next().await.unwrap();
|
|
assert_eq!(second.request_id, "req-z");
|
|
assert_eq!(second.result.unwrap_err().code, "TIMEOUT");
|
|
assert!(
|
|
stream.next().await.is_none(),
|
|
"stream terminates after error"
|
|
);
|
|
}
|
|
|
|
// --- non-QUIC (overlay-only) CallConnection ---------------------------
|
|
|
|
fn sample_identity(id: &str) -> Identity {
|
|
Identity {
|
|
id: id.to_string(),
|
|
scopes: vec!["fs:read".to_string()],
|
|
resources: HashMap::new(),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn overlay_only_constructor_has_no_quic_connection() {
|
|
let conn = CallConnection::new_overlay_only(sample_identity("ws-peer"));
|
|
assert!(conn.connection().is_none(), "no QUIC connection stored");
|
|
}
|
|
|
|
#[test]
|
|
fn overlay_only_identity_returns_stored_identity() {
|
|
let conn = CallConnection::new_overlay_only(sample_identity("ws-peer"));
|
|
let identity = conn.identity().expect("identity stored");
|
|
assert_eq!(identity.id, "ws-peer");
|
|
assert_eq!(identity.scopes, vec!["fs:read".to_string()]);
|
|
}
|
|
|
|
#[test]
|
|
fn overlay_only_holds_independent_pending_map() {
|
|
let conn = CallConnection::new_overlay_only(sample_identity("ws-peer"));
|
|
let pending = Arc::clone(conn.pending());
|
|
assert!(pending.lock().is_empty());
|
|
let _rx = pending.lock().register_call(
|
|
"req-overlay-1".to_string(),
|
|
Instant::now() + Duration::from_secs(30),
|
|
None,
|
|
);
|
|
assert!(pending.lock().contains("req-overlay-1"));
|
|
}
|
|
|
|
#[test]
|
|
fn overlay_only_register_imported_populates_overlay() {
|
|
let conn = CallConnection::new_overlay_only(sample_identity("ws-peer"));
|
|
let env = conn.overlay_env();
|
|
assert!(!env.contains("worker/exec"));
|
|
conn.register_imported(imported_registration("worker/exec"));
|
|
assert!(env.contains("worker/exec"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn overlay_only_overlay_env_dispatches_imported_op() {
|
|
let conn = CallConnection::new_overlay_only(sample_identity("ws-peer"));
|
|
conn.register_imported(imported_registration("worker/exec"));
|
|
let env = conn.overlay_env();
|
|
|
|
let scoped = ScopedPeerEnv::new(["worker/exec"]);
|
|
let ctx = root_context("overlay-root-1", scoped, env.clone());
|
|
|
|
let response = env
|
|
.invoke("worker", "exec", serde_json::json!({"v": 1}), &ctx)
|
|
.await;
|
|
assert!(response.result.is_ok());
|
|
assert_eq!(response.result.unwrap(), serde_json::json!({"v": 1}));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn overlay_only_call_without_connection_returns_error() {
|
|
let conn = CallConnection::new_overlay_only(sample_identity("ws-peer"));
|
|
let response = conn.call("fs/readFile", serde_json::json!({})).await;
|
|
let err = response.result.expect_err("no connection → error");
|
|
assert_eq!(err.code, "INTERNAL");
|
|
}
|
|
|
|
#[test]
|
|
fn quic_path_identity_returns_connection_identity() {
|
|
let conn = CallConnection::new(stub_connection());
|
|
conn.connection()
|
|
.expect("quic connection present")
|
|
.set_identity(sample_identity("quic-peer"))
|
|
.expect("identity not yet set");
|
|
let identity = conn.identity().expect("identity from connection");
|
|
assert_eq!(identity.id, "quic-peer");
|
|
}
|
|
|
|
#[test]
|
|
fn quic_path_stored_identity_is_none_when_connection_present() {
|
|
let conn = CallConnection::new(stub_connection());
|
|
assert!(conn.connection().is_some(), "QUIC connection present");
|
|
assert!(conn.identity().is_none(), "no identity set yet");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn overlay_env_invoke_on_stream_kind_returns_invalid_operation_type() {
|
|
use crate::registry::registration::make_streaming_handler;
|
|
let conn = CallConnection::new(stub_connection());
|
|
let streaming_handler = make_streaming_handler(|input, ctx| {
|
|
futures::stream::iter(vec![ResponseEnvelope::ok(ctx.request_id, input)])
|
|
});
|
|
conn.register_imported(HandlerRegistration::new(
|
|
OperationSpec::new(
|
|
"events/stream",
|
|
OperationType::Subscription,
|
|
Visibility::External,
|
|
serde_json::json!({}),
|
|
serde_json::json!({}),
|
|
vec![],
|
|
AccessControl::default(),
|
|
),
|
|
HandlerKind::Stream(streaming_handler),
|
|
OperationProvenance::FromCall,
|
|
None,
|
|
None,
|
|
Capabilities::new(),
|
|
));
|
|
let env = conn.overlay_env();
|
|
let scoped = ScopedPeerEnv::new(["events/stream"]);
|
|
let ctx = root_context("root-stream", scoped, env.clone());
|
|
let response = env
|
|
.invoke("events", "stream", serde_json::json!({}), &ctx)
|
|
.await;
|
|
match response.result {
|
|
Err(e) => assert_eq!(e.code, "INVALID_OPERATION_TYPE"),
|
|
other => panic!("expected INVALID_OPERATION_TYPE, got {other:?}"),
|
|
}
|
|
}
|
|
}
|