feat(http/server/subscribe-sse-streaming): wire /subscribe to invoke_streaming and pipe BoxStream to SSE
Replace the one-event placeholder (subscribe_stream_from_envelope + envelope_to_sse_stream, which called invoke() and wrapped the single ResponseEnvelope) with the real streaming path: subscribe_handler now calls GatewayDispatch::invoke_streaming() and pipes the BoxStream<ResponseEnvelope> to SSE via subscribe_stream_from_envelope_stream (futures::StreamExt::map). Each Ok(output) becomes a data: frame; each Err becomes an event:error frame (terminal — stream ends after it); natural stream end closes the SSE. Internal ops still return a single NOT_FOUND error event via subscribe_stream_internal_error (kept). Client disconnect drops the stream via Rust's Drop (abort cascade per ADR-016).
This commit is contained in:
@@ -17,7 +17,8 @@ use axum::response::sse::Event;
|
|||||||
use axum::response::{IntoResponse, Json, Response, Sse};
|
use axum::response::{IntoResponse, Json, Response, Sse};
|
||||||
use axum::routing::{get, post};
|
use axum::routing::{get, post};
|
||||||
use axum::Router;
|
use axum::Router;
|
||||||
use futures::stream::{self, BoxStream, Stream};
|
use futures::stream::{self, BoxStream};
|
||||||
|
use futures::StreamExt;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
|
|
||||||
@@ -163,18 +164,29 @@ pub(crate) async fn subscribe_handler(
|
|||||||
subscribe_stream_internal_error(request.operation)
|
subscribe_stream_internal_error(request.operation)
|
||||||
} else {
|
} else {
|
||||||
let dispatch = state.dispatch();
|
let dispatch = state.dispatch();
|
||||||
let envelope = dispatch
|
let envelope_stream =
|
||||||
.invoke(identity, &request.operation, request.input)
|
dispatch.invoke_streaming(identity, &request.operation, request.input);
|
||||||
.await;
|
subscribe_stream_from_envelope_stream(envelope_stream)
|
||||||
subscribe_stream_from_envelope(envelope)
|
|
||||||
};
|
};
|
||||||
Sse::new(stream)
|
Sse::new(stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type SubscribeStream = BoxStream<'static, Result<Event, Infallible>>;
|
pub type SubscribeStream = BoxStream<'static, Result<Event, Infallible>>;
|
||||||
|
|
||||||
fn subscribe_stream_from_envelope(envelope: ResponseEnvelope) -> SubscribeStream {
|
fn subscribe_stream_from_envelope_stream(
|
||||||
Box::pin(envelope_to_sse_stream(envelope))
|
stream: BoxStream<'static, ResponseEnvelope>,
|
||||||
|
) -> SubscribeStream {
|
||||||
|
Box::pin(stream.map(|envelope| match envelope.result {
|
||||||
|
Ok(output) => {
|
||||||
|
let data = serde_json::to_string(&output).unwrap_or_else(|_| "null".to_string());
|
||||||
|
Ok(Event::default().data(data))
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
|
||||||
|
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "null".to_string());
|
||||||
|
Ok(Event::default().event("error").data(data))
|
||||||
|
}
|
||||||
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn subscribe_stream_internal_error(operation: String) -> SubscribeStream {
|
fn subscribe_stream_internal_error(operation: String) -> SubscribeStream {
|
||||||
@@ -263,24 +275,6 @@ fn is_internal_op(registry: &OperationRegistry, operation: &str) -> bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn envelope_to_sse_stream(
|
|
||||||
envelope: ResponseEnvelope,
|
|
||||||
) -> impl Stream<Item = Result<Event, Infallible>> {
|
|
||||||
stream::once(async move {
|
|
||||||
match envelope.result {
|
|
||||||
Ok(output) => {
|
|
||||||
let data = serde_json::to_string(&output).unwrap_or_else(|_| "null".to_string());
|
|
||||||
Ok(Event::default().data(data))
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
|
|
||||||
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "null".to_string());
|
|
||||||
Ok(Event::default().event("error").data(data))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn error_event(operation: &str) -> Result<Event, Infallible> {
|
fn error_event(operation: &str) -> Result<Event, Infallible> {
|
||||||
let error = CallError::not_found(operation);
|
let error = CallError::not_found(operation);
|
||||||
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
|
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
|
||||||
@@ -295,7 +289,7 @@ mod tests {
|
|||||||
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
|
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
|
||||||
};
|
};
|
||||||
use alknet_call::registry::registration::{
|
use alknet_call::registry::registration::{
|
||||||
make_handler, HandlerKind, HandlerRegistration, OperationProvenance,
|
make_handler, make_streaming_handler, HandlerKind, HandlerRegistration, OperationProvenance,
|
||||||
};
|
};
|
||||||
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType};
|
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType};
|
||||||
use alknet_core::auth::{AuthToken, Identity};
|
use alknet_core::auth::{AuthToken, Identity};
|
||||||
@@ -425,6 +419,73 @@ mod tests {
|
|||||||
Arc::new(registry)
|
Arc::new(registry)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn subscription_spec(name: &str, visibility: Visibility, acl: AccessControl) -> OperationSpec {
|
||||||
|
OperationSpec::new(
|
||||||
|
name,
|
||||||
|
OperationType::Subscription,
|
||||||
|
visibility,
|
||||||
|
json!({}),
|
||||||
|
json!({}),
|
||||||
|
vec![],
|
||||||
|
acl,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn multi_event_streaming_handler(
|
||||||
|
outputs: Vec<Value>,
|
||||||
|
) -> alknet_call::registry::registration::StreamingHandler {
|
||||||
|
make_streaming_handler(move |_input, ctx| {
|
||||||
|
let request_id = ctx.request_id.clone();
|
||||||
|
let outputs = outputs.clone();
|
||||||
|
futures::stream::iter(
|
||||||
|
outputs
|
||||||
|
.into_iter()
|
||||||
|
.map(move |o| ResponseEnvelope::ok(request_id.clone(), o)),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn error_streaming_handler(error: CallError) -> HandlerKind {
|
||||||
|
HandlerKind::Stream(make_streaming_handler(move |_input, ctx| {
|
||||||
|
let request_id = ctx.request_id.clone();
|
||||||
|
let error = error.clone();
|
||||||
|
futures::stream::iter(vec![ResponseEnvelope::error(request_id, error)])
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn registry_with_subscription_stream(
|
||||||
|
name: &str,
|
||||||
|
outputs: Vec<Value>,
|
||||||
|
) -> Arc<OperationRegistry> {
|
||||||
|
let mut registry = OperationRegistry::new();
|
||||||
|
registry
|
||||||
|
.register(HandlerRegistration::new(
|
||||||
|
subscription_spec(name, Visibility::External, AccessControl::default()),
|
||||||
|
HandlerKind::Stream(multi_event_streaming_handler(outputs)),
|
||||||
|
OperationProvenance::Local,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Capabilities::new(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
Arc::new(registry)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn registry_with_subscription_error(name: &str, error: CallError) -> Arc<OperationRegistry> {
|
||||||
|
let mut registry = OperationRegistry::new();
|
||||||
|
registry
|
||||||
|
.register(HandlerRegistration::new(
|
||||||
|
subscription_spec(name, Visibility::External, AccessControl::default()),
|
||||||
|
error_streaming_handler(error),
|
||||||
|
OperationProvenance::Local,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Capabilities::new(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
Arc::new(registry)
|
||||||
|
}
|
||||||
|
|
||||||
fn registry_with_discovery_and_ops(
|
fn registry_with_discovery_and_ops(
|
||||||
inner_ops: Vec<HandlerRegistration>,
|
inner_ops: Vec<HandlerRegistration>,
|
||||||
) -> Arc<OperationRegistry> {
|
) -> Arc<OperationRegistry> {
|
||||||
@@ -771,15 +832,20 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn subscribe_streams_sse_data_event_until_completed() {
|
async fn subscribe_on_subscription_streams_multiple_data_frames() {
|
||||||
let router = build_router(registry_with_echo(), unused_provider());
|
let router = build_router(
|
||||||
|
registry_with_subscription_stream(
|
||||||
|
"events/stream",
|
||||||
|
vec![json!({ "n": 1 }), json!({ "n": 2 }), json!({ "n": 3 })],
|
||||||
|
),
|
||||||
|
unused_provider(),
|
||||||
|
);
|
||||||
let req = Request::builder()
|
let req = Request::builder()
|
||||||
.method("POST")
|
.method("POST")
|
||||||
.uri("/subscribe")
|
.uri("/subscribe")
|
||||||
.header("content-type", "application/json")
|
.header("content-type", "application/json")
|
||||||
.body(Body::from(
|
.body(Body::from(
|
||||||
serde_json::to_vec(&json!({ "operation": "echo/run", "input": { "v": 9 } }))
|
serde_json::to_vec(&json!({ "operation": "events/stream", "input": {} })).unwrap(),
|
||||||
.unwrap(),
|
|
||||||
))
|
))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let resp = router.oneshot(req).await.unwrap();
|
let resp = router.oneshot(req).await.unwrap();
|
||||||
@@ -797,10 +863,73 @@ mod tests {
|
|||||||
);
|
);
|
||||||
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
||||||
let body = String::from_utf8_lossy(&bytes);
|
let body = String::from_utf8_lossy(&bytes);
|
||||||
assert!(body.contains("data:"), "expected a data frame, got: {body}");
|
let data_frames = body.matches("data:").count();
|
||||||
|
assert_eq!(data_frames, 3, "expected 3 data frames, got: {body}");
|
||||||
|
assert!(body.contains("\"n\":1"), "expected n=1, got: {body}");
|
||||||
|
assert!(body.contains("\"n\":2"), "expected n=2, got: {body}");
|
||||||
|
assert!(body.contains("\"n\":3"), "expected n=3, got: {body}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn subscribe_on_subscription_that_yields_error_emits_error_event_then_closes() {
|
||||||
|
let router = build_router(
|
||||||
|
registry_with_subscription_error("events/fail", CallError::internal("handler blew up")),
|
||||||
|
unused_provider(),
|
||||||
|
);
|
||||||
|
let req = Request::builder()
|
||||||
|
.method("POST")
|
||||||
|
.uri("/subscribe")
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.body(Body::from(
|
||||||
|
serde_json::to_vec(&json!({ "operation": "events/fail", "input": {} })).unwrap(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
let resp = router.oneshot(req).await.unwrap();
|
||||||
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
|
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
||||||
|
let body = String::from_utf8_lossy(&bytes);
|
||||||
assert!(
|
assert!(
|
||||||
body.contains("\"v\":9"),
|
body.contains("event:error") || body.contains("event: error"),
|
||||||
"expected output payload, got: {body}"
|
"expected error event, got: {body}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
body.contains("INTERNAL"),
|
||||||
|
"expected INTERNAL code, got: {body}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
body.contains("handler blew up"),
|
||||||
|
"expected error message, got: {body}"
|
||||||
|
);
|
||||||
|
let data_frames = body.matches("data:").count();
|
||||||
|
assert_eq!(
|
||||||
|
data_frames, 1,
|
||||||
|
"expected exactly one data frame (the error payload), got: {body}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn subscribe_response_content_type_is_text_event_stream() {
|
||||||
|
let router = build_router(
|
||||||
|
registry_with_subscription_stream("events/stream", vec![json!({ "ok": true })]),
|
||||||
|
unused_provider(),
|
||||||
|
);
|
||||||
|
let req = Request::builder()
|
||||||
|
.method("POST")
|
||||||
|
.uri("/subscribe")
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.body(Body::from(
|
||||||
|
serde_json::to_vec(&json!({ "operation": "events/stream", "input": {} })).unwrap(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
let resp = router.oneshot(req).await.unwrap();
|
||||||
|
let ctype = resp
|
||||||
|
.headers()
|
||||||
|
.get(axum::http::header::CONTENT_TYPE)
|
||||||
|
.map(|v| v.to_str().unwrap().to_string());
|
||||||
|
assert_eq!(
|
||||||
|
ctype.as_deref(),
|
||||||
|
Some("text/event-stream"),
|
||||||
|
"expected text/event-stream, got {ctype:?}"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -829,6 +958,59 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn subscribe_unknown_op_emits_not_found_error_event() {
|
||||||
|
let router = build_router(
|
||||||
|
registry_with_subscription_stream("events/stream", vec![json!({})]),
|
||||||
|
unused_provider(),
|
||||||
|
);
|
||||||
|
let req = Request::builder()
|
||||||
|
.method("POST")
|
||||||
|
.uri("/subscribe")
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.body(Body::from(
|
||||||
|
serde_json::to_vec(&json!({ "operation": "no/such", "input": {} })).unwrap(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
let resp = router.oneshot(req).await.unwrap();
|
||||||
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
|
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
||||||
|
let body = String::from_utf8_lossy(&bytes);
|
||||||
|
assert!(
|
||||||
|
body.contains("event:error") || body.contains("event: error"),
|
||||||
|
"expected error event, got: {body}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
body.contains("NOT_FOUND"),
|
||||||
|
"expected NOT_FOUND, got: {body}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn subscribe_on_query_op_emits_invalid_operation_type_error_event() {
|
||||||
|
let router = build_router(registry_with_echo(), unused_provider());
|
||||||
|
let req = Request::builder()
|
||||||
|
.method("POST")
|
||||||
|
.uri("/subscribe")
|
||||||
|
.header("content-type", "application/json")
|
||||||
|
.body(Body::from(
|
||||||
|
serde_json::to_vec(&json!({ "operation": "echo/run", "input": {} })).unwrap(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
let resp = router.oneshot(req).await.unwrap();
|
||||||
|
assert_eq!(resp.status(), StatusCode::OK);
|
||||||
|
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
||||||
|
let body = String::from_utf8_lossy(&bytes);
|
||||||
|
assert!(
|
||||||
|
body.contains("event:error") || body.contains("event: error"),
|
||||||
|
"expected error event, got: {body}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
body.contains("INVALID_OPERATION_TYPE"),
|
||||||
|
"expected INVALID_OPERATION_TYPE, got: {body}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn is_internal_op_returns_false_for_unknown() {
|
fn is_internal_op_returns_false_for_unknown() {
|
||||||
let registry = OperationRegistry::new();
|
let registry = OperationRegistry::new();
|
||||||
|
|||||||
Reference in New Issue
Block a user