fix: resolve review #004 findings W1-W4 + close review gate
W1 (call/protocol/abort-cascade-wiring): wire AbortCascade into CallAdapter handle_stream for EVENT_ABORTED. Cascades with AbortPolicy::AbortDependents, aborts root, no descendant frames on wire (ADR-016 Decision 2). Two integration tests added. W2 (core/endpoint-client-fingerprint): extract TLS client cert fingerprint in dispatch_quinn (SHA256:<hex> of leaf cert DER via peer_identity) and dispatch_iroh (ed25519:<hex> of peer NodeId). Fingerprint format documented in auth.md. Server config change (with_no_client_auth → request-but-don't-require) deferred to new follow-up task core/endpoint-request-client-cert. W3 (vault/mnemonic-debug-redaction): replace Mnemonic derive(Debug) with manual redacting impl (phrase: "[REDACTED]"). Seed confirmed no Debug impl. Redaction test added. W4 (core/auth-apikey-resources): Option B — drop entry.resources from spec. External identities (token/fingerprint) grant scopes only; resource-scoped ACLs are composition-internal (ADR-015/022). auth.md corrected + limitation documented. Two tests confirm empty resources. review-post-impl-fixes: all 4 verified, workspace green (326 tests, 0 failures, 0 clippy warnings). Review #004 status → resolved. Graph: 34 tasks, 12 gens.
This commit is contained in:
@@ -16,10 +16,11 @@ use serde_json::Value;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use super::abort::AbortCascade;
|
||||
use super::connection::CallConnection;
|
||||
use super::wire::{
|
||||
CallError, EventEnvelope, FrameFramedReader, FrameFramedWriter, ResponseEnvelope,
|
||||
EVENT_REQUESTED,
|
||||
EVENT_ABORTED, EVENT_REQUESTED,
|
||||
};
|
||||
use crate::registry::context::{AbortPolicy, OperationContext, ScopedOperationEnv};
|
||||
use crate::registry::env::{CompositeOperationEnv, LocalOperationEnv, OperationEnv};
|
||||
@@ -207,22 +208,34 @@ impl CallAdapter {
|
||||
}
|
||||
};
|
||||
|
||||
if envelope.r#type != EVENT_REQUESTED {
|
||||
debug!(event_type = %envelope.r#type, id = %envelope.id, "ignoring non-requested event on inbound stream");
|
||||
continue;
|
||||
}
|
||||
match envelope.r#type.as_str() {
|
||||
EVENT_REQUESTED => {
|
||||
let request_id = envelope.id.clone();
|
||||
let payload = envelope.payload.clone();
|
||||
|
||||
let request_id = envelope.id.clone();
|
||||
let payload = envelope.payload.clone();
|
||||
let response = self
|
||||
.dispatch_requested(&connection, request_id.clone(), payload)
|
||||
.await;
|
||||
|
||||
let response = self
|
||||
.dispatch_requested(&connection, request_id.clone(), payload)
|
||||
.await;
|
||||
|
||||
let event: EventEnvelope = response.into();
|
||||
if let Err(err) = writer.write_frame(&event).await {
|
||||
warn!(error = %err, "failed to write response frame; closing stream");
|
||||
break;
|
||||
let event: EventEnvelope = response.into();
|
||||
if let Err(err) = writer.write_frame(&event).await {
|
||||
warn!(error = %err, "failed to write response frame; closing stream");
|
||||
break;
|
||||
}
|
||||
}
|
||||
EVENT_ABORTED => {
|
||||
let request_id = envelope.id.clone();
|
||||
let mut pending = connection.pending().lock();
|
||||
let mut cascade = AbortCascade::new(&mut pending);
|
||||
let aborted = cascade.cascade_abort(&request_id, AbortPolicy::AbortDependents);
|
||||
pending.handle_aborted(&request_id);
|
||||
if !aborted.is_empty() {
|
||||
debug!(count = aborted.len(), "abort cascade evicted descendants");
|
||||
}
|
||||
}
|
||||
other => {
|
||||
debug!(event_type = %other, id = %envelope.id, "ignoring non-requested/non-aborted event on inbound stream");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -312,6 +325,7 @@ mod tests {
|
||||
use std::collections::HashMap;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::sync::Mutex as StdMutex;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
struct StaticIdentityProvider {
|
||||
tokens: StdMutex<HashMap<String, Identity>>,
|
||||
@@ -946,4 +960,92 @@ mod tests {
|
||||
other => panic!("expected NOT_FOUND, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn encode_frame(envelope: &EventEnvelope) -> Vec<u8> {
|
||||
let body = serde_json::to_vec(envelope).unwrap();
|
||||
let mut buf = (body.len() as u32).to_be_bytes().to_vec();
|
||||
buf.extend_from_slice(&body);
|
||||
buf
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handle_stream_aborted_cascades_parent_and_child() {
|
||||
let registry = registry_with(
|
||||
"parent/run",
|
||||
Visibility::External,
|
||||
AccessControl::default(),
|
||||
echo_handler(),
|
||||
);
|
||||
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||
let adapter = CallAdapter::new(registry, provider);
|
||||
let conn = Arc::new(CallConnection::new(stub_connection()));
|
||||
|
||||
{
|
||||
let mut pending = conn.pending().lock();
|
||||
pending.register_call(
|
||||
"parent-1".to_string(),
|
||||
Instant::now() + Duration::from_secs(30),
|
||||
None,
|
||||
);
|
||||
pending.register_call(
|
||||
"child-1".to_string(),
|
||||
Instant::now() + Duration::from_secs(30),
|
||||
Some("parent-1".to_string()),
|
||||
);
|
||||
}
|
||||
|
||||
let frame = encode_frame(&EventEnvelope::aborted("parent-1"));
|
||||
let recv = tokio::io::BufReader::new(std::io::Cursor::new(frame));
|
||||
let (send, _recv_sink) = tokio::io::duplex(64);
|
||||
let send = alknet_core::types::SendStream::from_mock(send);
|
||||
let recv = alknet_core::types::RecvStream::from_mock(recv);
|
||||
|
||||
adapter.handle_stream(conn.clone(), send, recv).await;
|
||||
|
||||
let pending = conn.pending().lock();
|
||||
assert!(
|
||||
!pending.contains("parent-1"),
|
||||
"parent entry must be removed after abort"
|
||||
);
|
||||
assert!(
|
||||
!pending.contains("child-1"),
|
||||
"child entry must be removed by cascade"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn handle_stream_aborted_unknown_request_id_is_noop() {
|
||||
let registry = registry_with(
|
||||
"parent/run",
|
||||
Visibility::External,
|
||||
AccessControl::default(),
|
||||
echo_handler(),
|
||||
);
|
||||
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||
let adapter = CallAdapter::new(registry, provider);
|
||||
let conn = Arc::new(CallConnection::new(stub_connection()));
|
||||
|
||||
{
|
||||
let mut pending = conn.pending().lock();
|
||||
pending.register_call(
|
||||
"unrelated-1".to_string(),
|
||||
Instant::now() + Duration::from_secs(30),
|
||||
None,
|
||||
);
|
||||
}
|
||||
|
||||
let frame = encode_frame(&EventEnvelope::aborted("does-not-exist"));
|
||||
let recv = tokio::io::BufReader::new(std::io::Cursor::new(frame));
|
||||
let (send, _recv_sink) = tokio::io::duplex(64);
|
||||
let send = alknet_core::types::SendStream::from_mock(send);
|
||||
let recv = alknet_core::types::RecvStream::from_mock(recv);
|
||||
|
||||
adapter.handle_stream(conn.clone(), send, recv).await;
|
||||
|
||||
let pending = conn.pending().lock();
|
||||
assert!(
|
||||
pending.contains("unrelated-1"),
|
||||
"unrelated entry must survive abort of unknown id"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user