Compare commits
4 Commits
c34b4d2df4
...
develop
| Author | SHA1 | Date | |
|---|---|---|---|
| e258ce0523 | |||
| ab610730c0 | |||
| c77024cdf5 | |||
| 9e4d17b1c5 |
@@ -17,7 +17,8 @@ use axum::response::sse::Event;
|
||||
use axum::response::{IntoResponse, Json, Response, Sse};
|
||||
use axum::routing::{get, post};
|
||||
use axum::Router;
|
||||
use futures::stream::{self, BoxStream, Stream};
|
||||
use futures::stream::{self, BoxStream};
|
||||
use futures::StreamExt;
|
||||
use serde::Deserialize;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
@@ -163,18 +164,29 @@ pub(crate) async fn subscribe_handler(
|
||||
subscribe_stream_internal_error(request.operation)
|
||||
} else {
|
||||
let dispatch = state.dispatch();
|
||||
let envelope = dispatch
|
||||
.invoke(identity, &request.operation, request.input)
|
||||
.await;
|
||||
subscribe_stream_from_envelope(envelope)
|
||||
let envelope_stream =
|
||||
dispatch.invoke_streaming(identity, &request.operation, request.input);
|
||||
subscribe_stream_from_envelope_stream(envelope_stream)
|
||||
};
|
||||
Sse::new(stream)
|
||||
}
|
||||
|
||||
pub type SubscribeStream = BoxStream<'static, Result<Event, Infallible>>;
|
||||
|
||||
fn subscribe_stream_from_envelope(envelope: ResponseEnvelope) -> SubscribeStream {
|
||||
Box::pin(envelope_to_sse_stream(envelope))
|
||||
fn subscribe_stream_from_envelope_stream(
|
||||
stream: BoxStream<'static, ResponseEnvelope>,
|
||||
) -> SubscribeStream {
|
||||
Box::pin(stream.map(|envelope| match envelope.result {
|
||||
Ok(output) => {
|
||||
let data = serde_json::to_string(&output).unwrap_or_else(|_| "null".to_string());
|
||||
Ok(Event::default().data(data))
|
||||
}
|
||||
Err(error) => {
|
||||
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
|
||||
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "null".to_string());
|
||||
Ok(Event::default().event("error").data(data))
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
fn subscribe_stream_internal_error(operation: String) -> SubscribeStream {
|
||||
@@ -263,24 +275,6 @@ fn is_internal_op(registry: &OperationRegistry, operation: &str) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
fn envelope_to_sse_stream(
|
||||
envelope: ResponseEnvelope,
|
||||
) -> impl Stream<Item = Result<Event, Infallible>> {
|
||||
stream::once(async move {
|
||||
match envelope.result {
|
||||
Ok(output) => {
|
||||
let data = serde_json::to_string(&output).unwrap_or_else(|_| "null".to_string());
|
||||
Ok(Event::default().data(data))
|
||||
}
|
||||
Err(error) => {
|
||||
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
|
||||
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "null".to_string());
|
||||
Ok(Event::default().event("error").data(data))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn error_event(operation: &str) -> Result<Event, Infallible> {
|
||||
let error = CallError::not_found(operation);
|
||||
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
|
||||
@@ -295,7 +289,7 @@ mod tests {
|
||||
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
|
||||
};
|
||||
use alknet_call::registry::registration::{
|
||||
make_handler, HandlerKind, HandlerRegistration, OperationProvenance,
|
||||
make_handler, make_streaming_handler, HandlerKind, HandlerRegistration, OperationProvenance,
|
||||
};
|
||||
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType};
|
||||
use alknet_core::auth::{AuthToken, Identity};
|
||||
@@ -425,6 +419,73 @@ mod tests {
|
||||
Arc::new(registry)
|
||||
}
|
||||
|
||||
fn subscription_spec(name: &str, visibility: Visibility, acl: AccessControl) -> OperationSpec {
|
||||
OperationSpec::new(
|
||||
name,
|
||||
OperationType::Subscription,
|
||||
visibility,
|
||||
json!({}),
|
||||
json!({}),
|
||||
vec![],
|
||||
acl,
|
||||
)
|
||||
}
|
||||
|
||||
fn multi_event_streaming_handler(
|
||||
outputs: Vec<Value>,
|
||||
) -> alknet_call::registry::registration::StreamingHandler {
|
||||
make_streaming_handler(move |_input, ctx| {
|
||||
let request_id = ctx.request_id.clone();
|
||||
let outputs = outputs.clone();
|
||||
futures::stream::iter(
|
||||
outputs
|
||||
.into_iter()
|
||||
.map(move |o| ResponseEnvelope::ok(request_id.clone(), o)),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn error_streaming_handler(error: CallError) -> HandlerKind {
|
||||
HandlerKind::Stream(make_streaming_handler(move |_input, ctx| {
|
||||
let request_id = ctx.request_id.clone();
|
||||
let error = error.clone();
|
||||
futures::stream::iter(vec![ResponseEnvelope::error(request_id, error)])
|
||||
}))
|
||||
}
|
||||
|
||||
fn registry_with_subscription_stream(
|
||||
name: &str,
|
||||
outputs: Vec<Value>,
|
||||
) -> Arc<OperationRegistry> {
|
||||
let mut registry = OperationRegistry::new();
|
||||
registry
|
||||
.register(HandlerRegistration::new(
|
||||
subscription_spec(name, Visibility::External, AccessControl::default()),
|
||||
HandlerKind::Stream(multi_event_streaming_handler(outputs)),
|
||||
OperationProvenance::Local,
|
||||
None,
|
||||
None,
|
||||
Capabilities::new(),
|
||||
))
|
||||
.unwrap();
|
||||
Arc::new(registry)
|
||||
}
|
||||
|
||||
fn registry_with_subscription_error(name: &str, error: CallError) -> Arc<OperationRegistry> {
|
||||
let mut registry = OperationRegistry::new();
|
||||
registry
|
||||
.register(HandlerRegistration::new(
|
||||
subscription_spec(name, Visibility::External, AccessControl::default()),
|
||||
error_streaming_handler(error),
|
||||
OperationProvenance::Local,
|
||||
None,
|
||||
None,
|
||||
Capabilities::new(),
|
||||
))
|
||||
.unwrap();
|
||||
Arc::new(registry)
|
||||
}
|
||||
|
||||
fn registry_with_discovery_and_ops(
|
||||
inner_ops: Vec<HandlerRegistration>,
|
||||
) -> Arc<OperationRegistry> {
|
||||
@@ -771,15 +832,20 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_streams_sse_data_event_until_completed() {
|
||||
let router = build_router(registry_with_echo(), unused_provider());
|
||||
async fn subscribe_on_subscription_streams_multiple_data_frames() {
|
||||
let router = build_router(
|
||||
registry_with_subscription_stream(
|
||||
"events/stream",
|
||||
vec![json!({ "n": 1 }), json!({ "n": 2 }), json!({ "n": 3 })],
|
||||
),
|
||||
unused_provider(),
|
||||
);
|
||||
let req = Request::builder()
|
||||
.method("POST")
|
||||
.uri("/subscribe")
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&json!({ "operation": "echo/run", "input": { "v": 9 } }))
|
||||
.unwrap(),
|
||||
serde_json::to_vec(&json!({ "operation": "events/stream", "input": {} })).unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let resp = router.oneshot(req).await.unwrap();
|
||||
@@ -797,10 +863,73 @@ mod tests {
|
||||
);
|
||||
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
||||
let body = String::from_utf8_lossy(&bytes);
|
||||
assert!(body.contains("data:"), "expected a data frame, got: {body}");
|
||||
let data_frames = body.matches("data:").count();
|
||||
assert_eq!(data_frames, 3, "expected 3 data frames, got: {body}");
|
||||
assert!(body.contains("\"n\":1"), "expected n=1, got: {body}");
|
||||
assert!(body.contains("\"n\":2"), "expected n=2, got: {body}");
|
||||
assert!(body.contains("\"n\":3"), "expected n=3, got: {body}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_on_subscription_that_yields_error_emits_error_event_then_closes() {
|
||||
let router = build_router(
|
||||
registry_with_subscription_error("events/fail", CallError::internal("handler blew up")),
|
||||
unused_provider(),
|
||||
);
|
||||
let req = Request::builder()
|
||||
.method("POST")
|
||||
.uri("/subscribe")
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&json!({ "operation": "events/fail", "input": {} })).unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let resp = router.oneshot(req).await.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
||||
let body = String::from_utf8_lossy(&bytes);
|
||||
assert!(
|
||||
body.contains("\"v\":9"),
|
||||
"expected output payload, got: {body}"
|
||||
body.contains("event:error") || body.contains("event: error"),
|
||||
"expected error event, got: {body}"
|
||||
);
|
||||
assert!(
|
||||
body.contains("INTERNAL"),
|
||||
"expected INTERNAL code, got: {body}"
|
||||
);
|
||||
assert!(
|
||||
body.contains("handler blew up"),
|
||||
"expected error message, got: {body}"
|
||||
);
|
||||
let data_frames = body.matches("data:").count();
|
||||
assert_eq!(
|
||||
data_frames, 1,
|
||||
"expected exactly one data frame (the error payload), got: {body}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_response_content_type_is_text_event_stream() {
|
||||
let router = build_router(
|
||||
registry_with_subscription_stream("events/stream", vec![json!({ "ok": true })]),
|
||||
unused_provider(),
|
||||
);
|
||||
let req = Request::builder()
|
||||
.method("POST")
|
||||
.uri("/subscribe")
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&json!({ "operation": "events/stream", "input": {} })).unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let resp = router.oneshot(req).await.unwrap();
|
||||
let ctype = resp
|
||||
.headers()
|
||||
.get(axum::http::header::CONTENT_TYPE)
|
||||
.map(|v| v.to_str().unwrap().to_string());
|
||||
assert_eq!(
|
||||
ctype.as_deref(),
|
||||
Some("text/event-stream"),
|
||||
"expected text/event-stream, got {ctype:?}"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -829,6 +958,59 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_unknown_op_emits_not_found_error_event() {
|
||||
let router = build_router(
|
||||
registry_with_subscription_stream("events/stream", vec![json!({})]),
|
||||
unused_provider(),
|
||||
);
|
||||
let req = Request::builder()
|
||||
.method("POST")
|
||||
.uri("/subscribe")
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&json!({ "operation": "no/such", "input": {} })).unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let resp = router.oneshot(req).await.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
||||
let body = String::from_utf8_lossy(&bytes);
|
||||
assert!(
|
||||
body.contains("event:error") || body.contains("event: error"),
|
||||
"expected error event, got: {body}"
|
||||
);
|
||||
assert!(
|
||||
body.contains("NOT_FOUND"),
|
||||
"expected NOT_FOUND, got: {body}"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn subscribe_on_query_op_emits_invalid_operation_type_error_event() {
|
||||
let router = build_router(registry_with_echo(), unused_provider());
|
||||
let req = Request::builder()
|
||||
.method("POST")
|
||||
.uri("/subscribe")
|
||||
.header("content-type", "application/json")
|
||||
.body(Body::from(
|
||||
serde_json::to_vec(&json!({ "operation": "echo/run", "input": {} })).unwrap(),
|
||||
))
|
||||
.unwrap();
|
||||
let resp = router.oneshot(req).await.unwrap();
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
|
||||
let body = String::from_utf8_lossy(&bytes);
|
||||
assert!(
|
||||
body.contains("event:error") || body.contains("event: error"),
|
||||
"expected error event, got: {body}"
|
||||
);
|
||||
assert!(
|
||||
body.contains("INVALID_OPERATION_TYPE"),
|
||||
"expected INVALID_OPERATION_TYPE, got: {body}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn is_internal_op_returns_false_for_unknown() {
|
||||
let registry = OperationRegistry::new();
|
||||
|
||||
@@ -779,10 +779,11 @@ mod tests {
|
||||
let out = handle_inbound_envelope(&dp, &conn, request)
|
||||
.await
|
||||
.expect("response");
|
||||
assert_eq!(out.r#type, EVENT_ERROR);
|
||||
assert_eq!(out.r#type, EVENT_RESPONDED);
|
||||
assert_eq!(out.id, "sub-0");
|
||||
assert_eq!(
|
||||
out.payload.get("code"),
|
||||
Some(&serde_json::json!("INVALID_OPERATION_TYPE"))
|
||||
out.payload.get("output"),
|
||||
Some(&serde_json::json!({ "n": 1 }))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1077,10 +1078,10 @@ mod tests {
|
||||
MockMsg::Binary(bytes) => {
|
||||
let env: EventEnvelope = serde_json::from_slice(&bytes).unwrap();
|
||||
assert_eq!(env.id, "sub-ws-0");
|
||||
assert_eq!(env.r#type, EVENT_ERROR);
|
||||
assert_eq!(env.r#type, EVENT_RESPONDED);
|
||||
assert_eq!(
|
||||
env.payload.get("code"),
|
||||
Some(&serde_json::json!("INVALID_OPERATION_TYPE"))
|
||||
env.payload.get("output"),
|
||||
Some(&serde_json::json!({ "n": 1 }))
|
||||
);
|
||||
}
|
||||
other => panic!("expected binary, got {other:?}"),
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
---
|
||||
id: http/server/subscribe-sse-streaming
|
||||
name: Wire /subscribe handler to GatewayDispatch::invoke_streaming() and pipe BoxStream to SSE
|
||||
status: pending
|
||||
status: completed
|
||||
depends_on: [http/gateway/invoke-streaming]
|
||||
scope: narrow
|
||||
risk: medium
|
||||
@@ -153,4 +153,4 @@ stream is dropped (not leaked) on disconnect.
|
||||
|
||||
## Summary
|
||||
|
||||
> To be filled on completion
|
||||
> Replaced /subscribe one-event placeholder with real streaming path. subscribe_handler now calls GatewayDispatch::invoke_streaming() and pipes BoxStream to SSE via subscribe_stream_from_envelope_stream (StreamExt::map). Ok → data: frame, Err → event:error (terminal, stream ends after). Removed placeholder helpers (subscribe_stream_from_envelope, envelope_to_sse_stream). Kept subscribe_stream_internal_error for Internal ops (NOT_FOUND). Added 6 unit tests. Also fixed 2 pre-existing websocket subscription tests that expected INVALID_OPERATION_TYPE but now get call.responded (dispatch_requested routes Subscription via invoke_streaming). 247 tests pass.
|
||||
@@ -1,7 +1,7 @@
|
||||
---
|
||||
id: review-streaming-impl
|
||||
name: Review ADR-049 streaming handler implementation for spec conformance and end-to-end correctness
|
||||
status: pending
|
||||
status: completed
|
||||
depends_on: [call/protocol/dispatch-streaming-branch, call/client/from-call-streaming-forwarding, http/gateway/invoke-streaming, http/server/subscribe-sse-streaming, http/adapters/from-openapi-sse-streaming]
|
||||
scope: broad
|
||||
risk: low
|
||||
@@ -207,4 +207,4 @@ review.
|
||||
|
||||
## Summary
|
||||
|
||||
> To be filled on completion
|
||||
> Reviewed ADR-049 streaming handler implementation across all 12 checklist points. All type surface, registry, builder, dispatch, from_call, gateway, /subscribe SSE, from_openapi SSE, ADR conformance, end-to-end correctness, pattern consistency, and test coverage items verified. 555 tests pass (306 call + 2 integration + 247 http), clippy clean, fmt clean. Fixed 2 pre-existing websocket subscription tests that expected INVALID_OPERATION_TYPE but now get call.responded (dispatch_requested routes Subscription via invoke_streaming). All 9 ADR-049 decisions implemented. Placeholders removed (subscribe_stream_from_envelope, envelope_to_sse_stream, stream_subscription). from_mcp unchanged (always HandlerKind::Once).
|
||||
Reference in New Issue
Block a user