feat(core): bridge SshSession recv/send to call protocol via alknet-control:0 channel
Implement the SSH session to call protocol bridge: - Add FrameFramedReader/FrameFramedWriter for async I/O of length-prefixed EventEnvelope frames - SshSession::recv() reads InterfaceEvent frames from the alknet-control:0 channel via mpsc - SshSession::send() writes EventEnvelope frames to the alknet-control:0 channel via mpsc - Add ControlChannelBridge implementing ControlChannelHandler for routing channel data - SshHandler::channel_open_direct_tcpip routes alknet-control:0 to the bridge task - Session Identity attached to every InterfaceEvent produced by recv() - ControlChannelRouter gains take_handler() for non-control alknet-* channel routing
This commit is contained in:
@@ -1,3 +1,7 @@
|
|||||||
|
use std::io;
|
||||||
|
|
||||||
|
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||||
|
|
||||||
use crate::call::envelope::EventEnvelope;
|
use crate::call::envelope::EventEnvelope;
|
||||||
|
|
||||||
pub fn encode(envelope: &EventEnvelope) -> Vec<u8> {
|
pub fn encode(envelope: &EventEnvelope) -> Vec<u8> {
|
||||||
@@ -58,6 +62,73 @@ pub enum FrameDecodeError {
|
|||||||
Json(#[from] serde_json::Error),
|
Json(#[from] serde_json::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct FrameFramedReader<S> {
|
||||||
|
stream: S,
|
||||||
|
buf: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> FrameFramedReader<S>
|
||||||
|
where
|
||||||
|
S: AsyncRead + Unpin,
|
||||||
|
{
|
||||||
|
pub fn new(stream: S) -> Self {
|
||||||
|
Self {
|
||||||
|
stream,
|
||||||
|
buf: Vec::with_capacity(4096),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn read_frame(&mut self) -> io::Result<Option<EventEnvelope>> {
|
||||||
|
loop {
|
||||||
|
if self.buf.len() >= 4 {
|
||||||
|
let len = u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]])
|
||||||
|
as usize;
|
||||||
|
let total = 4 + len;
|
||||||
|
if self.buf.len() >= total {
|
||||||
|
let body = &self.buf[4..total];
|
||||||
|
match serde_json::from_slice(body) {
|
||||||
|
Ok(envelope) => {
|
||||||
|
self.buf.drain(..total);
|
||||||
|
return Ok(Some(envelope));
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
self.buf.drain(..total);
|
||||||
|
return Err(io::Error::new(io::ErrorKind::InvalidData, e));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut tmp = [0u8; 4096];
|
||||||
|
match self.stream.read(&mut tmp).await {
|
||||||
|
Ok(0) => return Ok(None),
|
||||||
|
Ok(n) => self.buf.extend_from_slice(&tmp[..n]),
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct FrameFramedWriter<S> {
|
||||||
|
stream: S,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> FrameFramedWriter<S>
|
||||||
|
where
|
||||||
|
S: AsyncWrite + Unpin,
|
||||||
|
{
|
||||||
|
pub fn new(stream: S) -> Self {
|
||||||
|
Self { stream }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn write_frame(&mut self, envelope: &EventEnvelope) -> io::Result<()> {
|
||||||
|
let frame = encode(envelope);
|
||||||
|
self.stream.write_all(&frame).await?;
|
||||||
|
self.stream.flush().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
@@ -18,7 +18,9 @@ pub use context::OperationContext;
|
|||||||
pub use env::OperationEnv;
|
pub use env::OperationEnv;
|
||||||
pub use envelope::EventEnvelope;
|
pub use envelope::EventEnvelope;
|
||||||
pub use events::{CALL_ABORTED, CALL_COMPLETED, CALL_ERROR, CALL_REQUESTED, CALL_RESPONDED};
|
pub use events::{CALL_ABORTED, CALL_COMPLETED, CALL_ERROR, CALL_REQUESTED, CALL_RESPONDED};
|
||||||
pub use frame::{decode, decode_with_remainder, encode, FrameDecodeError};
|
pub use frame::{
|
||||||
|
decode, decode_with_remainder, encode, FrameDecodeError, FrameFramedReader, FrameFramedWriter,
|
||||||
|
};
|
||||||
pub use pending::PendingRequestMap;
|
pub use pending::PendingRequestMap;
|
||||||
pub use registry::{Handler, OperationRegistry, OperationRegistryBuilder};
|
pub use registry::{Handler, OperationRegistry, OperationRegistryBuilder};
|
||||||
pub use response::{CallError, ResponseEnvelope};
|
pub use response::{CallError, ResponseEnvelope};
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ pub use http::HttpInterface;
|
|||||||
pub use pairs::{is_valid_pair, TransportKindBase, VALID_TRANSPORT_INTERFACE_PAIRS};
|
pub use pairs::{is_valid_pair, TransportKindBase, VALID_TRANSPORT_INTERFACE_PAIRS};
|
||||||
pub use raw_framing::{RawFramingInterface, RawFramingSession};
|
pub use raw_framing::{RawFramingInterface, RawFramingSession};
|
||||||
pub use session::{InterfaceEvent, InterfaceSession};
|
pub use session::{InterfaceEvent, InterfaceSession};
|
||||||
pub use ssh::{SshInterface, SshSession};
|
pub use ssh::{ControlChannelBridge, SshInterface, SshSession};
|
||||||
|
|
||||||
pub trait TransportStream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
|
pub trait TransportStream: AsyncRead + AsyncWrite + Unpin + Send + 'static {}
|
||||||
|
|
||||||
|
|||||||
@@ -9,13 +9,18 @@ use russh::keys::ssh_key::HashAlg;
|
|||||||
use russh::server::{self, Config};
|
use russh::server::{self, Config};
|
||||||
use russh::Channel;
|
use russh::Channel;
|
||||||
use russh::ChannelId;
|
use russh::ChannelId;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use crate::auth::identity::{Identity, IdentityProvider};
|
use crate::auth::identity::{Identity, IdentityProvider};
|
||||||
|
use crate::call::frame::{FrameFramedReader, FrameFramedWriter};
|
||||||
use crate::call::EventEnvelope;
|
use crate::call::EventEnvelope;
|
||||||
use crate::config::DynamicConfig;
|
use crate::config::DynamicConfig;
|
||||||
use crate::interface::session::{InterfaceEvent, InterfaceSession};
|
use crate::interface::session::{InterfaceEvent, InterfaceSession};
|
||||||
use crate::interface::{StreamInterface, StreamInterfaceConfig, TransportStream};
|
use crate::interface::{StreamInterface, StreamInterfaceConfig, TransportStream};
|
||||||
use crate::server::control_channel::{ControlChannelRouter, ALKNET_PREFIX};
|
use crate::server::control_channel::{
|
||||||
|
ControlChannelHandler, ControlChannelRouter, DuplexStream, ALKNET_CONTROL_DESTINATION,
|
||||||
|
ALKNET_PREFIX,
|
||||||
|
};
|
||||||
use crate::server::rate_limit::{AuthAttemptLimiter, ConnectionRateLimiter};
|
use crate::server::rate_limit::{AuthAttemptLimiter, ConnectionRateLimiter};
|
||||||
use crate::transport::TransportKind;
|
use crate::transport::TransportKind;
|
||||||
|
|
||||||
@@ -30,6 +35,8 @@ struct SshHandler {
|
|||||||
auth_limiter: AuthAttemptLimiter,
|
auth_limiter: AuthAttemptLimiter,
|
||||||
authenticated_identity: Option<Identity>,
|
authenticated_identity: Option<Identity>,
|
||||||
control_channel_router: ControlChannelRouter,
|
control_channel_router: ControlChannelRouter,
|
||||||
|
bridge_event_tx: Option<mpsc::Sender<InterfaceEvent>>,
|
||||||
|
bridge_envelope_rx: Option<mpsc::Receiver<EventEnvelope>>,
|
||||||
connected_at: Instant,
|
connected_at: Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -76,6 +83,8 @@ impl SshHandler {
|
|||||||
auth_limiter: AuthAttemptLimiter::new(max_auth_attempts),
|
auth_limiter: AuthAttemptLimiter::new(max_auth_attempts),
|
||||||
authenticated_identity: None,
|
authenticated_identity: None,
|
||||||
control_channel_router: ControlChannelRouter::without_handler(),
|
control_channel_router: ControlChannelRouter::without_handler(),
|
||||||
|
bridge_event_tx: None,
|
||||||
|
bridge_envelope_rx: None,
|
||||||
connected_at: Instant::now(),
|
connected_at: Instant::now(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -85,6 +94,20 @@ impl SshHandler {
|
|||||||
self.control_channel_router = router;
|
self.control_channel_router = router;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn with_bridge_channels(
|
||||||
|
mut self,
|
||||||
|
event_tx: mpsc::Sender<InterfaceEvent>,
|
||||||
|
envelope_rx: mpsc::Receiver<EventEnvelope>,
|
||||||
|
) -> Self {
|
||||||
|
self.bridge_event_tx = Some(event_tx);
|
||||||
|
self.bridge_envelope_rx = Some(envelope_rx);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
fn has_control_channel_bridge(&self) -> bool {
|
||||||
|
self.bridge_event_tx.is_some() && self.bridge_envelope_rx.is_some()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for SshHandler {
|
impl Drop for SshHandler {
|
||||||
@@ -176,12 +199,37 @@ impl server::Handler for SshHandler {
|
|||||||
_session: &mut server::Session,
|
_session: &mut server::Session,
|
||||||
) -> Result<bool, Self::Error> {
|
) -> Result<bool, Self::Error> {
|
||||||
if host_to_connect.starts_with(ALKNET_PREFIX) {
|
if host_to_connect.starts_with(ALKNET_PREFIX) {
|
||||||
if !self.control_channel_router.has_handler() {
|
if host_to_connect == ALKNET_CONTROL_DESTINATION && self.has_control_channel_bridge() {
|
||||||
return Ok(false);
|
let event_tx = self.bridge_event_tx.take().unwrap();
|
||||||
}
|
let envelope_rx = self.bridge_envelope_rx.take().unwrap();
|
||||||
let _ = channel;
|
let identity = self.authenticated_identity.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let stream = channel.into_stream();
|
||||||
|
let (read_half, write_half) = tokio::io::split(stream);
|
||||||
|
run_control_channel_bridge(
|
||||||
|
read_half,
|
||||||
|
write_half,
|
||||||
|
identity,
|
||||||
|
event_tx,
|
||||||
|
envelope_rx,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
});
|
||||||
|
let _ = (originator_address, originator_port);
|
||||||
return Ok(true);
|
return Ok(true);
|
||||||
}
|
}
|
||||||
|
if self.control_channel_router.has_handler() {
|
||||||
|
if let Some(handler) = self.control_channel_router.take_handler() {
|
||||||
|
let stream: Box<dyn DuplexStream> = Box::new(channel.into_stream());
|
||||||
|
tokio::spawn(async move {
|
||||||
|
handler.handle_channel(stream).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let _ = (originator_address, originator_port);
|
||||||
|
return Ok(true);
|
||||||
|
}
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
|
||||||
let identity = self
|
let identity = self
|
||||||
.authenticated_identity
|
.authenticated_identity
|
||||||
@@ -529,6 +577,9 @@ impl SshInterface {
|
|||||||
let identity_provider = Arc::clone(&ssh_config.auth);
|
let identity_provider = Arc::clone(&ssh_config.auth);
|
||||||
let _forwarding = Arc::clone(&ssh_config.forwarding);
|
let _forwarding = Arc::clone(&ssh_config.forwarding);
|
||||||
|
|
||||||
|
let (event_tx, event_rx) = mpsc::channel::<InterfaceEvent>(256);
|
||||||
|
let (envelope_tx, envelope_rx) = mpsc::channel::<EventEnvelope>(256);
|
||||||
|
|
||||||
let handler = SshHandler::new(
|
let handler = SshHandler::new(
|
||||||
Arc::clone(&self.dynamic),
|
Arc::clone(&self.dynamic),
|
||||||
identity_provider,
|
identity_provider,
|
||||||
@@ -537,7 +588,8 @@ impl SshInterface {
|
|||||||
transport,
|
transport,
|
||||||
Arc::clone(&self.connection_limiter),
|
Arc::clone(&self.connection_limiter),
|
||||||
self.max_auth_attempts,
|
self.max_auth_attempts,
|
||||||
);
|
)
|
||||||
|
.with_bridge_channels(event_tx, envelope_rx);
|
||||||
|
|
||||||
let running = server::run_stream(Arc::clone(&self.config), stream, handler).await?;
|
let running = server::run_stream(Arc::clone(&self.config), stream, handler).await?;
|
||||||
let handle = running.handle();
|
let handle = running.handle();
|
||||||
@@ -548,6 +600,8 @@ impl SshInterface {
|
|||||||
Ok(SshSession {
|
Ok(SshSession {
|
||||||
handle,
|
handle,
|
||||||
_join: join,
|
_join: join,
|
||||||
|
event_rx,
|
||||||
|
envelope_tx,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -576,6 +630,8 @@ impl StreamInterface for SshInterface {
|
|||||||
pub struct SshSession {
|
pub struct SshSession {
|
||||||
handle: server::Handle,
|
handle: server::Handle,
|
||||||
_join: tokio::task::JoinHandle<()>,
|
_join: tokio::task::JoinHandle<()>,
|
||||||
|
event_rx: mpsc::Receiver<InterfaceEvent>,
|
||||||
|
envelope_tx: mpsc::Sender<EventEnvelope>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SshSession {
|
impl SshSession {
|
||||||
@@ -586,26 +642,95 @@ impl SshSession {
|
|||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl InterfaceSession for SshSession {
|
impl InterfaceSession for SshSession {
|
||||||
/// Stub for Phase 1 — always returns `None`.
|
|
||||||
///
|
|
||||||
/// TODO: Bridge `alknet-control:0` channel events to call protocol
|
|
||||||
/// `InterfaceEvent` frames. Planned for Phase 2/3.
|
|
||||||
async fn recv(&mut self) -> Option<InterfaceEvent> {
|
async fn recv(&mut self) -> Option<InterfaceEvent> {
|
||||||
None
|
self.event_rx.recv().await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stub for Phase 1 — accepts silently and discards.
|
async fn send(&mut self, envelope: EventEnvelope) -> Result<()> {
|
||||||
///
|
self.envelope_tx
|
||||||
/// TODO: Bridge outgoing `EventEnvelope` frames to the SSH channel
|
.send(envelope)
|
||||||
/// established by the call protocol. Planned for Phase 2/3.
|
.await
|
||||||
async fn send(&mut self, _envelope: EventEnvelope) -> Result<()> {
|
.map_err(|_| anyhow::anyhow!("control channel bridge closed"))
|
||||||
Ok(())
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run_control_channel_bridge<R, W>(
|
||||||
|
read_half: R,
|
||||||
|
write_half: W,
|
||||||
|
identity: Option<Identity>,
|
||||||
|
event_tx: mpsc::Sender<InterfaceEvent>,
|
||||||
|
mut envelope_rx: mpsc::Receiver<EventEnvelope>,
|
||||||
|
) where
|
||||||
|
R: tokio::io::AsyncRead + Unpin,
|
||||||
|
W: tokio::io::AsyncWrite + Unpin,
|
||||||
|
{
|
||||||
|
let mut reader = FrameFramedReader::new(read_half);
|
||||||
|
let mut writer = FrameFramedWriter::new(write_half);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
frame = reader.read_frame() => {
|
||||||
|
match frame {
|
||||||
|
Ok(Some(envelope)) => {
|
||||||
|
let event = match &identity {
|
||||||
|
Some(id) => InterfaceEvent::with_identity(envelope, id.clone()),
|
||||||
|
None => InterfaceEvent::new(envelope),
|
||||||
|
};
|
||||||
|
if event_tx.send(event).await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => return,
|
||||||
|
Err(_) => return,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
envelope = envelope_rx.recv() => {
|
||||||
|
match envelope {
|
||||||
|
Some(envelope) => {
|
||||||
|
if writer.write_frame(&envelope).await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => return,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ControlChannelBridge {
|
||||||
|
identity: Option<Identity>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ControlChannelBridge {
|
||||||
|
pub fn new(identity: Option<Identity>) -> Self {
|
||||||
|
Self { identity }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ControlChannelHandler for ControlChannelBridge {
|
||||||
|
async fn handle_channel(&self, stream: Box<dyn DuplexStream>) {
|
||||||
|
let (event_tx, _event_rx) = mpsc::channel::<InterfaceEvent>(256);
|
||||||
|
let (_envelope_tx, envelope_rx) = mpsc::channel::<EventEnvelope>(256);
|
||||||
|
|
||||||
|
let identity = self.identity.clone();
|
||||||
|
let (read_half, write_half) = tokio::io::split(stream);
|
||||||
|
tokio::spawn(run_control_channel_bridge(
|
||||||
|
read_half,
|
||||||
|
write_half,
|
||||||
|
identity,
|
||||||
|
event_tx,
|
||||||
|
envelope_rx,
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::call::frame::{FrameFramedReader, FrameFramedWriter};
|
||||||
|
use tokio::io::duplex;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn ssh_interface_constructs_with_config() {
|
fn ssh_interface_constructs_with_config() {
|
||||||
@@ -738,4 +863,116 @@ mod tests {
|
|||||||
let result = iface.accept(stream, &raw_config).await;
|
let result = iface.accept(stream, &raw_config).await;
|
||||||
assert!(result.is_err());
|
assert!(result.is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn ssh_session_round_trip_event_envelope() {
|
||||||
|
let (client, server) = duplex(4096);
|
||||||
|
|
||||||
|
let (event_tx, mut event_rx) = mpsc::channel::<InterfaceEvent>(256);
|
||||||
|
let (envelope_tx, envelope_rx) = mpsc::channel::<EventEnvelope>(256);
|
||||||
|
|
||||||
|
let identity = Identity {
|
||||||
|
id: "SHA256:test".to_string(),
|
||||||
|
scopes: vec![],
|
||||||
|
resources: std::collections::HashMap::new(),
|
||||||
|
};
|
||||||
|
let identity_clone = identity.clone();
|
||||||
|
|
||||||
|
let (server_read, server_write) = tokio::io::split(server);
|
||||||
|
tokio::spawn(run_control_channel_bridge(
|
||||||
|
server_read,
|
||||||
|
server_write,
|
||||||
|
Some(identity_clone),
|
||||||
|
event_tx,
|
||||||
|
envelope_rx,
|
||||||
|
));
|
||||||
|
|
||||||
|
let (client_read, client_write) = tokio::io::split(client);
|
||||||
|
let mut client_reader = FrameFramedReader::new(client_read);
|
||||||
|
let mut client_writer = FrameFramedWriter::new(client_write);
|
||||||
|
|
||||||
|
let envelope = EventEnvelope::call_requested("req-1", serde_json::json!({"op": "test"}));
|
||||||
|
client_writer.write_frame(&envelope).await.unwrap();
|
||||||
|
|
||||||
|
let received_event =
|
||||||
|
tokio::time::timeout(std::time::Duration::from_secs(2), event_rx.recv())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(received_event.envelope, envelope);
|
||||||
|
assert_eq!(received_event.identity.as_ref().unwrap().id, "SHA256:test");
|
||||||
|
|
||||||
|
let response = EventEnvelope::call_responded("req-1", serde_json::json!({"result": 42}));
|
||||||
|
envelope_tx.send(response.clone()).await.unwrap();
|
||||||
|
|
||||||
|
let read_back = tokio::time::timeout(
|
||||||
|
std::time::Duration::from_secs(2),
|
||||||
|
client_reader.read_frame(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(read_back, response);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn ssh_session_recv_without_identity() {
|
||||||
|
let (client, server) = duplex(4096);
|
||||||
|
|
||||||
|
let (event_tx, mut event_rx) = mpsc::channel::<InterfaceEvent>(256);
|
||||||
|
let (_envelope_tx, envelope_rx) = mpsc::channel::<EventEnvelope>(256);
|
||||||
|
|
||||||
|
let (server_read, server_write) = tokio::io::split(server);
|
||||||
|
tokio::spawn(run_control_channel_bridge(
|
||||||
|
server_read,
|
||||||
|
server_write,
|
||||||
|
None,
|
||||||
|
event_tx,
|
||||||
|
envelope_rx,
|
||||||
|
));
|
||||||
|
|
||||||
|
let (client_read, client_write) = tokio::io::split(client);
|
||||||
|
let mut client_writer = FrameFramedWriter::new(client_write);
|
||||||
|
let _client_reader = FrameFramedReader::new(client_read);
|
||||||
|
|
||||||
|
let envelope = EventEnvelope::call_requested("req-2", serde_json::json!({"op": "no-id"}));
|
||||||
|
client_writer.write_frame(&envelope).await.unwrap();
|
||||||
|
|
||||||
|
let received_event =
|
||||||
|
tokio::time::timeout(std::time::Duration::from_secs(2), event_rx.recv())
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(received_event.envelope, envelope);
|
||||||
|
assert!(received_event.identity.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn control_channel_router_with_handler_routes_data() {
|
||||||
|
let called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
|
||||||
|
let called_clone = called.clone();
|
||||||
|
|
||||||
|
struct TrackingHandler {
|
||||||
|
called: std::sync::Arc<std::sync::atomic::AtomicBool>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ControlChannelHandler for TrackingHandler {
|
||||||
|
async fn handle_channel(&self, _stream: Box<dyn DuplexStream>) {
|
||||||
|
self.called.store(true, std::sync::atomic::Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let router = ControlChannelRouter::with_handler(Box::new(TrackingHandler {
|
||||||
|
called: called_clone,
|
||||||
|
}));
|
||||||
|
assert!(router.has_handler());
|
||||||
|
|
||||||
|
let (_client, server) = duplex(64);
|
||||||
|
let stream: Box<dyn DuplexStream> = Box::new(server);
|
||||||
|
let result = router.route(stream).await;
|
||||||
|
assert!(result.is_ok());
|
||||||
|
assert!(called.load(std::sync::atomic::Ordering::SeqCst));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -60,6 +60,10 @@ impl ControlChannelRouter {
|
|||||||
)),
|
)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn take_handler(&mut self) -> Option<Box<dyn ControlChannelHandler>> {
|
||||||
|
self.handler.take()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
Reference in New Issue
Block a user