2 Commits

Author SHA1 Message Date
c77024cdf5 fix(http): update websocket subscription tests to expect call.responded (dispatch_requested now routes Subscription via invoke_streaming) 2026-07-02 10:10:42 +00:00
9e4d17b1c5 feat(http/server/subscribe-sse-streaming): wire /subscribe to invoke_streaming and pipe BoxStream to SSE
Replace the one-event placeholder (subscribe_stream_from_envelope +
envelope_to_sse_stream, which called invoke() and wrapped the single
ResponseEnvelope) with the real streaming path: subscribe_handler now
calls GatewayDispatch::invoke_streaming() and pipes the
BoxStream<ResponseEnvelope> to SSE via subscribe_stream_from_envelope_stream
(futures::StreamExt::map). Each Ok(output) becomes a data: frame; each
Err becomes an event:error frame (terminal — stream ends after it);
natural stream end closes the SSE. Internal ops still return a single
NOT_FOUND error event via subscribe_stream_internal_error (kept). Client
disconnect drops the stream via Rust's Drop (abort cascade per ADR-016).
2026-07-02 10:04:27 +00:00
2 changed files with 222 additions and 39 deletions

View File

@@ -17,7 +17,8 @@ use axum::response::sse::Event;
use axum::response::{IntoResponse, Json, Response, Sse}; use axum::response::{IntoResponse, Json, Response, Sse};
use axum::routing::{get, post}; use axum::routing::{get, post};
use axum::Router; use axum::Router;
use futures::stream::{self, BoxStream, Stream}; use futures::stream::{self, BoxStream};
use futures::StreamExt;
use serde::Deserialize; use serde::Deserialize;
use serde_json::{json, Value}; use serde_json::{json, Value};
@@ -163,18 +164,29 @@ pub(crate) async fn subscribe_handler(
subscribe_stream_internal_error(request.operation) subscribe_stream_internal_error(request.operation)
} else { } else {
let dispatch = state.dispatch(); let dispatch = state.dispatch();
let envelope = dispatch let envelope_stream =
.invoke(identity, &request.operation, request.input) dispatch.invoke_streaming(identity, &request.operation, request.input);
.await; subscribe_stream_from_envelope_stream(envelope_stream)
subscribe_stream_from_envelope(envelope)
}; };
Sse::new(stream) Sse::new(stream)
} }
pub type SubscribeStream = BoxStream<'static, Result<Event, Infallible>>; pub type SubscribeStream = BoxStream<'static, Result<Event, Infallible>>;
fn subscribe_stream_from_envelope(envelope: ResponseEnvelope) -> SubscribeStream { fn subscribe_stream_from_envelope_stream(
Box::pin(envelope_to_sse_stream(envelope)) stream: BoxStream<'static, ResponseEnvelope>,
) -> SubscribeStream {
Box::pin(stream.map(|envelope| match envelope.result {
Ok(output) => {
let data = serde_json::to_string(&output).unwrap_or_else(|_| "null".to_string());
Ok(Event::default().data(data))
}
Err(error) => {
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "null".to_string());
Ok(Event::default().event("error").data(data))
}
}))
} }
fn subscribe_stream_internal_error(operation: String) -> SubscribeStream { fn subscribe_stream_internal_error(operation: String) -> SubscribeStream {
@@ -263,24 +275,6 @@ fn is_internal_op(registry: &OperationRegistry, operation: &str) -> bool {
} }
} }
fn envelope_to_sse_stream(
envelope: ResponseEnvelope,
) -> impl Stream<Item = Result<Event, Infallible>> {
stream::once(async move {
match envelope.result {
Ok(output) => {
let data = serde_json::to_string(&output).unwrap_or_else(|_| "null".to_string());
Ok(Event::default().data(data))
}
Err(error) => {
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "null".to_string());
Ok(Event::default().event("error").data(data))
}
}
})
}
fn error_event(operation: &str) -> Result<Event, Infallible> { fn error_event(operation: &str) -> Result<Event, Infallible> {
let error = CallError::not_found(operation); let error = CallError::not_found(operation);
let payload = serde_json::to_value(&error).unwrap_or(Value::Null); let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
@@ -295,7 +289,7 @@ mod tests {
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec, services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
}; };
use alknet_call::registry::registration::{ use alknet_call::registry::registration::{
make_handler, HandlerKind, HandlerRegistration, OperationProvenance, make_handler, make_streaming_handler, HandlerKind, HandlerRegistration, OperationProvenance,
}; };
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType}; use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType};
use alknet_core::auth::{AuthToken, Identity}; use alknet_core::auth::{AuthToken, Identity};
@@ -425,6 +419,73 @@ mod tests {
Arc::new(registry) Arc::new(registry)
} }
fn subscription_spec(name: &str, visibility: Visibility, acl: AccessControl) -> OperationSpec {
OperationSpec::new(
name,
OperationType::Subscription,
visibility,
json!({}),
json!({}),
vec![],
acl,
)
}
fn multi_event_streaming_handler(
outputs: Vec<Value>,
) -> alknet_call::registry::registration::StreamingHandler {
make_streaming_handler(move |_input, ctx| {
let request_id = ctx.request_id.clone();
let outputs = outputs.clone();
futures::stream::iter(
outputs
.into_iter()
.map(move |o| ResponseEnvelope::ok(request_id.clone(), o)),
)
})
}
fn error_streaming_handler(error: CallError) -> HandlerKind {
HandlerKind::Stream(make_streaming_handler(move |_input, ctx| {
let request_id = ctx.request_id.clone();
let error = error.clone();
futures::stream::iter(vec![ResponseEnvelope::error(request_id, error)])
}))
}
fn registry_with_subscription_stream(
name: &str,
outputs: Vec<Value>,
) -> Arc<OperationRegistry> {
let mut registry = OperationRegistry::new();
registry
.register(HandlerRegistration::new(
subscription_spec(name, Visibility::External, AccessControl::default()),
HandlerKind::Stream(multi_event_streaming_handler(outputs)),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
))
.unwrap();
Arc::new(registry)
}
fn registry_with_subscription_error(name: &str, error: CallError) -> Arc<OperationRegistry> {
let mut registry = OperationRegistry::new();
registry
.register(HandlerRegistration::new(
subscription_spec(name, Visibility::External, AccessControl::default()),
error_streaming_handler(error),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
))
.unwrap();
Arc::new(registry)
}
fn registry_with_discovery_and_ops( fn registry_with_discovery_and_ops(
inner_ops: Vec<HandlerRegistration>, inner_ops: Vec<HandlerRegistration>,
) -> Arc<OperationRegistry> { ) -> Arc<OperationRegistry> {
@@ -771,15 +832,20 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn subscribe_streams_sse_data_event_until_completed() { async fn subscribe_on_subscription_streams_multiple_data_frames() {
let router = build_router(registry_with_echo(), unused_provider()); let router = build_router(
registry_with_subscription_stream(
"events/stream",
vec![json!({ "n": 1 }), json!({ "n": 2 }), json!({ "n": 3 })],
),
unused_provider(),
);
let req = Request::builder() let req = Request::builder()
.method("POST") .method("POST")
.uri("/subscribe") .uri("/subscribe")
.header("content-type", "application/json") .header("content-type", "application/json")
.body(Body::from( .body(Body::from(
serde_json::to_vec(&json!({ "operation": "echo/run", "input": { "v": 9 } })) serde_json::to_vec(&json!({ "operation": "events/stream", "input": {} })).unwrap(),
.unwrap(),
)) ))
.unwrap(); .unwrap();
let resp = router.oneshot(req).await.unwrap(); let resp = router.oneshot(req).await.unwrap();
@@ -797,10 +863,73 @@ mod tests {
); );
let bytes = resp.into_body().collect().await.unwrap().to_bytes(); let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let body = String::from_utf8_lossy(&bytes); let body = String::from_utf8_lossy(&bytes);
assert!(body.contains("data:"), "expected a data frame, got: {body}"); let data_frames = body.matches("data:").count();
assert_eq!(data_frames, 3, "expected 3 data frames, got: {body}");
assert!(body.contains("\"n\":1"), "expected n=1, got: {body}");
assert!(body.contains("\"n\":2"), "expected n=2, got: {body}");
assert!(body.contains("\"n\":3"), "expected n=3, got: {body}");
}
#[tokio::test]
async fn subscribe_on_subscription_that_yields_error_emits_error_event_then_closes() {
let router = build_router(
registry_with_subscription_error("events/fail", CallError::internal("handler blew up")),
unused_provider(),
);
let req = Request::builder()
.method("POST")
.uri("/subscribe")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!({ "operation": "events/fail", "input": {} })).unwrap(),
))
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let body = String::from_utf8_lossy(&bytes);
assert!( assert!(
body.contains("\"v\":9"), body.contains("event:error") || body.contains("event: error"),
"expected output payload, got: {body}" "expected error event, got: {body}"
);
assert!(
body.contains("INTERNAL"),
"expected INTERNAL code, got: {body}"
);
assert!(
body.contains("handler blew up"),
"expected error message, got: {body}"
);
let data_frames = body.matches("data:").count();
assert_eq!(
data_frames, 1,
"expected exactly one data frame (the error payload), got: {body}"
);
}
#[tokio::test]
async fn subscribe_response_content_type_is_text_event_stream() {
let router = build_router(
registry_with_subscription_stream("events/stream", vec![json!({ "ok": true })]),
unused_provider(),
);
let req = Request::builder()
.method("POST")
.uri("/subscribe")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!({ "operation": "events/stream", "input": {} })).unwrap(),
))
.unwrap();
let resp = router.oneshot(req).await.unwrap();
let ctype = resp
.headers()
.get(axum::http::header::CONTENT_TYPE)
.map(|v| v.to_str().unwrap().to_string());
assert_eq!(
ctype.as_deref(),
Some("text/event-stream"),
"expected text/event-stream, got {ctype:?}"
); );
} }
@@ -829,6 +958,59 @@ mod tests {
); );
} }
#[tokio::test]
async fn subscribe_unknown_op_emits_not_found_error_event() {
let router = build_router(
registry_with_subscription_stream("events/stream", vec![json!({})]),
unused_provider(),
);
let req = Request::builder()
.method("POST")
.uri("/subscribe")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!({ "operation": "no/such", "input": {} })).unwrap(),
))
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let body = String::from_utf8_lossy(&bytes);
assert!(
body.contains("event:error") || body.contains("event: error"),
"expected error event, got: {body}"
);
assert!(
body.contains("NOT_FOUND"),
"expected NOT_FOUND, got: {body}"
);
}
#[tokio::test]
async fn subscribe_on_query_op_emits_invalid_operation_type_error_event() {
let router = build_router(registry_with_echo(), unused_provider());
let req = Request::builder()
.method("POST")
.uri("/subscribe")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!({ "operation": "echo/run", "input": {} })).unwrap(),
))
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let body = String::from_utf8_lossy(&bytes);
assert!(
body.contains("event:error") || body.contains("event: error"),
"expected error event, got: {body}"
);
assert!(
body.contains("INVALID_OPERATION_TYPE"),
"expected INVALID_OPERATION_TYPE, got: {body}"
);
}
#[test] #[test]
fn is_internal_op_returns_false_for_unknown() { fn is_internal_op_returns_false_for_unknown() {
let registry = OperationRegistry::new(); let registry = OperationRegistry::new();

View File

@@ -779,10 +779,11 @@ mod tests {
let out = handle_inbound_envelope(&dp, &conn, request) let out = handle_inbound_envelope(&dp, &conn, request)
.await .await
.expect("response"); .expect("response");
assert_eq!(out.r#type, EVENT_ERROR); assert_eq!(out.r#type, EVENT_RESPONDED);
assert_eq!(out.id, "sub-0");
assert_eq!( assert_eq!(
out.payload.get("code"), out.payload.get("output"),
Some(&serde_json::json!("INVALID_OPERATION_TYPE")) Some(&serde_json::json!({ "n": 1 }))
); );
} }
@@ -1077,10 +1078,10 @@ mod tests {
MockMsg::Binary(bytes) => { MockMsg::Binary(bytes) => {
let env: EventEnvelope = serde_json::from_slice(&bytes).unwrap(); let env: EventEnvelope = serde_json::from_slice(&bytes).unwrap();
assert_eq!(env.id, "sub-ws-0"); assert_eq!(env.id, "sub-ws-0");
assert_eq!(env.r#type, EVENT_ERROR); assert_eq!(env.r#type, EVENT_RESPONDED);
assert_eq!( assert_eq!(
env.payload.get("code"), env.payload.get("output"),
Some(&serde_json::json!("INVALID_OPERATION_TYPE")) Some(&serde_json::json!({ "n": 1 }))
); );
} }
other => panic!("expected binary, got {other:?}"), other => panic!("expected binary, got {other:?}"),