# Connection Handler and Data Flow
This document covers the internal `ConnectionHandler` that drives all protocol I/O, and the data flow through the system.
## ConnectionHandler
**Location**: `lib.rs`
The `ConnectionHandler` is the heart of the client. It runs as a single Tokio task and manages all communication with the NATS server.
```rust
pub(crate) struct ConnectionHandler {
    connection: Connection,                     // Low-level I/O
    connector: Connector,                       // Server pool, reconnection
    subscriptions: HashMap<u64, Subscription>,  // Active subscriptions
    multiplexer: Option<Multiplexer>,           // Request-reply multiplexer
    pending_pings: usize,                       // Unanswered PINGs
    info_sender: tokio::sync::watch::Sender<Option<ServerInfo>>,
    ping_interval: Interval,                    // Periodic PING timer
    should_reconnect: bool,                     // Flag for forced reconnect
    flush_observers: Vec<oneshot::Sender<()>>,  // Pending flush callbacks
    is_draining: bool,                          // Connection is draining
    drain_pings: VecDeque<u64>,                 // SIDs being drained
}
```
## Data Flow: Publish
```
Application
│ client.publish("events.data", payload)
Client
│ validates subject & payload size
│ sends Command::Publish(OutboundMessage) via mpsc channel
ConnectionHandler::handle_command(Command::Publish)
│ increments out_messages, out_bytes statistics
│ calls connection.enqueue_write_op(&ClientOp::Publish { ... })
Connection::enqueue_write_op
│ serializes to wire format:
│ "PUB events.data 11\r\n" or "HPUB events.data 23 34\r\n"
│ appends to flattened_writes or write_buf
Connection::poll_write
│ uses vectored writes (64 chunks) if supported
│ or sequential writes otherwise
Connection::poll_flush
│ flushes the TCP/TLS/WS stream
│ notifies flush_observers
NATS Server (TCP/TLS/WebSocket)
```
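The serialization step in the flow above can be sketched with the standard library alone. This is a minimal illustration of the NATS `PUB` and `HPUB` wire framing, not the client's actual `enqueue_write_op`; the function names `encode_pub` and `encode_hpub` are hypothetical, and the real code writes into shared buffers rather than allocating a `Vec` per message.

```rust
/// Hypothetical helper: frame a publish without headers as
/// `PUB <subject> [reply] <payload-len>\r\n<payload>\r\n`.
fn encode_pub(subject: &str, reply: Option<&str>, payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(subject.len() + payload.len() + 32);
    out.extend_from_slice(b"PUB ");
    out.extend_from_slice(subject.as_bytes());
    if let Some(reply) = reply {
        out.push(b' ');
        out.extend_from_slice(reply.as_bytes());
    }
    out.extend_from_slice(format!(" {}\r\n", payload.len()).as_bytes());
    out.extend_from_slice(payload);
    out.extend_from_slice(b"\r\n");
    out
}

/// Hypothetical helper: frame a publish with headers as
/// `HPUB <subject> [reply] <header-len> <total-len>\r\n<headers><payload>\r\n`.
/// The header block starts with `NATS/1.0\r\n` and ends with an empty line.
fn encode_hpub(
    subject: &str,
    reply: Option<&str>,
    headers: &[(&str, &str)],
    payload: &[u8],
) -> Vec<u8> {
    let mut block = String::from("NATS/1.0\r\n");
    for (name, value) in headers {
        block.push_str(name);
        block.push_str(": ");
        block.push_str(value);
        block.push_str("\r\n");
    }
    block.push_str("\r\n");

    // The first length covers the header block only, the second covers headers + payload.
    let header_len = block.len();
    let total_len = header_len + payload.len();

    let mut out = Vec::with_capacity(subject.len() + total_len + 32);
    out.extend_from_slice(b"HPUB ");
    out.extend_from_slice(subject.as_bytes());
    if let Some(reply) = reply {
        out.push(b' ');
        out.extend_from_slice(reply.as_bytes());
    }
    out.extend_from_slice(format!(" {} {}\r\n", header_len, total_len).as_bytes());
    out.extend_from_slice(block.as_bytes());
    out.extend_from_slice(payload);
    out.extend_from_slice(b"\r\n");
    out
}
```

With an 11-byte payload, `encode_pub("events.data", None, b"hello world")` yields the `PUB events.data 11\r\n` control line shown in the diagram, followed by the payload and a trailing `\r\n`.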
## Data Flow: Subscribe
```
Application
│ client.subscribe("events.>")
Client::subscribe
│ validates subject (always, regardless of skip_subject_validation)
│ allocates next sid via AtomicU64
│ creates mpsc channel for messages
│ sends Command::Subscribe { sid, subject, sender }
│ returns Subscriber { sid, receiver }
ConnectionHandler::handle_command(Command::Subscribe)
│ creates Subscription { subject, sender, delivered: 0, max: None }
│ inserts into subscriptions HashMap
│ calls connection.enqueue_write_op(&ClientOp::Subscribe { sid, subject, queue_group })
Connection::enqueue_write_op
│ serializes: "SUB events.> 42\r\n"
Server sends MSG for matching subjects:
ConnectionHandler::handle_server_op(ServerOp::Message { sid, subject, ... })
│ looks up sid in subscriptions HashMap
│ constructs Message { subject, reply, payload, headers, status, description }
│ tries subscription.sender.try_send(message)
├── Ok → increments subscription.delivered, checks max
├── Full → emits Event::SlowConsumer(sid)
└── Closed → removes subscription, sends ClientOp::Unsubscribe
Subscriber::poll_next (Stream impl)
│ receives from mpsc::Receiver
Application processes Message
```
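The SID allocation and `SUB` framing steps can be sketched as follows. The names `SidAllocator` and `encode_sub` are hypothetical, and starting the counter at 1 is an assumption made here so that SID 0 stays free for the multiplexer described in the next section.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for the client's AtomicU64-based SID counter.
struct SidAllocator {
    next: AtomicU64,
}

impl SidAllocator {
    /// Assumption: start at 1 so that SID 0 remains reserved.
    fn new() -> Self {
        Self { next: AtomicU64::new(1) }
    }

    /// Returns a fresh SID; safe to call from multiple threads.
    fn next_sid(&self) -> u64 {
        self.next.fetch_add(1, Ordering::Relaxed)
    }
}

/// Frame a subscription as `SUB <subject> [queue group] <sid>\r\n`.
fn encode_sub(subject: &str, queue_group: Option<&str>, sid: u64) -> String {
    match queue_group {
        Some(group) => format!("SUB {subject} {group} {sid}\r\n"),
        None => format!("SUB {subject} {sid}\r\n"),
    }
}
```

Calling `encode_sub("events.>", None, 42)` produces the `SUB events.> 42\r\n` line from the diagram.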
## Data Flow: Request-Response
The request-response pattern uses the **multiplexer** — a single wildcard subscription that routes responses to their waiting requesters.
```
Application
│ client.request("service", payload)
Client::send_request
│ validates subject & payload size
│ creates oneshot channel for response
│ generates unique inbox: "_INBOX.<nuid>.<token>"
│ sends Command::Request { subject, payload, respond, sender }
ConnectionHandler::handle_command(Command::Request)
│ extracts token from respond subject (after last '.')
│ if no multiplexer exists:
│     creates Multiplexer with wildcard sub "_INBOX.<id>.*" (SID 0)
│     sends ClientOp::Subscribe { sid: 0, subject: "_INBOX.<id>.*" }
│ inserts token → oneshot::Sender in multiplexer.senders
│ sends ClientOp::Publish { subject, payload, respond: "<prefix><token>" }
Server routes request to service:
Service responds by publishing to the reply subject:
ConnectionHandler::handle_server_op(ServerOp::Message { sid: 0, ... })
│ sid == MULTIPLEXER_SID (0), so enters multiplexer path
│ extracts token by stripping prefix from subject
│ looks up token in multiplexer.senders
│ sends Message via oneshot::Sender
Client::send_request receives via oneshot::Receiver
│ applies timeout (default 10s)
│ checks for NO_RESPONDERS status (503)
Application receives Message
```
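The token routing performed by the multiplexer can be sketched as below. This is a simplified model under stated assumptions: `std::sync::mpsc` channels stand in for the `oneshot` channels, the payload is a plain `Vec<u8>` rather than a `Message`, and the method names `register` and `route` are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Simplified multiplexer: one wildcard subscription, many waiting requesters.
struct Multiplexer {
    prefix: String,                            // e.g. "_INBOX.<id>."
    senders: HashMap<String, Sender<Vec<u8>>>, // token -> waiting requester
}

impl Multiplexer {
    fn new(id: &str) -> Self {
        Self {
            prefix: format!("_INBOX.{id}."),
            senders: HashMap::new(),
        }
    }

    /// Subject of the single wildcard subscription (sent with SID 0).
    fn wildcard_subject(&self) -> String {
        format!("{}*", self.prefix)
    }

    /// Register a requester: the token is everything after the last '.'.
    fn register(&mut self, respond: &str) -> Option<Receiver<Vec<u8>>> {
        let token = respond.rsplit('.').next()?;
        let (tx, rx) = channel();
        self.senders.insert(token.to_owned(), tx);
        Some(rx)
    }

    /// Route a reply: strip the prefix, look up the token, hand over the payload.
    /// Returns `false` if the subject does not match or nobody is waiting.
    fn route(&mut self, subject: &str, payload: Vec<u8>) -> bool {
        let Some(token) = subject.strip_prefix(self.prefix.as_str()) else {
            return false;
        };
        match self.senders.remove(token) {
            Some(tx) => tx.send(payload).is_ok(),
            None => false,
        }
    }
}
```

Removing the sender on delivery mirrors one-shot semantics: a second reply to the same token finds no entry and is discarded.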
### Custom Inbox Request
If the `Request` builder specifies a custom `inbox`, the flow is different:
- The client subscribes to the inbox directly (not via multiplexer)
- Publishes with the inbox as the reply subject
- Waits for the message on that subscription
- No multiplexer involvement
## Data Flow: Flush
```
Application
│ client.flush()
Client::flush
│ creates oneshot channel
│ sends Command::Flush { observer }
ConnectionHandler::handle_command(Command::Flush)
│ pushes observer into flush_observers Vec
ProcessFut::poll (main loop)
│ after writing all pending data...
│ checks should_flush():
│     Yes (write buffers empty, not yet flushed) → poll_flush
│     May (write buffers not empty) → poll_flush
│     No (already flushed) → skip
│ on successful flush:
│     drains flush_observers, sending () to each
Client::flush receives via oneshot::Receiver
```
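The observer bookkeeping in this flow amounts to "collect senders, then notify them all once the stream is flushed". A minimal sketch, assuming `std::sync::mpsc` channels in place of `oneshot` and using the hypothetical names `FlushState`, `observe`, and `on_flushed`:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for the handler's `flush_observers` field.
#[derive(Default)]
struct FlushState {
    observers: Vec<Sender<()>>,
}

impl FlushState {
    /// Corresponds to handling `Command::Flush { observer }`.
    fn observe(&mut self) -> Receiver<()> {
        let (tx, rx) = channel();
        self.observers.push(tx);
        rx
    }

    /// Called after a successful flush: notify every observer exactly once.
    /// Returns how many observers were still listening.
    fn on_flushed(&mut self) -> usize {
        let mut notified = 0;
        for tx in self.observers.drain(..) {
            // A failed send only means the caller stopped waiting.
            if tx.send(()).is_ok() {
                notified += 1;
            }
        }
        notified
    }
}
```

Because the list is drained on every flush, an observer registered after a flush waits for the next one rather than being satisfied by the previous one.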
## Data Flow: Drain
```
Application
│ client.drain() or subscriber.drain()
Client::drain / Subscriber::drain
│ sends Command::Drain { sid: None } (whole client)
│ or Command::Drain { sid: Some(n) } (single subscription)
ConnectionHandler::handle_command(Command::Drain)
│ if sid is Some:
│     pushes sid to drain_pings
│     sends ClientOp::Unsubscribe { sid, max: None }
│ if sid is None (whole client):
│     sets is_draining = true
│     emits Event::Draining
│     for each subscription: drain_pings.push(sid), Unsubscribe
│ sends ClientOp::Ping (to flush the UNSUB messages)
ProcessFut::poll (main loop)
│ processes any remaining server messages
│ removes drained subscriptions from HashMap
│ if is_draining: returns ExitReason::Closed
ConnectionHandler exits, emits Event::Closed
```
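The drain bookkeeping can be modelled with a small state struct. This sketch makes several assumptions: subscriptions are reduced to `sid -> subject`, protocol operations are returned as wire-format strings instead of `ClientOp` values, the trailing `PING` is sent for both the single-subscription and whole-client cases (the flow above leaves this ambiguous), and the names `DrainState`, `drain`, and `cleanup` are hypothetical.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical, simplified view of the handler's drain-related state.
#[derive(Default)]
struct DrainState {
    subscriptions: HashMap<u64, String>, // sid -> subject
    drain_pings: VecDeque<u64>,          // SIDs waiting to be removed
    is_draining: bool,                   // whole-client drain in progress
}

impl DrainState {
    /// Handle a drain command; returns the protocol lines to enqueue.
    fn drain(&mut self, sid: Option<u64>) -> Vec<String> {
        let mut ops = Vec::new();
        match sid {
            Some(sid) => {
                self.drain_pings.push_back(sid);
                ops.push(format!("UNSUB {sid}\r\n"));
            }
            None => {
                self.is_draining = true;
                // Sorted only to make the output deterministic in this sketch.
                let mut sids: Vec<u64> = self.subscriptions.keys().copied().collect();
                sids.sort_unstable();
                for sid in sids {
                    self.drain_pings.push_back(sid);
                    ops.push(format!("UNSUB {sid}\r\n"));
                }
            }
        }
        // Assumption: a PING follows so the UNSUBs are flushed to the server.
        ops.push("PING\r\n".to_owned());
        ops
    }

    /// Main-loop step: drop drained subscriptions; `true` means "exit with Closed".
    fn cleanup(&mut self) -> bool {
        while let Some(sid) = self.drain_pings.pop_front() {
            self.subscriptions.remove(&sid);
        }
        self.is_draining
    }
}
```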
## Main Processing Loop
The `ConnectionHandler::process` method implements the core event loop via a custom `Future` (`ProcessFut`):
```rust
impl Future for ProcessFut<'_> {
    type Output = ExitReason;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // 1. Check ping interval: send PING if due, disconnect if too many pending
        while self.handler.ping_interval.poll_tick(cx).is_ready() {
            if let Poll::Ready(exit) = self.ping() {
                return Poll::Ready(exit);
            }
        }

        // 2. Read all available server operations
        loop {
            match self.handler.connection.poll_read_op(cx) {
                Poll::Pending => break,
                Poll::Ready(Ok(Some(server_op))) => self.handler.handle_server_op(server_op),
                Poll::Ready(Ok(None)) => return Poll::Ready(ExitReason::Disconnected(None)),
                Poll::Ready(Err(err)) => return Poll::Ready(ExitReason::Disconnected(Some(err))),
            }
        }

        // 3. Clean up drained subscriptions
        while let Some(sid) = self.handler.drain_pings.pop_front() {
            self.handler.subscriptions.remove(&sid);
        }

        // 4. If draining, exit
        if self.handler.is_draining {
            return Poll::Ready(ExitReason::Closed);
        }

        // 5. Process client commands (batch of up to 16)
        //    while write buffer not full
        loop {
            while !self.handler.connection.is_write_buf_full() {
                match receiver.poll_recv_many(cx, recv_buf, 16) {
                    Poll::Pending => break,
                    Poll::Ready(1..) => {
                        for cmd in recv_buf.drain(..) {
                            handler.handle_command(cmd);
                        }
                    }
                    Poll::Ready(0) => return Poll::Ready(ExitReason::Closed),
                }
            }

            // 6. Write pending data to stream
            match self.handler.connection.poll_write(cx) {
                Poll::Pending => break,
                Poll::Ready(Ok(())) => continue, // write buffer empty, try more commands
                Poll::Ready(Err(err)) => return Poll::Ready(ExitReason::Disconnected(Some(err))),
            }
        }

        // 7. Flush stream and notify observers
        match self.handler.connection.poll_flush(cx) { ... }

        // 8. Check for forced reconnect
        if mem::take(&mut self.handler.should_reconnect) {
            return Poll::Ready(ExitReason::ReconnectRequested);
        }

        Poll::Pending
    }
}
```
### Exit Reasons
The main loop exits for three reasons:
| Reason | Action |
|--------|--------|
| `Disconnected(Option<io::Error>)` | Attempt reconnection via `handle_disconnect()` |
| `ReconnectRequested` | Force reconnect (user-triggered) |
| `Closed` | Connection handler terminates, emit `Event::Closed` |
On disconnection, `handle_disconnect()` is called, which:
1. Resets `pending_pings` to 0
2. Emits `Event::Disconnected`
3. Updates connection state to `Disconnected`
4. Calls `handle_reconnect()` which uses `Connector::connect()`
5. On successful reconnect, re-subscribes all active subscriptions
6. Re-subscribes the multiplexer wildcard if present
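The mapping from exit reason to action, and the re-subscription that follows a successful reconnect, can be sketched as below. The `ExitReason` enum mirrors the table above; `should_reconnect` and `resubscribe_ops` are hypothetical helpers, subscriptions are reduced to `sid -> subject`, and operations are shown as wire-format strings rather than `ClientOp` values.

```rust
use std::collections::BTreeMap;
use std::io;

#[derive(Debug)]
enum ExitReason {
    Disconnected(Option<io::Error>),
    ReconnectRequested,
    Closed,
}

/// `true` when the handler should try to reconnect, `false` when it terminates.
fn should_reconnect(reason: &ExitReason) -> bool {
    match reason {
        ExitReason::Disconnected(_) | ExitReason::ReconnectRequested => true,
        ExitReason::Closed => false,
    }
}

/// After a successful reconnect: re-issue SUB for every active subscription,
/// then for the multiplexer wildcard (SID 0) if one exists.
fn resubscribe_ops(
    subscriptions: &BTreeMap<u64, String>,
    multiplexer_prefix: Option<&str>,
) -> Vec<String> {
    let mut ops: Vec<String> = subscriptions
        .iter()
        .map(|(sid, subject)| format!("SUB {subject} {sid}\r\n"))
        .collect();
    if let Some(prefix) = multiplexer_prefix {
        ops.push(format!("SUB {prefix}* 0\r\n"));
    }
    ops
}
```

A `BTreeMap` is used here only so the output order is deterministic; the handler itself stores subscriptions in a `HashMap`.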
## Slow Consumer Handling
When a subscription's `mpsc::Sender` channel is full (the application isn't consuming messages fast enough):
1. `try_send` returns `TrySendError::Full`
2. The `ConnectionHandler` emits `Event::SlowConsumer(sid)`
3. The message is **dropped** (not queued)
4. The subscription remains active
When a subscription's receiver is dropped (application closed the stream):
1. `try_send` returns `TrySendError::Closed`
2. The subscription is removed from the HashMap
3. An `UNSUB` command is sent to the server
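Both outcomes can be reproduced with a bounded channel. This sketch uses the standard library's `sync_channel`, whose `TrySendError::Disconnected` plays the role of Tokio's `TrySendError::Closed`; the `Delivery` enum and `deliver` function are hypothetical names for illustration.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// What the handler would do after attempting delivery.
#[derive(Debug, PartialEq)]
enum Delivery {
    Delivered,    // message queued for the application
    SlowConsumer, // channel full: message dropped, subscription kept
    Unsubscribed, // receiver gone: remove subscription, send UNSUB
}

/// Non-blocking delivery attempt, so one slow subscriber cannot stall the loop.
fn deliver(sender: &SyncSender<Vec<u8>>, message: Vec<u8>) -> Delivery {
    match sender.try_send(message) {
        Ok(()) => Delivery::Delivered,
        // The rejected message is returned inside the error and dropped here.
        Err(TrySendError::Full(_dropped)) => Delivery::SlowConsumer,
        Err(TrySendError::Disconnected(_)) => Delivery::Unsubscribed,
    }
}
```

The design choice worth noting is that delivery never blocks: a full channel costs the slow subscriber a message, but every other subscription on the same connection keeps receiving.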
## Ping/Pong Health Check
The `ConnectionHandler` maintains a periodic PING interval (default 60 seconds):
1. `ping_interval` fires every N seconds
2. A `ClientOp::Ping` is enqueued
3. `pending_pings` counter increments
4. If `pending_pings` exceeds `MAX_PENDING_PINGS` (2), the connection is considered dead
5. When `ServerOp::Pong` is received, `pending_pings` decrements
6. Any server operation resets the ping interval timer
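The counter logic in these steps can be sketched as a tiny state machine. `PingState`, `on_tick`, and `on_pong` are hypothetical names; using a saturating decrement is an assumption made here so that an unexpected PONG cannot underflow the counter. Timer handling is left out.

```rust
const MAX_PENDING_PINGS: usize = 2;

/// Hypothetical stand-in for the handler's `pending_pings` field.
#[derive(Default)]
struct PingState {
    pending_pings: usize,
}

impl PingState {
    /// Called when the ping interval fires.
    /// Returns `true` if a PING should be sent, `false` if the connection
    /// should be treated as dead because too many PINGs are unanswered.
    fn on_tick(&mut self) -> bool {
        self.pending_pings += 1;
        self.pending_pings <= MAX_PENDING_PINGS
    }

    /// Called when a PONG arrives from the server.
    fn on_pong(&mut self) {
        self.pending_pings = self.pending_pings.saturating_sub(1);
    }
}
```

With the limit at 2, two consecutive unanswered ticks are tolerated and the third marks the connection dead; at the default 60-second interval that is roughly three minutes of silence.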
## Batched Command Processing
Commands from the `Client` are received in batches of up to 16 (`RECV_CHUNK_SIZE`) using `poll_recv_many`. This amortizes the cost of waking the task and enables pipelining multiple operations (e.g., publishing many messages) in a single poll cycle.
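The batching idea can be illustrated with a synchronous analogue. This sketch drains a `std::sync::mpsc::Receiver` with `try_recv` instead of Tokio's `poll_recv_many`, so it shows only the "take at most N per pass" behaviour, not the waker integration; `recv_many` is a hypothetical helper.

```rust
use std::sync::mpsc::Receiver;

const RECV_CHUNK_SIZE: usize = 16;

/// Move up to `limit` queued items into `buf` without blocking.
/// Returns how many items were received in this pass.
fn recv_many<T>(rx: &Receiver<T>, buf: &mut Vec<T>, limit: usize) -> usize {
    let start = buf.len();
    while buf.len() - start < limit {
        match rx.try_recv() {
            Ok(item) => buf.push(item),
            // Empty or disconnected: stop this pass either way.
            Err(_) => break,
        }
    }
    buf.len() - start
}
```

With 40 commands queued, successive calls using `RECV_CHUNK_SIZE` return 16, 16, and 8, which leaves room between batches for the write and flush steps of the main loop.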