# async-nats: JetStream

## Overview

JetStream is NATS' built-in persistence layer, providing stream-based messaging with at-least-once and exactly-once delivery semantics. The `async-nats` JetStream API is accessed through a `Context` object.

### Creating a Context

```rust
// Default context (prefix: $JS.API)
let jetstream = async_nats::jetstream::new(client);

// With domain (prefix: $JS.<domain>.API, here $JS.hub.API)
let jetstream = async_nats::jetstream::with_domain(client, "hub");

// With custom prefix
let jetstream = async_nats::jetstream::with_prefix(client, "JS.acc@hub.API");

// Builder with fine-grained control
let context = ContextBuilder::new()
    .timeout(Duration::from_secs(5))
    .api_prefix("MY.JS.API")
    .max_ack_inflight(1000)
    .backpressure_on_inflight(true)
    .ack_timeout(Duration::from_secs(30))
    .build(client);
```

## Context

```rust
#[derive(Debug, Clone)]
pub struct Context {
    pub(crate) client: Client,
    pub(crate) prefix: String,
    pub(crate) timeout: Duration,
    pub(crate) max_ack_semaphore: Arc<Semaphore>,
    pub(crate) ack_sender: mpsc::Sender<(oneshot::Receiver<Message>, OwnedSemaphorePermit)>,
    pub(crate) backpressure_on_inflight: bool,
    pub(crate) semaphore_capacity: usize,
}
```

### Publish Backpressure

The context uses a semaphore to limit the number of pending publish acknowledgments:

- `max_ack_inflight(n)`: sets the semaphore capacity (default 5000)
- `backpressure_on_inflight(true)`: `publish()` waits for a permit when the limit is reached
- `backpressure_on_inflight(false)`: `publish()` returns a `MaxAckPending` error immediately when the limit is reached

A background **acker task** monitors pending acks with a timeout (`ack_timeout`, default 30s) and releases each permit when its ack arrives or times out.

### JetStream API Request Pattern

All JetStream API calls follow the same pattern:

1. Build a subject from the prefix: `format!("{}.STREAM.CREATE.<name>", self.prefix)`
2. Serialize the request payload as JSON
3. Send a request via `client.send_request()` with the API subject
4. Deserialize the response as `Response<T>` (which is `Ok(T)` or `Err(ErrorCode)`)

## Streams

### Stream Handle

```rust
pub struct Stream<I = Info> {
    context: Context,
    info: I,
    name: String,
}
```

`Stream<Info>` carries server-side info. `Stream<()>` is a lightweight handle that skips the INFO fetch. `Stream` (no generic) defaults to `Stream<Info>`.

### Stream Config

```rust
pub struct Config {
    pub name: String,
    pub description: Option<String>,
    pub subjects: Vec<String>,
    pub retention: RetentionPolicy,
    pub max_consumers: i64,
    pub max_messages: i64,
    pub max_messages_per_subject: i64,
    pub max_bytes: i64,
    pub max_age: Duration,
    pub max_messages_per_stream: i64,
    pub max_msg_size: i32,
    pub discard: DiscardPolicy,
    pub discard_new_per_subject: bool,
    pub storage: StorageType,
    pub num_replicas: usize,
    pub no_ack: bool,
    pub duplicate_window: Duration,
    pub placement: Option<Placement>,
    pub mirror: Option<Source>,
    pub sources: Option<Vec<Source>>,
    pub sealed: bool,
    pub allow_direct: bool,
    pub allow_rollup_hdrs: bool,
    // server_2_10 features:
    pub compression: Option<Compression>,
    pub first_sequence: Option<u64>,
    pub subject_transform: Option<SubjectTransform>,
    pub republish: Option<Republish>,
    pub metadata: Option<HashMap<String, String>>,
}
```

### Stream Operations

| Method | Description |
|--------|-------------|
| `create_stream(config)` | Create a new stream |
| `get_stream(name)` | Get stream handle (with INFO) |
| `get_stream_no_info(name)` | Get lightweight handle (no server round-trip) |
| `get_or_create_stream(config)` | Get existing or create new |
| `delete_stream(name)` | Delete a stream |
| `update_stream(config)` | Update stream configuration |
| `create_or_update_stream(config)` | Update or create if not found |
| `stream_names()` | `Stream` of stream names (paginated) |
| `streams()` | `Stream` of stream info (paginated) |
| `stream_by_subject(subject)` | Find the name of the stream that captures a subject |

### Stream Handle Methods

```rust
let stream: Stream = jetstream.get_stream("events").await?;

// Info
let info: Info = stream.info().await?;      // Fresh info from server
let info: &Info = stream.cached_info();     // Cached info from last fetch

// Message operations
stream.get_raw_message(seq).await?;                   // Get raw message by sequence
stream.get_last_raw_message_by_subject(subj).await?;  // Get last message for subject
stream.direct_get(seq).await?;                        // Direct get (if allow_direct)
stream.direct_get_last_for_subject(subj).await?;      // Direct last by subject
stream.delete_message(seq).await?;                    // Delete a specific message
stream.purge().await?;                                // Purge all messages
stream.purge().filter(subj).await?;                   // Purge messages for subject

// Consumers
stream.create_consumer(config).await?;  // Create consumer bound to stream
stream.get_consumer(name).await?;       // Get existing consumer
stream.delete_consumer(name).await?;    // Delete consumer
```

## Consumers

### Consumer Types

There are two consumer types, each with a distinct delivery model:

1. **Pull Consumer** (`pull::Config` / `PullConsumer`): the client explicitly requests batches of messages
2. **Push Consumer** (`push::Config` / `PushConsumer`): the server pushes messages to a deliver subject

### Pull Consumer

```rust
let consumer: PullConsumer = stream
    .get_or_create_consumer("my-consumer", pull::Config {
        durable_name: Some("my-consumer".to_string()),
        ..Default::default()
    })
    .await?;
```

**Key methods**:

- `consumer.batch().max_messages(n).messages().await?`: fetch up to `n` messages (one-shot batch)
- `consumer.messages().await?`: continuous `Stream` of messages
- `consumer.sequence(n).await?`: continuous `Stream` of batches of `n` messages
- `consumer.fetch().max_messages(n).expires(dur).messages().await?`: configurable fetch

Each message from a pull consumer is a `jetstream::Message`, which provides the `ack()` and `ack_with()` methods.

### Push Consumer

There are two push consumer variants:

1. **Standard** (`push::Config`): messages are delivered to a specific subject
2. **Ordered** (`push::OrderedConfig`): auto-recreated on failure, with flow control

```rust
// Standard push
let consumer = stream.create_consumer(push::Config {
    deliver_subject: "deliver.subject".to_string(),
    durable_name: Some("push-consumer".to_string()),
    ..Default::default()
}).await?;

// Ordered push (no durable name, auto-recreates on failure)
let consumer = stream.create_consumer(push::OrderedConfig {
    deliver_subject: client.new_inbox(),
    filter_subject: "events.>".to_string(),
    ..Default::default()
}).await?;
```

### Consumer Config (Shared Fields)

```rust
pub struct Config {
    // Pull fields
    pub durable_name: Option<String>,
    pub name: Option<String>,
    // Push fields
    pub deliver_subject: Option<String>,
    pub deliver_group: Option<String>,
    pub deliver_policy: DeliverPolicy,
    pub opt_start_time: Option<DateTime>,
    pub opt_start_sequence: Option<u64>,
    pub ack_policy: AckPolicy,
    pub ack_wait: Duration,
    pub max_deliver: i64,
    pub backoff: Vec<Duration>,
    pub filter_subject: String,
    pub filter_subjects: Vec<String>,  // server_2_10+
    pub replay_policy: ReplayPolicy,
    pub rate_limit_bps: Option<u64>,
    pub max_waiting: i64,              // pull: max outstanding pull requests
    pub max_ack_pending: i64,
    pub flow_control: bool,
    pub idle_heartbeat: Duration,
    pub headers_only: bool,
    pub num_replicas: usize,
    pub mem_storage: bool,
    pub description: Option<String>,
    pub metadata: Option<HashMap<String, String>>,
    pub inactive_threshold: Option<Duration>,  // for ephemeral consumers
}
```

### Deliver Policy

```rust
pub enum DeliverPolicy {
    All,             // Deliver all messages
    Last,            // Deliver last message only
    New,             // Deliver only new messages
    ByStartSequence { start_sequence: u64 },
    ByStartTime { start_time: DateTime },
    LastPerSubject,  // Deliver last message per subject
}
```

### Ack Policy

```rust
pub enum AckPolicy {
    None,      // No acknowledgment needed
    All,       // Ack all messages up to this one
    Explicit,  // Ack each message individually
}
```

## JetStream Messages

### `jetstream::Message`

Wraps a core `Message` with JetStream-specific metadata:

```rust
pub struct Message {
    pub message: crate::Message,  // The underlying NATS message
    pub ack_subject: Subject,     // Subject for sending acks
    pub stream: String,           // Stream name
    pub consumer: String,         // Consumer name
    pub stream_sequence: u64,     // Sequence in stream
    pub consumer_sequence: u64,   // Sequence for this consumer
    pub delivered: u64,           // Delivery count
    pub pending: u64,             // Pending message count
    pub published: DateTime,      // Original publish time
}
```

### Ack Methods

```rust
// Plain ack: sent without waiting for the server to confirm receipt
message.ack().await?;

// Ack with specific type
message.ack_with(AckKind::Nak).await?;
message.ack_with(AckKind::Progress).await?;
message.ack_with(AckKind::Term).await?;
message.ack_with(AckKind::NakWithDelay(duration)).await?;
message.ack_with(AckKind::TermWithReason("reason".to_string())).await?;
```

### `AckKind`

```rust
pub enum AckKind {
    Ack,                     // +ACK: message processed
    Nak,                     // -NAK: re-deliver
    Progress,                // +WPI: still working
    Term,                    // +TERM: don't redeliver
    NakWithDelay(Duration),  // -NAK with re-delivery delay
    TermWithReason(String),  // +TERM with reason
}
```

## JetStream Publish

### `Context::publish()`

JetStream publish returns a `PublishAckFuture`, a future that resolves to a `PublishAck`:

```rust
let ack_future = jetstream.publish("events", "data".into()).await?;
let ack: PublishAck = ack_future.await?;  // Wait for server acknowledgment
```

### `PublishAck`

```rust
pub struct PublishAck {
    pub stream: String,
    pub sequence: u64,
    pub domain: String,
    pub duplicate: bool,
}
```

### `PublishMessage` Builder

```rust
let ack_future = jetstream.send_publish(
    "events",
    PublishMessage::build()
        .payload("data".into())
        .message_id("uuid-123")            // Deduplication ID
        .expected_stream("events")         // Fail if wrong stream
        .expected_last_msg_id("prev-id")
        .expected_last_sequence(42)
        .headers(headers),
).await?;
```

## Pagination

Stream and consumer listing uses pagination internally:

```rust
pub struct StreamNames {
    context: Context,
    offset: usize,
    page_request: Option<PageRequest>,
    streams: Vec<String>,
    subject: Option<String>,
    done: bool,
}
```

`StreamNames` implements `futures_util::Stream<Item = Result<String, StreamsError>>`, lazily fetching pages as needed.

## Error Handling

JetStream errors follow the `Response<T>` pattern:

```rust
pub enum Response<T> {
    Ok(T),
    Err { error: ErrorCode },
}
```

`ErrorCode` carries the server's error code and description. Most JetStream-specific errors map to typed error enums such as `CreateStreamError` and `ConsumerError`.
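
As a rough illustration of how a response of this shape can be turned into a typed error, the sketch below converts a `Response` into a `Result`. It uses only the standard library and is not the crate's implementation: the `ErrorCode` field names, the `GetStreamError` enum, the `into_result` helper and the numeric value of `STREAM_NOT_FOUND` are all assumptions made for the example.

```rust
use std::fmt;

/// Server-side error payload. The field names are assumed for this sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct ErrorCode {
    pub code: u64,
    pub description: String,
}

/// Same shape as the `Response` enum described above.
#[derive(Debug)]
pub enum Response<T> {
    Ok(T),
    Err { error: ErrorCode },
}

/// Hypothetical typed error; the crate's real error enums differ.
#[derive(Debug, PartialEq)]
pub enum GetStreamError {
    NotFound,
    Other(ErrorCode),
}

impl fmt::Display for GetStreamError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            GetStreamError::NotFound => write!(f, "stream not found"),
            GetStreamError::Other(e) => write!(f, "jetstream error {}: {}", e.code, e.description),
        }
    }
}

impl std::error::Error for GetStreamError {}

/// Illustrative code for "stream not found"; check the server's error list
/// before relying on this value.
pub const STREAM_NOT_FOUND: u64 = 10059;

/// Map the wire-level response onto a typed `Result`.
pub fn into_result<T>(response: Response<T>) -> Result<T, GetStreamError> {
    match response {
        Response::Ok(value) => Ok(value),
        Response::Err { error } if error.code == STREAM_NOT_FOUND => Err(GetStreamError::NotFound),
        Response::Err { error } => Err(GetStreamError::Other(error)),
    }
}

fn main() {
    let missing: Response<String> = Response::Err {
        error: ErrorCode {
            code: STREAM_NOT_FOUND,
            description: "stream not found".to_string(),
        },
    };
    match into_result(missing) {
        Ok(name) => println!("found stream {name}"),
        Err(err) => println!("error: {err}"),
    }
}
```

Matching on a known code first and falling back to a catch-all variant keeps the server's original code and description available for errors the caller does not handle specially.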