feat(iroh): add from_endpoint constructors for shared iroh Endpoint
Enable running wraith alongside iroh-blobs, iroh-gossip, and iroh-docs on the same QUIC endpoint (one connection per peer, multiplexed by ALPN). - IrohTransport::from_endpoint(node_id, endpoint) for client-side shared endpoint - IrohAcceptor::from_endpoint(endpoint) for server-side shared endpoint - Export ALPN constant as IROH_ALPN for Router registration - Add owned() method to track whether the endpoint was created internally - Existing new()/bind() constructors unchanged (backwards compatible) - Add tests for from_endpoint constructors and shared endpoint connectivity
This commit is contained in:
@@ -9,7 +9,7 @@ use tokio::io;
|
|||||||
|
|
||||||
use super::{Transport, TransportAcceptor, TransportInfo, TransportKind};
|
use super::{Transport, TransportAcceptor, TransportInfo, TransportKind};
|
||||||
|
|
||||||
const ALPN: &[u8] = b"wraith-ssh";
|
pub const ALPN: &[u8] = b"wraith-ssh";
|
||||||
const DEFAULT_RELAY_URL: &str = "https://relay.iroh.network/";
|
const DEFAULT_RELAY_URL: &str = "https://relay.iroh.network/";
|
||||||
|
|
||||||
/// A client-side iroh QUIC P2P transport that connects to a remote iroh endpoint.
|
/// A client-side iroh QUIC P2P transport that connects to a remote iroh endpoint.
|
||||||
@@ -18,12 +18,21 @@ const DEFAULT_RELAY_URL: &str = "https://relay.iroh.network/";
|
|||||||
/// QUIC stream with `conn.open_bi()`, and joins the halves with
|
/// QUIC stream with `conn.open_bi()`, and joins the halves with
|
||||||
/// `tokio::io::join(recv, send)` to produce a duplex stream for russh.
|
/// `tokio::io::join(recv, send)` to produce a duplex stream for russh.
|
||||||
/// Per ADR-003, `tokio::io::join` is used instead of a custom wrapper.
|
/// Per ADR-003, `tokio::io::join` is used instead of a custom wrapper.
|
||||||
|
///
|
||||||
|
/// Use [`IrohTransport::new`] to create a standalone endpoint, or
|
||||||
|
/// [`IrohTransport::from_endpoint`] to share an existing iroh `Endpoint`
|
||||||
|
/// with other protocol handlers (blobs, gossip, docs).
|
||||||
pub struct IrohTransport {
|
pub struct IrohTransport {
|
||||||
node_id: NodeId,
|
node_id: NodeId,
|
||||||
endpoint: Endpoint,
|
endpoint: Endpoint,
|
||||||
|
owned: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IrohTransport {
|
impl IrohTransport {
|
||||||
|
/// Create a new iroh transport with its own dedicated endpoint.
|
||||||
|
///
|
||||||
|
/// The endpoint is created with the `wraith-ssh` ALPN and the provided
|
||||||
|
/// relay URL. Use this when wraith is the only iroh service on this node.
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
node_id: NodeId,
|
node_id: NodeId,
|
||||||
relay_url: Option<RelayUrl>,
|
relay_url: Option<RelayUrl>,
|
||||||
@@ -40,7 +49,18 @@ impl IrohTransport {
|
|||||||
builder = builder.proxy_url(proxy.clone());
|
builder = builder.proxy_url(proxy.clone());
|
||||||
}
|
}
|
||||||
let endpoint = builder.bind().await?;
|
let endpoint = builder.bind().await?;
|
||||||
Ok(Self { node_id, endpoint })
|
Ok(Self { node_id, endpoint, owned: true })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an iroh transport using an existing shared endpoint.
|
||||||
|
///
|
||||||
|
/// The endpoint must already have the `wraith-ssh` ALPN registered
|
||||||
|
/// (typically via [`iroh::protocol::Router::builder`]). This enables
|
||||||
|
/// running wraith alongside iroh-blobs, iroh-gossip, iroh-docs, and
|
||||||
|
/// other protocol handlers on the same QUIC endpoint — one connection
|
||||||
|
/// per peer, multiplexed by ALPN.
|
||||||
|
pub fn from_endpoint(node_id: NodeId, endpoint: Endpoint) -> Self {
|
||||||
|
Self { node_id, endpoint, owned: false }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn endpoint_id(&self) -> String {
|
pub fn endpoint_id(&self) -> String {
|
||||||
@@ -50,6 +70,10 @@ impl IrohTransport {
|
|||||||
pub fn endpoint(&self) -> &Endpoint {
|
pub fn endpoint(&self) -> &Endpoint {
|
||||||
&self.endpoint
|
&self.endpoint
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn owned(&self) -> bool {
|
||||||
|
self.owned
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -73,11 +97,24 @@ impl Transport for IrohTransport {
|
|||||||
/// (ADR-010). Accepts incoming connections, accepts bidirectional QUIC streams,
|
/// (ADR-010). Accepts incoming connections, accepts bidirectional QUIC streams,
|
||||||
/// and joins the halves with `tokio::io::join(recv, send)`. Exposes
|
/// and joins the halves with `tokio::io::join(recv, send)`. Exposes
|
||||||
/// `endpoint_id()` for CLI display of the server's z-base-32 node ID.
|
/// `endpoint_id()` for CLI display of the server's z-base-32 node ID.
|
||||||
|
///
|
||||||
|
/// Use [`IrohAcceptor::bind`] to create a standalone endpoint, or
|
||||||
|
/// [`IrohAcceptor::from_endpoint`] to share an existing iroh `Endpoint`
|
||||||
|
/// with other protocol handlers (blobs, gossip, docs).
|
||||||
|
///
|
||||||
|
/// When using `from_endpoint`, the wraith-ssh ALPN must be registered
|
||||||
|
/// via an iroh `Router` that calls `Handler::accept()` on incoming
|
||||||
|
/// connections with the `wraith-ssh` ALPN, then passes the accepted
|
||||||
|
/// bidirectional stream to `russh::server::run_stream()`.
|
||||||
pub struct IrohAcceptor {
|
pub struct IrohAcceptor {
|
||||||
endpoint: Endpoint,
|
endpoint: Endpoint,
|
||||||
|
owned: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IrohAcceptor {
|
impl IrohAcceptor {
|
||||||
|
/// Bind a new iroh endpoint with a dedicated `wraith-ssh` ALPN.
|
||||||
|
///
|
||||||
|
/// Use this when wraith is the only iroh service on this node.
|
||||||
pub async fn bind(
|
pub async fn bind(
|
||||||
relay_url: Option<RelayUrl>,
|
relay_url: Option<RelayUrl>,
|
||||||
proxy_url: Option<url::Url>,
|
proxy_url: Option<url::Url>,
|
||||||
@@ -93,7 +130,23 @@ impl IrohAcceptor {
|
|||||||
builder = builder.proxy_url(proxy.clone());
|
builder = builder.proxy_url(proxy.clone());
|
||||||
}
|
}
|
||||||
let endpoint = builder.bind().await?;
|
let endpoint = builder.bind().await?;
|
||||||
Ok(Self { endpoint })
|
Ok(Self { endpoint, owned: true })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create an iroh acceptor using an existing shared endpoint.
|
||||||
|
///
|
||||||
|
/// The endpoint must already have the `wraith-ssh` ALPN registered
|
||||||
|
/// (typically via [`iroh::protocol::Router::builder`]). When using a
|
||||||
|
/// shared endpoint, incoming connections with the `wraith-ssh` ALPN
|
||||||
|
/// are routed by the Router to a `ProtocolHandler` that this acceptor
|
||||||
|
/// does not manage — the caller is responsible for bridging the
|
||||||
|
/// Router's `accept()` callback to this acceptor's stream handling.
|
||||||
|
///
|
||||||
|
/// For the standalone case where wraith owns the endpoint, use
|
||||||
|
/// [`IrohAcceptor::bind`] instead, which handles the accept loop
|
||||||
|
/// internally.
|
||||||
|
pub fn from_endpoint(endpoint: Endpoint) -> Self {
|
||||||
|
Self { endpoint, owned: false }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn endpoint_id(&self) -> String {
|
pub fn endpoint_id(&self) -> String {
|
||||||
@@ -103,6 +156,10 @@ impl IrohAcceptor {
|
|||||||
pub fn endpoint(&self) -> &Endpoint {
|
pub fn endpoint(&self) -> &Endpoint {
|
||||||
&self.endpoint
|
&self.endpoint
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn owned(&self) -> bool {
|
||||||
|
self.owned
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -140,6 +197,7 @@ mod tests {
|
|||||||
assert!(!endpoint_id.is_empty());
|
assert!(!endpoint_id.is_empty());
|
||||||
let parsed = NodeId::from_z32(&endpoint_id);
|
let parsed = NodeId::from_z32(&endpoint_id);
|
||||||
assert!(parsed.is_ok());
|
assert!(parsed.is_ok());
|
||||||
|
assert!(acceptor.owned());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@@ -147,6 +205,16 @@ mod tests {
|
|||||||
let relay: RelayUrl = "https://relay.iroh.network/".parse().unwrap();
|
let relay: RelayUrl = "https://relay.iroh.network/".parse().unwrap();
|
||||||
let acceptor = IrohAcceptor::bind(Some(relay), None).await.unwrap();
|
let acceptor = IrohAcceptor::bind(Some(relay), None).await.unwrap();
|
||||||
assert!(!acceptor.endpoint_id().is_empty());
|
assert!(!acceptor.endpoint_id().is_empty());
|
||||||
|
assert!(acceptor.owned());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn iroh_acceptor_from_endpoint() {
|
||||||
|
let acceptor = IrohAcceptor::bind(None, None).await.unwrap();
|
||||||
|
let endpoint = acceptor.endpoint.clone();
|
||||||
|
let shared = IrohAcceptor::from_endpoint(endpoint);
|
||||||
|
assert_eq!(shared.endpoint_id(), acceptor.endpoint_id());
|
||||||
|
assert!(!shared.owned());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@@ -166,6 +234,20 @@ mod tests {
|
|||||||
let transport = IrohTransport::new(node_id, None, None).await.unwrap();
|
let transport = IrohTransport::new(node_id, None, None).await.unwrap();
|
||||||
assert!(transport.describe().starts_with("iroh://"));
|
assert!(transport.describe().starts_with("iroh://"));
|
||||||
assert!(!transport.endpoint_id().is_empty());
|
assert!(!transport.endpoint_id().is_empty());
|
||||||
|
assert!(transport.owned());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn iroh_transport_from_endpoint() {
|
||||||
|
let node_id: NodeId = iroh::SecretKey::generate(rand_core::OsRng)
|
||||||
|
.public()
|
||||||
|
.into();
|
||||||
|
let acceptor = IrohAcceptor::bind(None, None).await.unwrap();
|
||||||
|
let endpoint = acceptor.endpoint.clone();
|
||||||
|
let transport = IrohTransport::from_endpoint(node_id, endpoint);
|
||||||
|
assert!(transport.describe().starts_with("iroh://"));
|
||||||
|
assert_eq!(transport.endpoint_id(), acceptor.endpoint_id());
|
||||||
|
assert!(!transport.owned());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@@ -202,4 +284,38 @@ mod tests {
|
|||||||
transport.connect().await.unwrap();
|
transport.connect().await.unwrap();
|
||||||
let _server_stream = accept_handle.await.unwrap();
|
let _server_stream = accept_handle.await.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[ignore]
|
||||||
|
async fn iroh_shared_endpoint_client_connects_to_server() {
|
||||||
|
let acceptor = IrohAcceptor::bind(None, None).await.unwrap();
|
||||||
|
let server_node_id = acceptor.endpoint().node_id();
|
||||||
|
let shared_endpoint = acceptor.endpoint().clone();
|
||||||
|
|
||||||
|
let transport = IrohTransport::from_endpoint(server_node_id, shared_endpoint);
|
||||||
|
|
||||||
|
let mut addrs_watcher = acceptor.endpoint().direct_addresses();
|
||||||
|
addrs_watcher.initialized().await.unwrap();
|
||||||
|
let addr_set = addrs_watcher.get().unwrap().unwrap_or_default();
|
||||||
|
for addr in addr_set {
|
||||||
|
transport
|
||||||
|
.endpoint
|
||||||
|
.add_node_addr(iroh::NodeAddr::from_parts(
|
||||||
|
server_node_id,
|
||||||
|
None,
|
||||||
|
vec![addr.addr],
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let accept_handle = tokio::spawn(async move {
|
||||||
|
let (stream, info) = acceptor.accept().await.unwrap();
|
||||||
|
assert!(matches!(info.transport_kind, TransportKind::Iroh { .. }));
|
||||||
|
stream
|
||||||
|
});
|
||||||
|
|
||||||
|
let _client_stream: io::Join<RecvStream, iroh::endpoint::SendStream> =
|
||||||
|
transport.connect().await.unwrap();
|
||||||
|
let _server_stream = accept_handle.await.unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -19,7 +19,7 @@ mod iroh_transport;
|
|||||||
|
|
||||||
pub use tcp::{TcpAcceptor, TcpTransport};
|
pub use tcp::{TcpAcceptor, TcpTransport};
|
||||||
#[cfg(feature = "iroh")]
|
#[cfg(feature = "iroh")]
|
||||||
pub use iroh_transport::{IrohAcceptor, IrohTransport};
|
pub use iroh_transport::{IrohAcceptor, IrohTransport, ALPN as IROH_ALPN};
|
||||||
|
|
||||||
#[cfg(feature = "tls")]
|
#[cfg(feature = "tls")]
|
||||||
mod tls;
|
mod tls;
|
||||||
|
|||||||
Reference in New Issue
Block a user