diff --git a/docs/research/references/honker/honker-reference.md b/docs/research/references/honker/honker-reference.md new file mode 100644 index 0000000..5537cfe --- /dev/null +++ b/docs/research/references/honker/honker-reference.md @@ -0,0 +1,857 @@ +# Research: Honker — SQLite Pub/Sub, Queue, and Notification Extension + +## Key Findings + +- **Honker is a Rust-based SQLite extension** that adds Postgres-style NOTIFY/LISTEN semantics plus durable pub/sub, task queues, and event streams entirely within SQLite. It eliminates the need for a separate message broker (Redis, Kafka) when SQLite is the primary datastore. +- **Three core primitives**: `notify/listen` (ephemeral pub/sub), `stream` (durable pub/sub with per-consumer offsets), and `queue` (at-least-once work queue with retries, priority, delayed jobs, and dead-letter handling). All three are SQL INSERTs inside your transaction — business write and side-effect commit or roll back together. +- **Wake mechanism**: Uses `PRAGMA data_version` polling at 1ms granularity to detect commits, achieving ~1-2ms median cross-process wake latency without requiring a daemon or broker. A single thread per database fans out to N subscribers via bounded channels. +- **Single-machine, single-writer model**: Designed for self-hosted deployments. Not distributed — no multi-node replication. This maps perfectly to alknet's per-node architecture where domain events are internal to a service boundary (ADR-032). +- **Comprehensive SQL API**: 30+ SQL scalar functions (`honker_enqueue`, `honker_claim_batch`, `honker_ack_batch`, `honker_stream_publish`, `honker_stream_read_since`, `honker_stream_save_offset`, `notify`, `honker_lock_acquire`, `honker_rate_limit_try`, `honker_scheduler_register`, etc.) registered as a loadable SQLite extension. Any language that can `SELECT load_extension('honker')` gets the same features. 
+- **Rust core (`honker-core`)**: All SQL implementations live in a shared Rust crate consumed by the loadable extension, PyO3 Python binding, napi-rs Node binding, and other language wrappers. One source of truth for the SQL — no behavioral drift across bindings. +- **License**: Apache 2.0 / MIT dual-license. Fully permissive for integration. + +**Recommendation**: Adopt honker's patterns directly in `alknet-storage`. The `honker` crate (or `honker-core` for a Rust-native integration) should be a dependency of `alknet-storage`. Honker's single-node model aligns with alknet's event boundary discipline — domain events stay within the service boundary, and cross-node events go through the call protocol. For production deployments that use Postgres instead of SQLite, the same patterns (queue/claim, stream/subscribe, notify/listen) can be replicated using Postgres features, but honker's built-in retry, visibility timeout, and scheduling would need to be reimplemented. + +--- + +## 1. Architecture + +### What Is Honker? + +Honker is a **SQLite extension + language bindings** that adds Postgres-style `NOTIFY`/`LISTEN` semantics to SQLite, with built-in durable pub/sub, task queues, and event streams — without requiring a client-polling loop, a daemon, or a separate broker. + +**Core idea**: If SQLite is your primary datastore, your queue should live in the same file. `INSERT INTO orders` and `queue.enqueue(...)` commit in the same transaction. Rollback drops both. + +**Implementation language**: Rust. The shared engine is `honker-core`, a plain Rust `rlib` crate. Language bindings (Python via PyO3, Node via napi-rs, Go via CGo, Ruby via C extension, .NET via P/Invoke, JVM via JNI, Kotlin wrapper, Elixir via NIF, C++ via header-only wrapper) are thin wrappers around the loadable extension's SQL functions. + +**How it works as a SQLite extension**: The `honker-extension` crate compiles to `libhonker_ext.{so,dylib,dll}`. 
Any SQLite 3.9+ client loads it: + +```sql +.load ./libhonker_ext +SELECT honker_bootstrap(); +``` + +This creates the schema tables (`_honker_live`, `_honker_dead`, `_honker_notifications`, `_honker_stream`, `_honker_stream_consumers`, `_honker_locks`, `_honker_rate_limits`, `_honker_scheduler_tasks`, `_honker_results`) and registers all SQL scalar functions. The extension and Python/binding tables are shared, so a Python worker can claim jobs any other language pushed via the extension. + +### Crate Structure + +``` +honker-core/ # Rust rlib shared across all bindings (published on crates.io) +honker-extension/ # SQLite loadable extension (cdylib, published on crates.io) +packages/ + honker/ # Python package (PyO3 cdylib + Queue/Stream/Outbox/Scheduler) + honker-node/ # napi-rs Node.js binding + honker-rs/ # Ergonomic Rust wrapper + honker-go/ # Go binding + honker-ruby/ # Ruby binding + honker-bun/ # Bun binding + honker-ex/ # Elixir binding + honker-cpp/ # C++ binding + honker-dotnet/ # .NET / C# binding + honker-jvm/ # JVM / Java-compatible binding + honker-kotlin/ # Kotlin convenience wrapper +``` + +### Wake Path Architecture + +The fundamental challenge for any SQLite-based pub/sub system: SQLite has no wire protocol or server-push. Consumers must initiate reads. Honker solves this with a **single-digit-microsecond `PRAGMA data_version` read**: + +1. **One PRAGMA-poll thread per `Database`** queries `data_version` every 1ms +2. Counter change → fan out a tick to each subscriber's bounded channel (capacity 1 — coalesces redundant wakes) +3. Each subscriber runs `SELECT … WHERE id > last_seen` against a partial index, yields rows, returns to wait +4. 100 subscribers = 1 poll thread. Idle listeners run zero SQL queries. + +Idle cost: ~3.5µs per `PRAGMA data_version` query, ~3.5ms/sec total at 1kHz. A 5-second paranoia poll exists as a fallback only if the update watcher cannot fire. 
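The commit-detection step can be illustrated without Honker itself. What follows is a minimal sketch using only Python's standard `sqlite3` module. The `events` table and the `data_version` helper are made up for illustration and are not part of Honker's API. It shows the property the poll thread relies on: a connection's `PRAGMA data_version` value stays constant while the database is idle and changes once another connection commits.

```python
import os
import sqlite3
import tempfile


def data_version(conn):
    """Return this connection's current PRAGMA data_version value."""
    return conn.execute("PRAGMA data_version").fetchone()[0]


# A file-backed database, so two connections share the same data.
path = os.path.join(tempfile.mkdtemp(), "demo.db")
writer = sqlite3.connect(path)
watcher = sqlite3.connect(path)

with writer:
    writer.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, body TEXT)")

# Two reads with no commit in between: the value does not move.
idle_a = data_version(watcher)
idle_b = data_version(watcher)

# A commit on a different connection changes what the watcher observes.
with writer:
    writer.execute("INSERT INTO events (body) VALUES ('hello')")

after_commit = data_version(watcher)
woke = after_commit != idle_b  # this is the "tick" a poll loop would fan out
```

A real watcher would repeat the read on a timer and signal subscribers whenever the value differs from the previous one. The sketch only demonstrates the comparison itself.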
+ +**Three backend options** (controlled by `WatcherBackend` enum): +- **Polling** (default, stable): `PRAGMA data_version` every 1ms. Correct on all platforms. +- **Kernel** (experimental, `kernel-watcher` Cargo feature): Uses `notify-rs` filesystem events. Fires on every filesystem write. May produce spurious/missed wakes. Dead-man's switch for file replacement. +- **SHM fast path** (experimental, `shm-fast-path` Cargo feature): Memory-maps the `-shm` WAL index file and reads `iChange` at offset 8 at ~100µs cadence. WAL-mode only. Dead-man's switch for file replacement. + +**Dead-man's switch**: All backends check file identity `(dev, ino)` / `(volume_serial, file_index)` every ~100ms. If the database file is replaced (atomic rename, litestream restore, volume remount), the watcher panics with a clear error message. Subscribers see an error from `update_events()` instead of hanging silently. + +### SharedUpdateWatcher + +```rust +pub struct SharedUpdateWatcher { + watcher: Mutex>, // background poll thread + senders: Arc>>>, // fan-out channels + next_id: AtomicU64, +} +``` + +- `subscribe()` → `(u64, Receiver<()>)` — register a channel; capacity 1 +- `unsubscribe(id)` — remove channel; receiver sees `Err(RecvError)` +- `close()` — join the poll thread, clear all subscribers +- Wakes are idempotent "go re-read state" signals. Dropped redundant wakes never lose data. + +--- + +## 2. Core Capabilities + +### 2.1 Notify/Listen — Ephemeral Pub/Sub + +**What it is**: Fire-and-forget notifications to channel subscribers. Like `pg_notify` but with table-backed persistence until explicitly pruned. + +**How it works**: +- `notify(channel, payload)` is a SQL scalar function that INSERTs into `_honker_notifications` and returns the row id. Runs inside the caller's open transaction — rollbacks drop the notification. +- `db.listen(channel)` or `db.updateEvents()` in Node — registers a subscriber that wakes on any database commit, then filters by channel in the `SELECT` path. 
+- Listeners attach at current `MAX(id)`; **history is not replayed**. This is the key distinction from streams. + +**Schema**: +```sql +CREATE TABLE _honker_notifications ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + channel TEXT NOT NULL, + payload TEXT NOT NULL, + created_at INTEGER NOT NULL DEFAULT (unixepoch()) +); +CREATE INDEX _honker_notifications_recent ON _honker_notifications(channel, id); +``` + +**Key characteristics**: +- Not auto-pruned. Call `db.prune_notifications(older_than_s=…, max_keep=…)` from a scheduled task. +- Over-triggering is by design: a `data_version` change wakes every subscriber on that database, not just the matching channel. Each wasted wake = one indexed SELECT (microseconds). A missed wake = a silent correctness bug. +- Payload must be valid JSON for cross-language compatibility. + +### 2.2 Queue — At-Least-Once Work Queue + +**What it is**: Durable, at-least-once delivery work queue with retries, priority, delayed jobs, task expiration, dead-letter handling, named locks, and rate-limiting. 
+ +**Schema (single-table hybrid)**: + +```sql +CREATE TABLE _honker_live ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + queue TEXT NOT NULL, + payload TEXT NOT NULL, + state TEXT NOT NULL DEFAULT 'pending', -- 'pending' | 'processing' + priority INTEGER NOT NULL DEFAULT 0, + run_at INTEGER NOT NULL DEFAULT (unixepoch()), -- for delayed jobs + worker_id TEXT, + claim_expires_at INTEGER, -- visibility timeout + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 3, + created_at INTEGER NOT NULL DEFAULT (unixepoch()), + expires_at INTEGER -- job expiration +); + +CREATE INDEX _honker_live_claim + ON _honker_live(queue, priority DESC, run_at, id) + WHERE state IN ('pending', 'processing'); + +CREATE TABLE _honker_dead ( + id INTEGER PRIMARY KEY, + queue TEXT NOT NULL, + payload TEXT NOT NULL, + priority INTEGER NOT NULL DEFAULT 0, + run_at INTEGER NOT NULL DEFAULT 0, + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 0, + last_error TEXT, + created_at INTEGER NOT NULL DEFAULT (unixepoch()), + died_at INTEGER NOT NULL DEFAULT (unixepoch()) +); +``` + +**Claim/ack/nack model**: + +| Operation | SQL | Notes | +|-----------|-----|-------| +| Enqueue | `INSERT INTO _honker_live (queue, payload, run_at, priority, max_attempts, expires_at) VALUES (…)` | Returns auto-increment id | +| Claim | `UPDATE _honker_live SET state='processing', worker_id=?, claim_expires_at=unixepoch()+?, attempts=attempts+1 WHERE id IN (SELECT id FROM _honker_live WHERE queue=? AND state IN ('pending','processing') AND (expires_at IS NULL OR expires_at > unixepoch()) AND ((state='pending' AND run_at <= unixepoch()) OR (state='processing' AND claim_expires_at < unixepoch())) ORDER BY priority DESC, run_at ASC, id ASC LIMIT ?) RETURNING …` | One `UPDATE … RETURNING` via partial index | +| Ack | `DELETE FROM _honker_live WHERE id=? AND worker_id=? 
AND claim_expires_at >= unixepoch() RETURNING id` | Returns 1 if claim still valid, 0 if expired | +| Retry | `UPDATE _honker_live SET state='pending', run_at=unixepoch()+?, worker_id=NULL, claim_expires_at=NULL WHERE id=?` + notify on queue channel | If `attempts >= max_attempts`, DELETE from `_honker_live` and INSERT into `_honker_dead` | +| Fail | `DELETE FROM _honker_live WHERE id=? AND worker_id=? AND claim_expires_at >= unixepoch() RETURNING …` + `INSERT INTO _honker_dead` | Unconditionally move to dead letter | +| Heartbeat | `UPDATE _honker_live SET claim_expires_at=unixepoch()+? WHERE id=? AND worker_id=? AND state='processing'` | Extend claim for long-running handlers | +| Cancel | `DELETE FROM _honker_live WHERE id=? AND state IN ('pending', 'processing')` | Idempotent | + +**Visibility timeout**: Default 300 seconds (`claim_expires_at = unixepoch() + 300`). If a worker crashes mid-job, the claim expires and another worker reclaims. `attempts` increments. After `max_attempts` (default 3), the row moves to `_honker_dead`. + +**Priority**: Higher `priority` value = claimed first. The partial index on `(queue, priority DESC, run_at, id)` ensures claim path is bounded by working-set size, not history size. + +**Delayed jobs**: Set `run_at` to a future timestamp. Workers only claim rows where `run_at <= unixepoch()`. The `run_at` deadline also wakes sleeping workers through `honker_queue_next_claim_at()`. + +**Task expiration**: Set `expires_at` on enqueue. Expired jobs are filtered from the claim path. Call `queue.sweep_expired()` to move them to `_honker_dead` with `last_error='expired'`. + +**Named locks**: `honker_lock_acquire(name, owner, ttl_s)` → 1 (got it) or 0 (held). `honker_lock_release(name, owner)` → 1 (released) or 0 (not yours). Uses `_honker_locks` table with TTL-based expiration. Primary use case: cron tasks that shouldn't overlap (leader election). 
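The claim rules above (priority ordering, `run_at` gating, and reclaim after the visibility timeout) can be sketched in plain SQL driven from Python's standard `sqlite3` module. This is an illustration under stated assumptions, not Honker's implementation: the table is a trimmed copy of `_honker_live` (no `expires_at` or `max_attempts`), the helpers `enqueue`, `claim`, and `ack` are hypothetical names, the claim is split into a `SELECT` plus an `UPDATE` instead of one `UPDATE … RETURNING`, and `now` is passed in explicitly instead of calling `unixepoch()` so the behavior is deterministic.

```python
import sqlite3

SCHEMA = """
CREATE TABLE _honker_live (
  id               INTEGER PRIMARY KEY AUTOINCREMENT,
  queue            TEXT NOT NULL,
  payload          TEXT NOT NULL,
  state            TEXT NOT NULL DEFAULT 'pending',
  priority         INTEGER NOT NULL DEFAULT 0,
  run_at           INTEGER NOT NULL,
  worker_id        TEXT,
  claim_expires_at INTEGER,
  attempts         INTEGER NOT NULL DEFAULT 0
);
"""


def enqueue(conn, queue, payload, run_at, priority=0):
    with conn:
        cur = conn.execute(
            "INSERT INTO _honker_live (queue, payload, run_at, priority) "
            "VALUES (?, ?, ?, ?)",
            (queue, payload, run_at, priority),
        )
    return cur.lastrowid


def claim(conn, queue, worker_id, n, timeout_s, now):
    """Claim up to n jobs that are due, or whose previous claim has expired."""
    with conn:
        ids = [row[0] for row in conn.execute(
            "SELECT id FROM _honker_live WHERE queue = ? "
            "AND ((state = 'pending' AND run_at <= ?) "
            "  OR (state = 'processing' AND claim_expires_at < ?)) "
            "ORDER BY priority DESC, run_at ASC, id ASC LIMIT ?",
            (queue, now, now, n),
        )]
        conn.executemany(
            "UPDATE _honker_live SET state = 'processing', worker_id = ?, "
            "claim_expires_at = ?, attempts = attempts + 1 WHERE id = ?",
            [(worker_id, now + timeout_s, job_id) for job_id in ids],
        )
    return ids


def ack(conn, job_id, worker_id, now):
    """Delete the job only if this worker still holds a live claim."""
    with conn:
        cur = conn.execute(
            "DELETE FROM _honker_live "
            "WHERE id = ? AND worker_id = ? AND claim_expires_at >= ?",
            (job_id, worker_id, now),
        )
    return cur.rowcount


conn = sqlite3.connect(":memory:")  # single connection, fine for a sketch
conn.executescript(SCHEMA)
low = enqueue(conn, "emails", '{"to": "a"}', run_at=100, priority=0)
high = enqueue(conn, "emails", '{"to": "b"}', run_at=100, priority=5)
delayed = enqueue(conn, "emails", '{"to": "c"}', run_at=500, priority=9)

first = claim(conn, "emails", "w1", 1, 300, now=100)   # delayed job is not due yet
acked = ack(conn, high, "w1", now=150)
second = claim(conn, "emails", "w2", 1, 300, now=200)  # w2 then "crashes"
third = claim(conn, "emails", "w3", 2, 300, now=501)   # w2's claim expired at 500
late_ack = ack(conn, low, "w2", now=501)               # w2 no longer owns the job
attempts = conn.execute(
    "SELECT attempts FROM _honker_live WHERE id = ?", (low,)
).fetchone()[0]
```

The late ack from `w2` matches no row because the reclaim overwrote `worker_id`, which is why at-least-once handlers need to be idempotent.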
+ +**Rate limiting**: `honker_rate_limit_try(name, limit, per)` → 1 (under limit) or 0 (at limit). Fixed-window counter. Rejected calls don't inflate the count. + +**Batch operations**: `honker_claim_batch(queue, worker_id, n, timeout_s)` returns a JSON array of claimed jobs. `honker_ack_batch('[1,2,3]', worker_id)` acks multiple jobs. Ack is per-transaction for batch — honest bool return. + +**Task result storage**: `honker_enqueue()` returns the job id. Workers can persist return values via `honker_result_save(id, value, ttl_s)`. Callers await results with `queue.wait_result(id, timeout)`. Opt-in (default `save_result=False`). + +**Claim iterator pattern**: +```python +async for job in q.claim("worker-1"): + try: + send(job.payload) + job.ack() + except Exception as e: + job.retry(delay_s=60, error=str(e)) +``` + +Each iteration is `claim_batch(worker_id, 1)`. Wakes on database update from any process, or when the next `run_at` / reclaim deadline arrives. 5-second paranoia poll is the only fallback. + +**Queue notifications**: Each enqueue also fires a notification on `honker:` channel so workers wake immediately without waiting for the next poll cycle. + +### 2.3 Stream — Durable Pub/Sub with Per-Consumer Offsets + +**What it is**: Durable event stream where each named consumer tracks its own offset. Events persist until explicitly pruned. At-least-once delivery with configurable offset flush cadence. 
+ +**Schema**: +```sql +CREATE TABLE _honker_stream ( + offset INTEGER PRIMARY KEY AUTOINCREMENT, + topic TEXT NOT NULL, + key TEXT, + payload TEXT NOT NULL, + created_at INTEGER NOT NULL DEFAULT (unixepoch()) +); + +CREATE INDEX _honker_stream_topic + ON _honker_stream(topic, offset); + +CREATE TABLE _honker_stream_consumers ( + name TEXT NOT NULL, + topic TEXT NOT NULL, + offset INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (name, topic) +); +``` + +**API**: + +| Function | Returns | Notes | +|----------|---------|-------| +| `honker_stream_publish(topic, key_or_null, payload_json)` | `offset` | INSERTs into `_honker_stream` + fires notification on `honker:stream:` | +| `honker_stream_read_since(topic, offset, limit)` | JSON array | Reads rows where `offset > ?` ordered by offset | +| `honker_stream_save_offset(consumer, topic, offset)` | 1 or 0 | Monotonic upsert — never rewinds. 1 = advanced, 0 = existing offset ≥ new | +| `honker_stream_get_offset(consumer, topic)` | offset or 0 | Returns saved offset for consumer/topic pair | + +**Python binding**: +```python +stream = db.stream("user-events") +stream.publish({"user_id": uid, "change": "name"}, tx=tx) +async for event in stream.subscribe(consumer="dashboard"): + await push_to_browser(event) +``` + +**Subscribe behavior**: +1. Replay rows past `offset > saved_offset` in batches (default 1000 rows) +2. Transition to live delivery on commit wake +3. Auto-save offset at most every 1000 events or every 1 second (whichever first) +4. At-least-once: a crash re-delivers in-flight events up to the last flushed offset +5. Override auto-save with `save_every_n=` / `save_every_s=`; set both to 0 for manual control + +**Transaction coupling**: `stream.publish(payload, tx=tx)` inserts into `_honker_stream` inside the caller's transaction. Rollback drops the event. This is the transactional outbox pattern without a separate dispatch table. 
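The offset bookkeeping and transaction coupling described above can be sketched with Python's standard `sqlite3` module. This is a hedged illustration, not Honker's code: the tables are trimmed copies of the schema above (`created_at` is dropped and `offset` is quoted defensively), the helpers `publish`, `read_since`, `get_offset`, and `save_offset` are hypothetical stand-ins for the `honker_stream_*` functions, and `save_offset` uses a read-then-write check in place of a single monotonic upsert statement.

```python
import json
import sqlite3

SCHEMA = """
CREATE TABLE _honker_stream (
  "offset" INTEGER PRIMARY KEY AUTOINCREMENT,
  topic    TEXT NOT NULL,
  key      TEXT,
  payload  TEXT NOT NULL
);
CREATE TABLE _honker_stream_consumers (
  name     TEXT NOT NULL,
  topic    TEXT NOT NULL,
  "offset" INTEGER NOT NULL DEFAULT 0,
  PRIMARY KEY (name, topic)
);
"""


def publish(conn, topic, payload):
    """Insert an event inside whatever transaction the caller has open."""
    cur = conn.execute(
        "INSERT INTO _honker_stream (topic, payload) VALUES (?, ?)",
        (topic, json.dumps(payload)),
    )
    return cur.lastrowid


def read_since(conn, topic, offset, limit):
    rows = conn.execute(
        'SELECT "offset", payload FROM _honker_stream '
        'WHERE topic = ? AND "offset" > ? ORDER BY "offset" LIMIT ?',
        (topic, offset, limit),
    ).fetchall()
    return [(off, json.loads(body)) for off, body in rows]


def get_offset(conn, consumer, topic):
    row = conn.execute(
        'SELECT "offset" FROM _honker_stream_consumers '
        "WHERE name = ? AND topic = ?",
        (consumer, topic),
    ).fetchone()
    return row[0] if row else 0


def save_offset(conn, consumer, topic, offset):
    """Advance the saved offset; never rewind. Returns 1 if advanced, else 0."""
    if offset <= get_offset(conn, consumer, topic):
        return 0
    conn.execute(
        "INSERT OR REPLACE INTO _honker_stream_consumers "
        '(name, topic, "offset") VALUES (?, ?, ?)',
        (consumer, topic, offset),
    )
    return 1


conn = sqlite3.connect(":memory:")  # single connection, fine for a sketch
conn.executescript(SCHEMA)

with conn:  # three events committed together
    for n in range(3):
        publish(conn, "user-events", {"n": n})

try:
    with conn:  # the event is rolled back along with the failed business write
        publish(conn, "user-events", {"n": 99})
        raise RuntimeError("business write failed")
except RuntimeError:
    pass

batch = read_since(conn, "user-events", get_offset(conn, "dashboard", "user-events"), 2)
with conn:
    advanced = save_offset(conn, "dashboard", "user-events", batch[-1][0])
    rewound = save_offset(conn, "dashboard", "user-events", 1)
rest = read_since(conn, "user-events", get_offset(conn, "dashboard", "user-events"), 10)
```

If the consumer crashed after processing `batch` but before `save_offset` ran, the next `read_since` would return the same two events again, which is the at-least-once window the flush thresholds bound.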
+ +### 2.4 Scheduler — Time-Triggered Cron Tasks + +**Schema**: +```sql +CREATE TABLE _honker_scheduler_tasks ( + name TEXT PRIMARY KEY, + queue TEXT NOT NULL, + cron_expr TEXT NOT NULL, + payload TEXT NOT NULL, + priority INTEGER NOT NULL DEFAULT 0, + expires_s INTEGER, + next_fire_at INTEGER NOT NULL, + enabled INTEGER NOT NULL DEFAULT 1 +); +``` + +**API**: +```sql +SELECT honker_scheduler_register('nightly', 'backups', '0 3 * * *', '"go"', 0, NULL); +SELECT honker_scheduler_tick(unixepoch()); -- JSON: fires due +SELECT honker_scheduler_soonest(); -- min next_fire_at +SELECT honker_scheduler_unregister('nightly'); -- 1 = deleted +SELECT honker_scheduler_pause('nightly'); -- 1 = paused +SELECT honker_scheduler_resume('nightly'); -- 1 = resumed +SELECT honker_scheduler_list(); -- JSON array of all schedules +SELECT honker_scheduler_update('nightly', '0 4 * * *', NULL, NULL, NULL, 0); +``` + +Supports: 5-field cron, 6-field cron (with seconds), `@every ` interval expressions. + +**Leader election via named lock**: `db.lock('honker-scheduler', ttl=60)`. Two scheduler processes can't both fire. The lock is heartbeat-refreshed every 30s. + +**Missed-fire catch-up**: If the scheduler was down for 4 hours with an hourly schedule, the first iteration fires all 4 missed boundaries (with `expires=` to drop stale ones). + +**Fires = enqueue**: The scheduler never runs handlers. It enqueues into the task queue. Regular workers consume. + +### 2.5 Outbox Pattern + +The `outbox` is a convenience wrapper around the `Queue` primitive: + +```python +db.outbox("emails", delivery=send_email) +db.outbox("emails").enqueue({"to": "alice@example.com"}, tx=tx) +db.outbox("emails").run_worker("worker-1") +``` + +Failures retry with exponential backoff (`base_backoff_s * 2^(attempts-1)`) up to `max_attempts`, then land in `_honker_dead`. + +--- + +## 3. 
+ Persistence and Reliability + +### Durability Guarantees + +- **Atomic commit**: Business write + side-effect enqueue/event/notify commit together or roll back together. This is SQLite ACID — the transactional outbox pattern is built into the primitives, not bolted on. +- **SIGKILL safety**: Verified in `tests/test_crash_recovery.py`. Subprocess killed pre-COMMIT → `PRAGMA integrity_check == 'ok'`, zero in-flight rows, no stale write lock, queue round-trip works post-crash. +- **Worker crash recovery**: If a worker crashes mid-job, the claim expires after `visibility_timeout_s` (default 300s) and another worker reclaims. `attempts` increments on each claim. After `max_attempts` (default 3), the row moves to `_honker_dead`. +- **Stream at-least-once**: Offsets auto-flush every 1000 events or 1 second. After a crash, events delivered since the last flushed offset are delivered again. The crash window is bounded by the flush thresholds. +- **Notify has no replay**: Listeners attach at `MAX(id)`. Pruned events are gone. For durable replay, use streams. + +### WAL Mode + +Recommended default (`journal_mode = WAL`). Gives concurrent readers with one writer and efficient fsync batching (`wal_autocheckpoint = 10000`). Other journal modes work but lose WAL's concurrent-read-while-writing property. Wake detection (`PRAGMA data_version`) works in all journal modes. + +### What Happens on Crash + +| Scenario | Result | +|----------|--------| +| Process SIGKILL mid-transaction | SQLite atomic-commit rollback. In-flight write did not land. Fresh process can acquire write lock immediately. | +| Worker process crash mid-job | Claim expires after visibility_timeout. Another worker reclaims. `attempts` increments. | +| Stream consumer crash | Resumes from last auto-saved offset (at-least-once). Offset progress that was not yet flushed is lost, so those events are delivered again. | +| Database file replaced (litestream restore) | Watcher panics with clear error message. All subscribers see error from update_events(). Must reopen database. 
| + +### What Honker Does NOT Provide + +- **Multi-writer replication**: SQLite's locking is for single-host. Two servers writing one `.db` over NFS will corrupt it. Shard by file or switch to Postgres. +- **In-memory database support**: `:memory:` creates a separate database per connection, splitting writer/readers/watchers. Use temp file-backed `.db` for tests. +- **Cross-node distribution**: Honker is single-machine. No built-in mechanism for distributing events across nodes. (This is intentional — see alknet relevance below.) +- **Task pipelines/chains/groups/chords**. Deliberately not built. +- **Workflow orchestration with DAGs**. Deliberately not built. +- **Ordering guarantees across queues**. Each queue is independent. +- **Exactly-once delivery**. Honker provides at-least-once. Idempotent handlers are the user's responsibility. + +--- + +## 4. Performance + +### Benchmarks (M-series, release build, median of 3) + +| Operation | Throughput | +|-----------|-----------| +| enqueue (1/tx) | ~8,000/sec | +| enqueue (100/tx) | ~110,000/sec | +| claim + ack (individual) | ~4,500/sec | +| claim_batch + ack_batch (32) | ~75,000/sec | +| claim_batch + ack_batch (128) | ~110,000/sec | +| async iter end-to-end | ~6,500/sec | +| stream replay | ~1,000,000/sec | +| stream live e2e p50 | 0.24ms | +| stream live e2e p99 | 8ms | + +### Cross-Process Wake Latency + +Median ~1-2ms on M-series, bounded by the 1ms `PRAGMA data_version` poll cadence. 600-second soak test under sustained ~75 commits/sec showed zero missed wakes, zero drift, `PRAGMA integrity_check = ok`. + +### Claim Performance at Scale + +With 100,000 dead rows in `_honker_dead`: + +| Operation | Claim+ack | +|-----------|-----------| +| 0 dead rows (fresh DB) | ~4,000/sec | +| 100k dead rows | ~3,500/sec | + +The partial index `(queue, priority DESC, run_at, id) WHERE state IN ('pending','processing')` keeps the claim hot path bounded by working-set size, not history size. 
+ +### How It Compares to Polling + +Prior to honker's wake mechanism, the alternative would be application-level polling (e.g., `SELECT … WHERE id > last_seen` every N seconds). Honker replaces this with a single-digit-microsecond PRAGMA read. 100 subscribers still = 1 poll thread. The over-triggering trade-off (waking all subscribers on any commit) is explicitly chosen over potentially missing a wake. + +--- + +## 5. SQLite Integration + +### Loading the Extension + +```sql +-- Any SQLite 3.9+ client +.load ./libhonker_ext +SELECT honker_bootstrap(); +``` + +`honker_bootstrap()` is idempotent — it runs `CREATE TABLE IF NOT EXISTS` and `CREATE INDEX IF NOT EXISTS` for all schema tables. + +### Compile/Load Flags + +For Rust integration via `rusqlite`: + +```toml +[dependencies] +honker-core = "0.2.3" +rusqlite = { version = "0.39.0", features = ["functions", "hooks"] } +``` + +Then in Rust: +```rust +use honker_core::{attach_notify, attach_honker_functions, bootstrap_honker_schema, open_conn}; + +let conn = open_conn("app.db", true)?; // true = install notify +attach_honker_functions(&conn)?; +bootstrap_honker_schema(&conn)?; +``` + +For the loadable extension: +```bash +cargo build --release -p honker-extension +# Produces: target/release/libhonker_ext.so (or .dylib, .dll) +``` + +### Rust Crate Usage + +```rust +use honker_core::SharedUpdateWatcher; + +let watcher = SharedUpdateWatcher::new(db_path.clone()); +let (sub_id, rx) = watcher.subscribe(); + +// In a loop: +match rx.recv_timeout(Duration::from_secs(5)) { + Ok(()) => { /* re-read state from SQLite */ }, + Err(RecvTimeoutError::Timeout) => { /* paranoia poll */ }, + Err(RecvTimeoutError::Disconnected) => { /* watcher died, reopen */ }, +} + +watcher.unsubscribe(sub_id); +watcher.close()?; +``` + +### Using with ORM Connections + +Load `libhonker_ext` on the ORM's connection and call `honker_bootstrap()` inside the ORM's transaction: + +```python +# SQLAlchemy +@event.listens_for(engine, "connect") +def 
_load_honker(conn, _): + honker.load_extension(conn) + conn.execute("SELECT honker_bootstrap()") + +with Session(engine) as s, s.begin(): + s.add(Order(user_id=42)) + s.execute(text("SELECT honker_enqueue(:q, :p, NULL, NULL, 0, 3, NULL)"), + {"q": "emails", "p": '{"to":"alice"}'}) +``` + +### PRAGMA Defaults + +Applied on every connection opened via `open_conn`: + +```sql +PRAGMA journal_mode = WAL; +PRAGMA synchronous = NORMAL; -- fsync WAL at checkpoint, not every commit +PRAGMA busy_timeout = 5000; -- wait up to 5s for writer lock +PRAGMA foreign_keys = ON; +PRAGMA cache_size = -32000; -- 32MB page cache (default was 2MB) +PRAGMA temp_store = MEMORY; -- temp B-trees in RAM +PRAGMA wal_autocheckpoint = 10000; -- fsync every 10k WAL pages +``` + +--- + +## 6. Complete API Surface + +### Notification Functions + +| Function | Signature | Returns | Notes | +|----------|-----------|---------|-------| +| `notify(channel, payload)` | Scalar, 2 args | `rowid` | INSERTs into `_honker_notifications`, returns auto-generated id | + +### Queue Functions + +| Function | Signature | Returns | Notes | +|----------|-----------|---------|-------| +| `honker_bootstrap()` | 0 args | `1` | Creates all schema tables/indexes. Idempotent. | +| `honker_enqueue(queue, payload, run_at_or_null, delay_or_null, priority, max_attempts, expires_or_null)` | 7 args | `id` | INSERTs job. Delay overrides run_at. | +| `honker_claim_batch(queue, worker_id, n, timeout_s)` | 4 args | JSON array | Claims up to `n` jobs. Each gets `claim_expires_at = now + timeout_s`. | +| `honker_ack_batch(ids_json, worker_id)` | 2 args | `count` | ACKs (DELETEs) claimed jobs. `ids_json` is `[1,2,3]`. | +| `honker_ack(job_id, worker_id)` | 2 args | `1` or `0` | Single-job ack. Returns 0 if claim expired. | +| `honker_retry(job_id, worker_id, delay_s, error)` | 4 args | `1` or `0` | Retries (flips back to pending) or fails to dead if `attempts >= max_attempts`. 
| +| `honker_fail(job_id, worker_id, error)` | 3 args | `1` or `0` | Unconditionally moves to `_honker_dead`. | +| `honker_heartbeat(job_id, worker_id, extend_s)` | 3 args | `1` or `0` | Extends claim for long-running handlers. | +| `honker_cancel(job_id)` | 1 arg | `1` or `0` | Removes pending/processing row. Idempotent. | +| `honker_get_job(job_id)` | 1 arg | JSON or `""` | Read job state. Pure read. | +| `honker_sweep_expired(queue)` | 1 arg | `count` | Moves expired pending jobs to `_honker_dead`. | +| `honker_queue_next_claim_at(queue)` | 1 arg | `unix_ts` or `0` | Earliest future deadline (run_at or claim_expires_at + 1). | + +### Stream Functions + +| Function | Signature | Returns | Notes | +|----------|-----------|---------|-------| +| `honker_stream_publish(topic, key_or_null, payload_json)` | 3 args | `offset` | INSERTs event + fires notification | +| `honker_stream_read_since(topic, offset, limit)` | 3 args | JSON array | Reads events with `offset > ?` | +| `honker_stream_save_offset(consumer, topic, offset)` | 3 args | `1` or `0` | Monotonic upsert. 
0 = existing offset ≥ new | +| `honker_stream_get_offset(consumer, topic)` | 2 args | `offset` or `0` | Returns saved offset | + +### Lock Functions + +| Function | Signature | Returns | Notes | +|----------|-----------|---------|-------| +| `honker_lock_acquire(name, owner, ttl_s)` | 3 args | `1` or `0` | 1 = acquired, 0 = held | +| `honker_lock_release(name, owner)` | 2 args | `1` or `0` | 1 = released, 0 = not yours | + +### Rate Limit Functions + +| Function | Signature | Returns | Notes | +|----------|-----------|---------|-------| +| `honker_rate_limit_try(name, limit, per)` | 3 args | `1` or `0` | 1 = under limit, 0 = at limit | +| `honker_rate_limit_sweep(older_than_s)` | 1 arg | `count` | Prunes expired windows | + +### Scheduler Functions + +| Function | Signature | Returns | Notes | +|----------|-----------|---------|-------| +| `honker_scheduler_register(name, queue, cron_expr, payload, priority, expires_s_or_null)` | 6 args | `1` | Upserts task. Computes next_fire_at. | +| `honker_scheduler_unregister(name)` | 1 arg | `0` or `1` | Deletes task. | +| `honker_scheduler_tick(now_unix)` | 1 arg | JSON array | Fires due tasks, enqueues payloads, advances next_fire_at. | +| `honker_scheduler_soonest()` | 0 args | `unix_ts` or `0` | Earliest next_fire_at for sleep duration calculation. | +| `honker_scheduler_pause(name)` | 1 arg | `0` or `1` | Toggles `enabled = 0`. | +| `honker_scheduler_resume(name)` | 1 arg | `0` or `1` | Toggles `enabled = 1`. | +| `honker_scheduler_list()` | 0 args | JSON array | Returns all schedules with state. | +| `honker_scheduler_update(name, cron_expr_or_null, payload_or_null, priority_or_null, expires_s_or_null, touch_expires)` | 6 args | `0` or `1` | Mutates schedule fields. Recomputes next_fire_at if cron_expr changed. | +| `honker_cron_next_after(expr, from_unix)` | 2 args | `unix_ts` | Pure deterministic function. 5-field, 6-field, or `@every `. 
| + +### Result Functions + +| Function | Signature | Returns | Notes | +|----------|-----------|---------|-------| +| `honker_result_save(job_id, value_json, ttl_s)` | 3 args | `1` | UPSERTs result. `ttl_s=0` = no expiration. | +| `honker_result_get(job_id)` | 1 arg | `value` or `NULL` | Returns result or NULL if expired/missing. | +| `honker_result_sweep()` | 0 args | `count` | Prunes expired result rows. | + +### Watcher Functions (Extension ABI) + +| Function | Signature | Returns | Notes | +|----------|-----------|---------|-------| +| `honker_update_watcher_open(db_path, backend)` | 2 SQL args | `id` | Opens a watcher handle. For Elixir and extension consumers. | +| `honker_update_watcher_wait(id, timeout_ms)` | 2 SQL args | `1`/`0`/`-1` | 1 = update observed, 0 = timeout, -1 = disconnected | +| `honker_update_watcher_close(id)` | 1 SQL arg | `1` | Closes watcher handle. | + +C ABI (for Go, .NET, C++, Ruby bindings that route through the extension): + +| Function | Signature | Returns | Notes | +|----------|-----------|---------|-------| +| `honker_watcher_open(db_path, backend, err_buf, err_buf_len)` | C ABI | `*mut HonkerWatcherHandle` | Opens a core-backed update watcher. | +| `honker_watcher_wait(handle, timeout_ms)` | C ABI | `1`/`0`/`-1`/`-2` | 1 = update, 0 = timeout, -1 = closed, -2 = panic | +| `honker_watcher_close(handle)` | C ABI | void | Closes and frees the handle. | + +### Tables + +| Table | Purpose | +|-------|---------| +| `_honker_live` | Pending + processing jobs. Partial index for fast claims. | +| `_honker_dead` | Terminal jobs (retry-exhausted or explicitly failed). Never scanned by claim path. | +| `_honker_notifications` | Ephemeral notify/listen messages. Not auto-pruned. | +| `_honker_stream` | Durable stream events with auto-incrementing offsets. | +| `_honker_stream_consumers` | Per-consumer stream offsets. Monotonic upsert. | +| `_honker_locks` | Named advisory locks with TTL expiration. 
| +| `_honker_rate_limits` | Fixed-window rate limit counters. | +| `_honker_scheduler_tasks` | Cron/schedule task definitions with next_fire_at. | +| `_honker_results` | Task result storage with TTL expiration. | + +--- + +## 7. Comparison to Postgres (pg_notify) + +| Feature | Honker | pg_notify | +|---------|--------|-----------| +| **Delivery model** | Table-backed `INSERT` in transaction | In-memory NOTIFY with LISTEN callback | +| **Persistence** | Rows survive restart. Not auto-pruned. | Ephemeral — lost on restart, not replayed. | +| **Transactional coupling** | `notify(channel, payload)` inside `BEGIN IMMEDIATE; INSERT; COMMIT` — atomic with business write | NOTIFY fires at COMMIT inside the same transaction. Atomic with business write. | +| **Retry / visibility timeout** | Queue has `claim_expires_at`, `attempts`, `max_attempts`, dead-letter. | No retry. No visibility timeout. | +| **Delayed delivery** | `run_at` for scheduled delivery. Jobs only claimable after deadline. | No scheduling. | +| **Cross-process wake** | `PRAGMA data_version` polling at ~1ms cadence. SharedUpdateWatcher fans out to N subscribers. | Postgres notifies listeners via its inter-process communication. | +| **Priority** | Queue priority via partial index `(queue, priority DESC, run_at, id)`. | No priority. | +| **Rate limiting** | Built-in fixed-window `rate_limit_try`. | No rate limiting. | +| **Named locks** | TTL-based advisory locks in `_honker_locks`. | `pg_advisory_lock` (similar concept, different implementation). | +| **Cron scheduling** | Built-in scheduler with 5-field/6-field cron + `@every` intervals. | Needs pg-boss/Oban/cron extension. | +| **Stream offsets** | Per-consumer tracked offsets with monotonic upsert. | No built-in stream offsets. | +| **Multi-process** | Single-machine, single-writer. | Multi-process, multi-writer natively. | +| **Durability** | SQLite ACID. WAL mode for concurrent readers. | Postgres ACID. Full write-ahead logging. 
| + +**What honker gives you that pg_notify alone doesn't**: +1. **Retry with exponential backoff** — automatic re-delivery on failure +2. **Visibility timeout** — crashed workers don't permanently lose messages +3. **Dead-letter queue** — exhausted retries land in `_honker_dead` for inspection +4. **Delayed jobs** — `run_at` for future delivery +5. **Prioritization** — `priority` column in claim index +6. **Transactional outbox** — business write + enqueue/event in one transaction, without adding Redis/Celery +7. **Task result storage** — workers can persist return values; callers can await results +8. **Durable streams** — per-consumer offsets with at-least-once delivery +9. **Cron scheduling** — built-in periodic tasks with leader election +10. **Named locks and rate limiting** — built-in coordination primitives + +**What you'd need to add if you used Postgres instead**: pg-boss, Oban, or similar Postgres-backed job-queue libraries provide many of these features, but they require Postgres as the database. Honker exists for the case where SQLite is already the primary datastore. + +--- + +## 8. 
Comparison to Other Message Systems + +| Feature | Honker | Redis Pub/Sub | NATS | Kafka | +|---------|--------|--------------|------|-------| +| **Persistence** | SQLite tables (disk) | In-memory only (unless RDB/AOF) | In-memory (JetStream adds persistence) | Persistent log | +| **Transactional coupling** | Business write + enqueue in one tx | Not atomic with business data | Not atomic with business data | Not atomic with business data | +| **Delivery guarantee** | At-least-once | At-most-once (fire-and-forget) | At-most-once (core); at-least-once (JetStream) | At-least-once with consumer offsets | +| **Retry/visibility** | Built-in (claim timeout, retry, dead-letter) | None (messages disappear if no consumer) | None (core); redelivery (JetStream) | Consumer group offsets | +| **Priority** | Yes (partial index) | No | No | No | +| **Delayed delivery** | Yes (`run_at`) | No (requires sorted sets hack) | No | No (requires time-based logic) | +| **Single-node complexity** | Zero — just a `.db` file | Requires Redis server | Requires NATS server | Requires Kafka cluster | +| **Cross-process wake latency** | 1-2ms | ~0.1ms | ~0.1ms | ~1-5ms | +| **Cross-node distribution** | None (single machine) | Pub/Sub is fan-out to connected clients | JetStream supports clustering | Built for distributed | +| **Dependency** | SQLite (already in your stack) | Additional server | Additional server | Additional cluster | +| **Schema coupling** | Same file as business data — dual-write impossible | Separate system — dual-write risk | Separate system — dual-write risk | Separate system — dual-write risk | +| **Language support** | Python, Node, Rust, Go, Ruby, Bun, Elixir, C++, .NET, JVM, Kotlin | Many (but protocol, not SQL) | 40+ client libraries | Many client libraries | +| **Dead-letter queue** | Built-in `_honker_dead` | None | JetStream has DLQ | DLQ via configuration | + +**When honker is the right choice**: SQLite is already your primary datastore, and you need pub/sub + queue 
+ scheduling without introducing Redis/Celery/NATS. The dual-write problem between your business tables and the queue disappears. + +**When honker is NOT the right choice**: Multi-node deployments, multi-writer sharding, need for cross-datacenter replication, or workloads exceeding single-machine throughput. + +--- + +## 9. Relevance to Alknet + +### 9.1 Alignment with Event Boundary Discipline (ADR-032) + +ADR-032 defines three communication layers: + +``` +Call Protocol (Layer 3, external, JSON) + └── irpc Service (Layer 3, internal, postcard) + └── Honker Streams (Domain events, within service boundary) +``` + +**Honker's single-machine model is exactly right for the bottom layer.** Domain events in alknet are internal to the service that owns that data — `nodes:created`, `edges:deleted`, `accounts:updated`. These never cross the service boundary without projection into a call protocol `EventEnvelope`. + +The integration plan (Phase 2.2) explicitly lists honker integration patterns for alknet-storage: + +| Feature | Use Case | +|---------|----------| +| `stream_publish` / `subscribe` | Durable pub/sub for node/edge/membership changes | +| `notify` / `listen` | Ephemeral pub/sub for real-time control channel events | +| `queue` / `claim` / `ack` | Task queue for async operations | + +### 9.2 Patterns from Honker for alknet-storage Adoption + +**Map honker's primitives to alknet-storage's internal events**: + +| Alknet Domain Event | Honker Primitive | Stream Name | +|---------------------|------------------|-------------| +| Node created | `stream.publish("nodes:created", ...)` | `nodes:created` | +| Node updated | `stream.publish("nodes:updated", ...)` | `nodes:updated` | +| Node deleted | `stream.publish("nodes:deleted", ...)` | `nodes:deleted` | +| Edge created | `stream.publish("edges:created", ...)` | `edges:created` | +| Account updated | `stream.publish("accounts:updated", ...)` | `accounts:updated` | +| ACL rule changed | `stream.publish("acl:changed", 
...)` | `acl:changed` | + +**Map honker's task queue to alknet's async operations**: + +| Alknet Async Task | Honker Queue | +|-------------------|-------------| +| Key rotation | `queue("key-rotation")` | +| Certificate renewal | `queue("cert-renewal")` | +| Audit log archival | `queue("audit-archival")` | +| Node encryption/decryption | `queue("node-crypto")` | + +**Map honker's notify/listen to real-time events**: + +| Alknet Real-Time Event | Honker Channel | +|------------------------|---------------| +| SSH connection opened | `notify("ssh:connected", ...)` | +| Config reload triggered | `notify("config:reload", ...)` | +| Forwarding rule activated | `notify("forwarding:activated", ...)` | + +### 9.3 Replicating Honker Patterns with Postgres for Production + +If alknet-storage is backed by Postgres in production deployments (the storage spec mentions `rusqlite` but leaves room for alternative backends), the following Postgres equivalents would be needed: + +| Honker Primitive | Postgres Equivalent | What's Lost | +|-----------------|---------------------|-------------| +| `notify/listen` | `pg_notify` + `LISTEN` | Postgres NOTIFY is ephemeral (lost on restart). Honker's table-backed notifications persist. Need to add a `_notifications` table and polling. | +| `stream_publish/subscribe` | `pg_notify` + consumer offset table | No built-in per-consumer offset tracking. Would need a `_stream_consumers` table and polling/cursor logic. | +| `queue/claim/ack` | pg-boss / Oban | These exist and are production-quality. Honker's simplicity (one table, partial index) is lost. Need a dependency on Oban or pg-boss. | +| `run_at` (delayed jobs) | Oban's `scheduled_at` / pg-boss's `startAfter` | Available in both. | +| `claim_expires_at` (visibility timeout) | Oban's `attempted_at` + `max_attempts` | Available in both. | +| `honker_lock_acquire/release` | `pg_advisory_lock` | Built-in, similar concept. 
| +| `honker_rate_limit_try` | Custom table or Redis | Postgres has no built-in rate limiting. | +| Transactional coupling | Same tx | Naturally available: `INSERT INTO orders ...; INSERT INTO _honker_live ...;` both in the same Postgres tx. | +| Scheduler | pg-boss `schedule()` or Oban's `Oban.Plugins.Cron` crontab | Available in both. | + +**What would be lost switching to Postgres + pg-boss/Oban**: +- **Schema simplicity**: Honker uses 2 tables for 90% of queue operations. pg-boss uses more tables. Oban keeps all queues in a single `oban_jobs` table but adds its own migrations and supporting tables. +- **Zero-dependency**: Honker is a SQLite extension. No Redis, no Celery, no broker. pg-boss requires Postgres. Oban requires Postgres + Elixir. +- **Cross-language transparency**: Any SQLite client can `SELECT load_extension('honker')` and get the same features. Postgres requires language-specific client libraries. +- **File-based deployment**: Copy the `.db` file. Done. Postgres requires a server. + +**Recommendation for alknet-storage**: Start with honker on SQLite for self-hosted/edge deployments. For production Postgres deployments, create an abstraction layer in `alknet-storage` that implements the same `EventStream`, `TaskQueue`, and `NotificationChannel` traits against both backends. The honker-on-SQLite implementation is the reference; the Postgres implementation uses `pg_notify` + offset tables + Oban/pg-boss. + +### 9.4 Honker's Queue/Claim Model and alknet's Call Protocol + +The call protocol's `EventEnvelope` frames are the integration boundary (ADR-033). When a domain event needs to cross node boundaries, it must be projected: + +``` +Honker stream event (internal) + → Projection function + → EventEnvelope frame (external, call protocol) + → Transported over SSH/QUIC/DNS + → Received by remote node + → May trigger local Honker stream event on remote node +``` + +The **queue/claim model maps to async call protocol operations**: + +1. 
**call.requested** → Honker `queue.enqueue({"operation": "/head/auth/verify", "input": {...}})` +2. **Worker claims the job** → Like a worker process picking up a call request +3. **job.ack()** → call.responded with the result +4. **job.retry()** → Call timeout / retry logic (but this is at the transport layer, not the queue) +5. **job fails → _honker_dead** → Dead letter equivalent for failed call protocol operations + +The **key difference**: alknet's call protocol is synchronous request-response at the transport layer, while honker's queue is async at-least-once. They serve different purposes: +- **Call protocol**: "I need you to verify this pubkey NOW" (synchronous, cross-node) +- **Honker queue**: "Process this key rotation in the background" (asynchronous, within-node) + +For **cross-node task distribution**, honker's queue should NOT be the transport. Instead: +1. A domain event (honker stream) in the storage service triggers a projection +2. The projection creates an `EventEnvelope` frame +3. The call protocol delivers it to remote nodes +4. Remote nodes may enqueue it into their own honker queues for local processing + +### 9.5 Cross-Node Event Distribution + +**Honker is single-node by design.** This is correct for alknet's architecture because: + +1. **Domain events stay within the service boundary** (ADR-032). Honker streams are for internal state reconstruction, not cross-node distribution. +2. **Integration events cross boundaries via the call protocol.** When a domain event in the storage service needs to be communicated to another node, it's projected into an `EventEnvelope` frame and sent over the wire. +3. **Each node has its own `.db` file** with its own honker streams. This is a feature, not a limitation — it enforces the event boundary discipline. + +The bridge pattern: + +``` +Node A (storage service): + 1. Business write INSERTs into SQLite + 2. stream.publish("nodes:created", {node_id: 42}) in same tx + 3. 
A local subscriber detects the event + 4. Projects it into EventEnvelope {operation: "/head/nodes/created", data: {node_id: 42}} + 5. Sends via call protocol over SSH/QUIC/DNS to Node B + +Node B (receiver): + 1. Receives EventEnvelope via call protocol + 2. Enqueues locally: queue("incoming-events").enqueue({source: "node-A", event: ...}) + 3. Or publishes locally: stream.publish("remote:nodes:created", {node_id: 42}) +``` + +This preserves the three-layer model while respecting honker's single-machine design. + +### 9.6 Honker Patterns and Integration Plan Mapping + +The integration plan (Phase 2.2, alknet-storage) references these honker patterns. Here's the direct mapping: + +| Plan Reference | Honker Primitive | Implementation Notes | +|---------------|-----------------|---------------------| +| `stream_publish/subscribe` | `db.stream("topic").publish(data, tx=tx)` + `async for event in stream.subscribe(consumer="name")` | Used for domain events within alknet-storage. Each metagraph change (node/edge created/updated/deleted) publishes to a stream. Consumers (local reactive logic, SSE endpoints) subscribe. | +| `notify/listen` | `tx.notify("channel", data)` + `async for n in db.listen("channel")` | Used for ephemeral real-time signals. SSH connection events, config reload triggers, forwarding rule activation. No persistence needed. | +| `queue/claim` | `queue.enqueue(data, tx=tx)` + `async for job in queue.claim(worker_id)` | Used for background tasks. Key rotation, certificate renewal, audit log archival, batch operations. The `tx=tx` parameter ensures atomicity with business writes. | + +**Implementation approach for alknet-storage (Rust)**: + +Use `honker-core` directly (not the Python binding). 
The Rust crate exposes: +- `open_conn(path, install_notify)` — open a connection with PRAGMA defaults +- `attach_honker_functions(&conn)` — register all SQL functions +- `bootstrap_honker_schema(&conn)` — create tables +- `SharedUpdateWatcher::new(db_path)` — the wake listener + +Or load the extension via `rusqlite`: +```rust +use rusqlite::Connection; + +let conn = Connection::open("alknet.db")?; +// Extension loading is `unsafe` in rusqlite and needs its `load_extension` feature. +unsafe { + conn.load_extension_enable()?; + conn.load_extension("libhonker_ext", None)?; +} +conn.load_extension_disable()?; +conn.execute_batch("SELECT honker_bootstrap()")?; +``` + +### 9.7 Rust Integration: honker-core vs honker-rs + +Two options for Rust integration in alknet-storage: + +**Option A: Use `honker-core` directly** + +The `honker-core` crate provides: +- `attach_notify(&conn)` — `_honker_notifications` table + `notify()` SQL function +- `attach_honker_functions(&conn)` — all `honker_*` SQL functions +- `bootstrap_honker_schema(&conn)` — all table/index creation +- `SharedUpdateWatcher` — the wake mechanism +- `open_conn(path, install_notify)` — connection factory with PRAGMA defaults + +This gives you raw SQL access. You call `conn.query_row("SELECT honker_enqueue(…)")` etc. Maximum control, minimum abstraction. + +**Option B: Use `honker-rs` ergonomic wrapper** + +The `packages/honker-rs` crate provides: +- `Database::open(path)` — opens `system.db` +- `db.queue("name")` — `Queue` handle with `.enqueue()`, `.claim_batch()`, `.ack_batch()` +- `db.stream("name")` — `Stream` handle with `.publish()`, `.subscribe()` +- `db.listen("channel")` — async listener +- `db.outbox("name", delivery_fn)` — outbox pattern +- `db.lock("name", owner, ttl)` — named lock +- `db.scheduler()` — cron scheduler + +**Recommendation**: Start with `honker-core` + direct SQL. The schema and functions are stable and well-tested. Wrap in application-level methods as needed. `honker-rs` may not expose all features (e.g., the scheduler pause/resume/list/update functions added in Phase Mantle). 
Using `honker-core` gives maximum flexibility while maintaining a single source of truth for SQL behavior. + +--- + +## 10. Open Questions for Alknet + +1. **Should alknet-storage bundle honker as a Rust crate dependency, or load the extension at runtime?** + - Bundling `honker-core` gives compile-time verification. Loading the extension requires shipping `libhonker_ext.so/.dylib/.dll` alongside the binary. + - Recommendation: Bundle `honker-core` as a crate dependency for the Rust implementation. Extension loading is for language bindings that can't link Rust code directly. + +2. **Should the `alknet-storage` crate depend on `honker` (the Python package) or `honker-core` (the Rust rlib)?** + - `honker-core` (Rust rlib) — correct choice for a Rust crate. `honker` is the Python binding. + - The Crate dependency in storage.md currently lists `honker = "0.x"`. This should be `honker-core = "0.2"`. + +3. **How does the Rust `SharedUpdateWatcher` integrate with tokio?** + - `SharedUpdateWatcher::subscribe()` returns a `std::sync::mpsc::Receiver<()>`, which is blocking. For tokio integration, wrap in `tokio::task::spawn_blocking` or use `tokio::sync::mpsc` as a bridge. + - Alternatively, use `UpdateWatcher::spawn()` directly and convert ticks to tokio notifications. + +4. **Should alknet-storage abstract over honker-specific table names?** + - Honker prefixes all internal tables with `_honker_` (e.g., `_honker_live`, `_honker_stream`). Alknet-storage should treat these as honker's internal schema and not directly query them for application logic. + - Application-level tables (like `nodes`, `edges`, `accounts`) should use their own namespacing convention. Honker's tables coexist in the same `.db` file. + +5. **Multi-tenant support**: Honker queues and streams are identified by name strings (e.g., `"emails"`, `"user-events"`). For alknet's multi-tenant model (system DB vs tenant DB), each tenant gets its own `.db` file with its own honker tables. 
Cross-tenant events must go through the call protocol — never by direct honker stream subscription across database files. + +6. **Database file management**: Alknet-storage's system DB (`system.db`) and tenant DBs (`tenant-{orgId}.db`) should each have their own honker instance. The `SharedUpdateWatcher` is per-database, so 100 active tenants = 100 poll threads. This is fine for the expected alknet deployment size, but worth monitoring thread count in large deployments. + +--- + +## 11. License and Maturity + +- **License**: Apache 2.0 OR MIT (dual-licensed). Fully permissive for integration. +- **Maturity**: Alpha software (noted in README). Better than experimental but not beta-quality yet. +- **Status**: Active development. Regular commits. Cross-language interop tests. 180+ Python tests, 12+ Rust tests. Crash recovery verified. 600-second soak test under sustained writes. +- **Breaking changes risk**: The project is pre-1.0. Some table names still reference "joblite" and "litenotify" in the CHANGELOG (historical names). Current names use `_honker_` prefix. The API surface is stabilizing but may change. +- **Recommendation**: Pin to a specific `honker-core` version in `alknet-storage`'s `Cargo.toml`. The schema migration path (seen in `bootstrap_honker_schema`'s ALTER TABLE for `enabled` column) shows the project handles migrations. 
+ +--- + +## References + +- [Honker GitHub Repository](https://github.com/russellromney/honker) — Primary source for all code and documentation +- [Honker README](https://github.com/russellromney/honker/blob/main/README.md) — Feature overview, quick start, architecture, performance +- [Honker BINDINGS.md](https://github.com/russellromney/honker/blob/main/BINDINGS.md) — Language binding support matrix +- [Honker ROADMAP.md](https://github.com/russellromney/honker/blob/main/ROADMAP.md) — Future work phases, planned features (singleton/dedup, state events, queue stats, per-queue config) +- [Honker CHANGELOG.md](https://github.com/russellromney/honker/blob/main/CHANGELOG.md) — Detailed history of all changes, performance passes, and architecture decisions +- [Honker honker-core/src/lib.rs](https://github.com/russellromney/honker/blob/main/honker-core/src/lib.rs) — Core Rust implementation: Writer, Readers, UpdateWatcher, SharedUpdateWatcher, schema, PRAGMAs +- [Honker honker-core/src/honker_ops.rs](https://github.com/russellromney/honker/blob/main/honker-core/src/honker_ops.rs) — All SQL function implementations: enqueue, claim, ack, retry, stream, lock, rate limit, scheduler +- [Honker honker-extension/src/lib.rs](https://github.com/russellromney/honker/blob/main/honker-extension/src/lib.rs) — Loadable extension entry point and C ABI for watcher +- [alknet ADR-032: Event Boundary Discipline](../../architecture/decisions/032-event-boundary-discipline.md) — Domain events stay within service boundary +- [alknet Integration Plan](../../research/integration-plan.md) — Phase 2.2: alknet-storage honker integration +- [alknet Storage Spec](../../architecture/storage.md) — alknet-storage crate design and honker integration table \ No newline at end of file diff --git a/docs/research/references/rustfs/rustfs-events-select.md b/docs/research/references/rustfs/rustfs-events-select.md new file mode 100644 index 0000000..eff73fa --- /dev/null +++ 
b/docs/research/references/rustfs/rustfs-events-select.md @@ -0,0 +1,765 @@ +# RustFS Event Notification System & S3 Select Reference + +> **Companion document**: This extends [rustfs-reference.md](./rustfs-reference.md) which covers auth, architecture, and credential mapping. This document focuses on the **event notification system** and **S3 Select** feature. + +**Date**: 2026-06-08 +**RustFS version**: Based on source at `/workspace/rustfs/` (commit-level snapshot) +**Purpose**: Evaluate rustfs event notification and S3 Select for alknet integration + +--- + +## Table of Contents + +1. [Event Notification System](#1-event-notification-system) +2. [Event Types & Structure](#2-event-types--structure) +3. [Notification Targets](#3-notification-targets) +4. [Configuration & Rule Engine](#4-configuration--rule-engine) +5. [Pipeline & Delivery](#5-pipeline--delivery) +6. [Live Event Stream](#6-live-event-stream) +7. [S3 Select](#7-s3-select) +8. [Mapping to alknet](#8-mapping-to-alknet) +9. [References](#9-references) + +--- + +## 1. Event Notification System + +### 1.1 Architecture Overview + +RustFS implements a full S3-compatible bucket notification system. The architecture follows a layered pattern: + +``` +┌──────────────────────────────────────────────────────────┐ +│ S3 API Layer │ +│ (PutObject, DeleteObject, CopyObject, etc.) 
│ +└─────────────┬────────────────────────────────────────────┘ + │ emits EventArgs + ▼ +┌──────────────────────────────────────────────────────────┐ +│ ECStore (event_notification.rs) │ +│ - send_event() hook (global OnceLock dispatch) │ +│ - registers dispatch callback during init │ +└─────────────┬────────────────────────────────────────────┘ + │ converts EventArgs → NotifyEventArgs + ▼ +┌──────────────────────────────────────────────────────────┐ +│ rustfs_notify (NotificationSystem) │ +│ ┌──────────────┐ ┌──────────────┐ ┌───────────────┐ │ +│ │ NotifyPipeline│──▶│ NotifyRuleEngine│─▶│ EventNotifier │ │ +│ │ (broadcast │ │ (match rules) │ │ (send to │ │ +│ │ + history) │ │ │ │ targets) │ │ +│ └──────────────┘ └──────────────┘ └──────┬────────┘ │ +│ │ │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────▼────────┐ │ +│ │BucketConfigM │ │ NotifyConfigM │ │ TargetList │ │ +│ │ anager │ │ anager │ │ (Webhook, │ │ +│ └──────────────┘ └──────────────┘ │ Kafka, AMQP, │ │ +│ │ NATS, Redis, │ │ +│ │ MQTT, MySQL, │ │ +│ │ Postgres, │ │ +│ │ Pulsar) │ │ +│ └───────────────┘ │ +└──────────────────────────────────────────────────────────┘ +``` + +### 1.2 Key Crates + +| Crate | Purpose | +|-------|---------| +| `rustfs_notify` | Core notification orchestration: `Event`, `EventArgs`, `EventNotifier`, `NotifyPipeline`, `NotificationSystem`, rule engine, bucket config management | +| `rustfs_targets` | Target implementations (Webhook, Kafka, AMQP, NATS, Redis, MQTT, MySQL, PostgreSQL, Pulsar) + `Target` trait, `QueueStore`, TLS hot-reload | +| `rustfs_s3_types` | `EventName` enum with all S3 event type definitions, serialization, mask/bitfield support | +| `rustfs_ecstore` | Storage layer; `event_notification.rs` provides the dispatch hook that bridges ecstore events to the notify system | +| `rustfs_config` | Configuration for each target type (Env vars, KVS parsing, subsystem names) | + +### 1.3 Initialization Flow + +1. 
`rustfs/server/event.rs::init_event_notifier()` runs at startup +2. If notify module is enabled (`RUSTFS_NOTIFY_ENABLE=true`), it calls `rustfs_notify::initialize(config)` which: + - Creates a `NotificationSystem` with `EventNotifier`, `TargetRegistry`, and config + - Loads all target configurations from the config store + - Initializes each target (connects, health-checks, starts stream replay workers) +3. An ECStore dispatch hook is installed via `register_event_dispatch_hook()` which: + - Converts `ecstore::EventArgs` → `notify::EventArgs` + - Parses `EventName` from string + - Spawns an async task to call `notifier_global::notify(args)` + +### 1.4 Module Toggle + +The notification system respects a module enable/disable flag: +- Environment variable: `RUSTFS_NOTIFY_ENABLE` (default: `DEFAULT_NOTIFY_ENABLE`) +- When disabled, only the **live event stream** is initialized (no targets are loaded) +- This allows in-process event subscription without external delivery + +--- + +## 2. Event Types & Structure + +### 2.1 EventName Enum + +Defined in `rustfs_s3_types::EventName`. 
All S3-standard event types plus RustFS extensions: + +| Category | Events | +|----------|--------| +| **ObjectAccessed** | `s3:ObjectAccessed:Get`, `s3:ObjectAccessed:Head`, `s3:ObjectAccessed:GetRetention`, `s3:ObjectAccessed:GetLegalHold`, `s3:ObjectAccessed:Attributes` | +| **ObjectCreated** | `s3:ObjectCreated:Put`, `s3:ObjectCreated:Post`, `s3:ObjectCreated:Copy`, `s3:ObjectCreated:CompleteMultipartUpload`, `s3:ObjectCreated:PutRetention`, `s3:ObjectCreated:PutLegalHold` | +| **ObjectRemoved** | `s3:ObjectRemoved:Delete`, `s3:ObjectRemoved:DeleteMarkerCreated`, `s3:ObjectRemoved:DeleteAllVersions`, `s3:ObjectRemoved:NoOP` | +| **ObjectTagging** | `s3:ObjectTagging:Put`, `s3:ObjectTagging:Delete` | +| **ObjectAcl** | `s3:ObjectAcl:Put` | +| **ObjectReplication** | `s3:Replication:OperationFailedReplication`, `s3:Replication:OperationCompletedReplication`, `s3:Replication:OperationMissedThreshold`, `s3:Replication:OperationReplicatedAfterThreshold`, `s3:Replication:OperationNotTracked` | +| **ObjectRestore** | `s3:ObjectRestore:Post`, `s3:ObjectRestore:Completed` | +| **ObjectTransition** | `s3:ObjectTransition:Failed`, `s3:ObjectTransition:Complete` | +| **Lifecycle** | `s3:LifecycleExpiration:Delete`, `s3:LifecycleExpiration:DeleteMarkerCreated`, `s3:LifecycleDelMarkerExpiration:Delete`, `s3:LifecycleTransition` | +| **Bucket** | `s3:BucketCreated:*`, `s3:BucketRemoved:*` | +| **Scanner** | `s3:Scanner:ManyVersions`, `s3:Scanner:LargeVersions`, `s3:Scanner:BigPrefix` | +| **IntelligentTiering** | `s3:IntelligentTiering` | +| **Compound (wildcard)** | `s3:ObjectAccessed:*`, `s3:ObjectCreated:*`, `s3:ObjectRemoved:*`, `s3:ObjectTagging:*`, `s3:Replication:*`, `s3:ObjectRestore:*`, `s3:LifecycleExpiration:*`, `s3:ObjectTransition:*`, `s3:Scanner:*`, `Everything` | +| **Internal** | `ObjectRemovedAbortMultipartUpload`, `ObjectCreatedCreateMultipartUpload`, `ObjectRemovedDeleteObjects` | + +### 2.2 Event Schema Versioning + +The `event_schema_version` function 
returns different versions based on event type: + +| Version | Events | +|---------|--------| +| `2.1` | ObjectCreated/Removed/Accessed base events | +| `2.2` | Replication events | +| `2.3` | Tagging, ACL, Restore, Lifecycle, IntelligentTiering events | + +### 2.3 Event Record Structure (`rustfs_notify::Event`) + +```rust +pub struct Event { + pub event_version: String, // e.g., "2.1", "2.2", "2.3" + pub event_source: String, // "rustfs:s3" + pub aws_region: String, + pub event_time: DateTime, + pub event_name: EventName, + pub user_identity: Identity, // { principal_id: String } + pub request_parameters: HashMap, + pub response_elements: HashMap, + pub s3: Metadata, // See below + pub glacier_event_data: Option, + pub source: Source, // { host, port, user_agent } +} + +pub struct Metadata { + pub schema_version: String, // "1.0" + pub configuration_id: String, + pub bucket: Bucket, // { name, owner_identity, arn } + pub object: Object, // See below +} + +pub struct Object { + pub key: String, // URL-encoded object key + pub size: Option, + pub e_tag: Option, + pub content_type: Option, + pub user_metadata: Option>, + pub version_id: Option, + pub sequencer: String, // Monotonic event sequence ID +} +``` + +- The `key` field is URL-encoded (form-urlencoded) +- `sequencer` is derived from `ObjectInfo.mod_time` nanosecond timestamp, ensuring ordering +- `user_metadata` filters out keys starting with `x-amz-meta-internal-` +- For removed events, `size`, `e_tag`, `content_type`, and `user_metadata` are omitted + +### 2.4 EventArgs Builder + +Events are constructed via `EventArgsBuilder`: + +```rust +let args = EventArgsBuilder::new(EventName::ObjectCreatedPut, "my-bucket", object_info) + .host("10.0.0.1") + .port(9000) + .user_agent("alknet-storage/1.0") + .req_param("principalId", "user-123") + .version_id("v2") + .build(); +let event = Event::new(args); +``` + +The builder pattern ensures all required fields are provided and allows optional fields. + +--- + +## 3. 
Notification Targets + +### 3.1 Target Trait + +All targets implement `rustfs_targets::Target`: + +```rust +#[async_trait] +pub trait Target: Send + Sync + 'static +where E: Send + Sync + 'static + Clone + Serialize + DeserializeOwned +{ + fn id(&self) -> TargetID; + fn name(&self) -> String; + async fn is_active(&self) -> Result; + async fn save(&self, event: Arc>) -> Result<(), TargetError>; + async fn send_raw_from_store(&self, key: Key, body: Vec, meta: QueuedPayloadMeta) -> Result<(), TargetError>; + async fn send_from_store(&self, key: Key) -> Result<(), TargetError>; + async fn close(&self) -> Result<(), TargetError>; + fn store(&self) -> Option<&(dyn Store)>; + fn clone_dyn(&self) -> Box + Send + Sync>; + async fn init(&self) -> Result<(), TargetError>; + fn is_enabled(&self) -> bool; + fn delivery_snapshot(&self) -> TargetDeliverySnapshot; + fn record_final_failure(&self); +} +``` + +### 3.2 Supported Targets + +| Target | Crate Module | Protocol | Queue Store | TLS/mTLS | SASL | Notes | +|--------|-------------|----------|-------------|----------|------|-------| +| **Webhook** | `targets::webhook` | HTTP POST | Yes (file) | Yes (CA, client cert, skip_verify) | Bearer token | Health check via HEAD to `/`; TLS hot-reload | +| **Kafka** | `targets::kafka` | Kafka Produce | Yes (file) | Yes (CA, client cert) | PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 | Uses `rustfs_kafka_async`; acknowledgments configurable (-1, 0, 1) | +| **AMQP** | `targets::amqp` | AMQP 0-9-1 | Yes (file) | Yes (CA, client cert via amqps://) | Username/password (in URL or config) | Uses `lapin`; publisher confirms; persistent delivery mode | +| **NATS** | `targets::nats` | NATS Publish | Yes (file) | Yes (CA, client cert) | Token, username/password, credentials file | Subject-based routing | +| **Redis** | `targets::redis` | Redis Pub/Sub | Yes (file) | Yes (CA, client cert, insecure) | Password | Channel publish; connection pooling | +| **MQTT** | `targets::mqtt` | MQTT v5 | Yes (file) | Yes 
(CA, client cert) | Username/password | Uses `rumqttc`; QoS 0/1; WebSocket path allowlist | +| **MySQL** | `targets::mysql` | MySQL INSERT | Yes (file) | Yes (CA, client cert) | Username/password | Namespace or access format; connection pooling | +| **PostgreSQL** | `targets::postgres` | PostgreSQL INSERT/UPSERT | Yes (file) | Yes (CA, client cert) | Username/password (DSN) | Namespace (UPSERT) or access (append) format; `deadpool-postgres` pooling | +| **Pulsar** | `targets::pulsar` | Pulsar Produce | Yes (file) | Yes (CA, client cert) | Token, OAuth2 | Topic-based; persistent or non-persistent | + +**Note**: Elasticsearch is listed as a subsystem constant (`notify_elasticsearch`) but marked `#[allow(dead_code)]`, indicating it's planned but not yet implemented. + +### 3.3 Target Identification (ARN) + +Each target has a `TargetID` (format: `ID:Name`, e.g., `1:webhook`) and an `ARN` (format: `arn:rustfs:sqs:{region}:{id}:{name}`, e.g., `arn:rustfs:sqs:us-east-1:1:webhook`). + +Default partition: `rustfs`, default service: `sqs`. 
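
The identifier layouts described in 3.3 can be made concrete with a small sketch. The `TargetId` and `Arn` types below are hypothetical stand-ins written for this document, not the actual `rustfs_targets` definitions. They only illustrate how the `ID:Name` and `arn:{partition}:{service}:{region}:{id}:{name}` layouts compose, assuming the default partition `rustfs` and default service `sqs`.

```rust
use std::fmt;

/// Hypothetical stand-in for a target identifier (`ID:Name`, e.g. `1:webhook`).
#[derive(Debug, Clone, PartialEq, Eq)]
struct TargetId {
    id: String,
    name: String,
}

/// Hypothetical stand-in for a target ARN.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Arn {
    partition: String,
    service: String,
    region: String,
    target: TargetId,
}

impl fmt::Display for TargetId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}:{}", self.id, self.name)
    }
}

impl fmt::Display for Arn {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // arn:{partition}:{service}:{region}:{id}:{name}
        write!(
            f,
            "arn:{}:{}:{}:{}",
            self.partition, self.service, self.region, self.target
        )
    }
}

impl Arn {
    /// Builds an ARN using the documented defaults (`rustfs` / `sqs`).
    fn new(region: &str, id: &str, name: &str) -> Self {
        Arn {
            partition: "rustfs".to_string(),
            service: "sqs".to_string(),
            region: region.to_string(),
            target: TargetId {
                id: id.to_string(),
                name: name.to_string(),
            },
        }
    }

    /// Parses the six colon-separated fields; returns `None` on any other shape.
    fn parse(s: &str) -> Option<Self> {
        let parts: Vec<&str> = s.split(':').collect();
        match parts.as_slice() {
            ["arn", partition, service, region, id, name]
                if !id.is_empty() && !name.is_empty() =>
            {
                Some(Arn {
                    partition: partition.to_string(),
                    service: service.to_string(),
                    region: region.to_string(),
                    target: TargetId {
                        id: id.to_string(),
                        name: name.to_string(),
                    },
                })
            }
            _ => None,
        }
    }
}
```

With these definitions, `Arn::new("us-east-1", "1", "webhook").to_string()` produces `arn:rustfs:sqs:us-east-1:1:webhook`, and `Arn::parse` on that string returns an equal value. Strings with a different number of `:`-separated fields are rejected.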
+ +### 3.4 Queue Store (Persistent Delivery) + +Targets that have a `queue_dir` configured use a persistent store for at-least-once delivery: + +- Events are first persisted to the queue store, then sent +- If the target is unreachable, events remain in the store and are replayed when connectivity recovers +- Queue store format: `RQP1` magic + metadata length (LE u32) + JSON metadata + raw body +- `QueuedPayload` structure includes: event_name, bucket_name, object_name, content_type, queued_at_unix_ms, payload_len +- Extension: `notify_store` (`.nqs`) for notification events, `audit_store` for audit logs + +### 3.5 Delivery Payload Format (`TargetLog`) + +```rust +// Serialized as JSON when delivering to targets +struct TargetLog { + event_name: EventName, + key: String, // "{bucket}/{decoded_object_name}" + records: Vec, // For AMQP/NATS: includes full EntityTarget records + // For others: includes serialized Event data +} +``` + +For AMQP and NATS targets, `build_queued_payload_with_records()` is used, which includes cloned `EntityTarget` records. For other targets, `build_queued_payload()` serializes just the event data. + +### 3.6 Concurrency Controls + +| Parameter | Default | Env Var | +|-----------|---------|---------| +| Target stream concurrency | 20 | `RUSTFS_NOTIFY_TARGET_STREAM_CONCURRENCY` | +| Send concurrency (inflight limit) | 64 | `RUSTFS_NOTIFY_SEND_CONCURRENCY` | + +### 3.7 TLS Hot-Reload + +All targets that support TLS (webhook, Kafka, AMQP, NATS, MySQL, PostgreSQL, MQTT) implement `ReloadableTargetTls`: + +- A background coordinator polls TLS files for changes +- When fingerprint changes are detected, new material (HTTP client, producer, connection) is built +- Applied via `apply_tls_material()` without requiring a restart +- Supports CA certificates, client certificates, and client keys + +--- + +## 4. 
Configuration & Rule Engine + +### 4.1 Bucket Notification Configuration (XML) + +Configuration follows the S3 `NotificationConfiguration` XML schema: + +```xml +<NotificationConfiguration> + <QueueConfiguration> + <Id>my-notification</Id> + <Queue>arn:rustfs:sqs:us-east-1:1:webhook</Queue> + <Event>s3:ObjectCreated:*</Event> + <Event>s3:ObjectRemoved:Delete</Event> + <Filter> + <S3Key> + <FilterRule> + <Name>prefix</Name> + <Value>uploads/</Value> + </FilterRule> + <FilterRule> + <Name>suffix</Name> + <Value>.csv</Value> + </FilterRule> + </S3Key> + </Filter> + </QueueConfiguration> +</NotificationConfiguration> +``` + +The XML is parsed via `quick_xml` into `NotificationConfiguration` → `QueueConfig` → validated → converted to `BucketNotificationConfig` → `RulesMap`. + +Key validation rules: +- Lambda and Topic configurations are **not supported** (return `UnsupportedConfiguration` error) +- Only `QueueConfiguration` is supported (maps to all target types, not just SQS) +- One prefix filter and one suffix filter maximum +- Filter values: ≤1024 chars, no `.` or `..` segments, no `\`, valid UTF-8 +- No duplicate event names within a queue config +- ARN must exist in the configured target list + +### 4.2 RulesMap + +`RulesMap` maps `EventName` → `PatternRules` → `TargetIdSet`: + +- Compound events (like `ObjectCreatedAll`) are **expanded** into specific events on insertion +- Pattern matching: prefix/suffix wildcards (e.g., `uploads/*.csv`) +- URL-encoded keys are matched against both encoded and decoded patterns +- Bitmask-based fast path: `total_events_mask` enables O(1) `has_subscriber()` checks + +### 4.3 Dynamically Reconfigurable + +- `NotificationSystem::set_target_config()`: add/update a target +- `NotificationSystem::remove_target_config()`: remove a target +- `NotificationSystem::load_bucket_notification_config()`: load per-bucket rules +- `NotificationSystem::remove_bucket_notification_config()`: remove per-bucket rules +- `NotificationSystem::reload_config()`: reload from a new `Config` object +- All changes trigger automatic re-initialization of affected targets + +--- + +## 5.
Pipeline & Delivery + +### 5.1 Event Flow + +``` +ECStore operation + ↓ +ecstore::event_notification::send_event(EventArgs) + ↓ (OnceLock dispatch hook) +convert EventArgs → notify::EventArgs + ↓ spawn +notifier_global::notify(EventArgs) + ↓ +NotificationSystem::send_event(Arc<Event>) + ↓ +NotifyPipeline::send_event() + ├── LiveEventHistory::record() (in-memory, last 1024 events) + ├── broadcast::send() (tokio broadcast channel, capacity 1024) + └── EventNotifier::send() (async, rule-matched delivery) + ├── RuleEngine::match_targets(bucket, event_name, object_key) + └── For each matched target: + ├── EntityTarget construction + ├── If queue_store: persist then async send + └── If no queue_store: immediate async send +``` + +### 5.2 Live Event Stream + +The `NotifyPipeline` provides an in-process event stream via `tokio::sync::broadcast`: + +```rust +// Subscribe to live events +let mut rx = system.subscribe_live_events(); + +// Check if there are live listeners +let has_listeners = system.has_live_listeners(); + +// Get recent events since a sequence number (returns a LiveEventBatch) +let batch = system.recent_live_events_since(after_sequence, limit).await; +``` + +- Broadcast channel capacity: 1024 +- `LiveEventHistory` stores the last 1024 events with monotonic sequence numbers +- `LiveEventBatch` includes `events: Vec<Arc<Event>>`, `next_sequence: u64`, `truncated: bool` + +### 5.3 Metrics + +`NotificationMetrics` tracks: +- Processing count (in-flight) +- Processed count (completed) +- Failed count +- Skipped count (no matching targets) + +Per-target `TargetDeliverySnapshot`: +- `total_messages` +- `failed_messages` +- `queue_length` + +--- + +## 6.
Live Event Stream + +### 6.1 In-Process Subscription + +The live event stream is useful for alknet because it provides a **push-based** event feed without requiring external message brokers: + +```rust +// This can be used from within the same process +let mut rx = notification_system.subscribe_live_events(); +// Note: on a tokio broadcast receiver, a lagging consumer gets Err(Lagged), which ends this loop +while let Ok(event) = rx.recv().await { + // event: Arc<Event>, the full S3 event record + println!("Event: {} on {}/{}", event.event_name, event.s3.bucket.name, event.s3.object.key); +} +``` + +### 6.2 Event History Replay + +The `LiveEventHistory` supports catch-up subscriptions: + +```rust +// Get events since sequence number 42 +let batch = system.recent_live_events_since(42, 100).await; +// batch.next_sequence → next sequence to request +// batch.truncated → whether there are more events +// batch.events → Vec<Arc<Event>> +``` + +--- + +## 7. S3 Select + +### 7.1 Architecture Overview + +RustFS implements S3 Select using **Apache DataFusion** as the SQL engine: + +``` +SelectObjectContentRequest + ↓ validation (expression type, input/output format, scan range) + ↓ preflight (get object info, validate SSE headers) + ↓ create EcObjectStore (DataFusion ObjectStore adapter) + ↓ get_global_db(input) → QueryDispatcher + ↓ Query::new(Context, expression) → execute + ↓ DataFusion SQL parser → logical plan → optimized → physical plan → RecordBatch stream + ↓ SelectOutputEncoder → CSV or JSON → chunked (128KB) → event stream +``` + +### 7.2 Key Crates + +| Crate | Purpose | +|-------|---------| +| `rustfs_s3select_api` | Query error types, `Context`, `Query`, `QueryResult`, `DatabaseManagerSystem` trait, object store | +| `rustfs_s3select_query` | SQL implementation: parser, analyzer, optimizer, function manager, execution, dispatcher | + +### 7.3 SQL Engine + +- **Parser**: Custom `RustFsDialect` + `ExtParser` extending DataFusion's SQL parser +- **Supports**: Single SELECT statements only (multi-statement is rejected) +- **Optimizer**: `CascadeOptimizerBuilder` (DataFusion's default
rule set) +- **Scheduler**: `LocalScheduler` (single-node execution) +- **Functions**: All of DataFusion's built-in scalar, aggregate, and window functions + +### 7.4 Input Formats + +| Format | Support | Notes | +|--------|---------|-------| +| **CSV** | ✅ Full | `FileHeaderInfo` (NONE, USE, IGNORE), custom delimiters, quote chars, comment chars, record delimiters | +| **JSON (LINES)** | ✅ Full | NDJSON line-by-line streaming | +| **JSON (DOCUMENT)** | ✅ Limited | Max 128 MiB (OOM guard); no scan range support | +| **Parquet** | ✅ Full | Columnar format | +| **Compression** | ❌ Not supported | Only `NONE` compression currently accepted | + +### 7.5 Output Formats + +| Format | Options | +|--------|---------| +| **CSV** | Custom field delimiter, quote character, quote escape, record delimiter, quote fields (ALWAYS/ASNEEDED) | +| **JSON** | Line-delimited (NDJSON); custom record delimiter | + +### 7.6 Expression Limitations + +- Max expression size: 256 KiB (`MAX_SELECT_EXPRESSION_BYTES`) +- Expression type must be `SQL` +- No `AllowQuotedRecordDelimiter` support for CSV +- Scan ranges: + - CSV: supported + - JSON LINES: supported + - JSON DOCUMENT: **not supported** + - Parquet: supported + - Range must be valid (start < end, start < object size) + +### 7.7 Object Store Integration + +`EcObjectStore` implements DataFusion's `ObjectStore` trait, adapting rustfs's ECStore for query execution: +- Handles `GET` with optional byte ranges (scan range) +- JSON DOCUMENT mode: entire file buffered for DOM parsing, then flattened to NDJSON +- JSON sub-path extraction: `FROM s3object.some.path` navigates to the key before flattening +- Respects SSE-C headers for encrypted objects + +### 7.8 Streaming Response + +Results are streamed as S3 event types: +1. `Cont` event (continuation marker) +2. `Records` events (128KB chunks) +3. `Progress` events (if `RequestProgress.Enabled=true`) — currently only `BytesReturned` populated +4. `Stats` event (final) +5. 
`End` event + +### 7.9 Error Mapping + +| QueryError | S3 Error | +|-----------|----------| +| `Parser` | `ParseSelectFailure` (400) | +| `MultiStatement` | `UnsupportedSqlStructure` | +| `NotImplemented` | `NotImplemented` | +| `Datafusion` (scan range) | `InvalidRequestParameter` | +| `Datafusion` (missing binding) | `EvaluatorBindingDoesNotExist` | +| `Datafusion` (other) | `UnsupportedSqlOperation` | +| `StoreError` (bucket not found) | `NoSuchBucket` | +| `StoreError` (object not found) | `NoSuchKey` | +| `StoreError` (other) | `InternalError` | + +--- + +## 8. Mapping to alknet + +### 8.1 rustfs Events → alknet Integration Events + +rustfs events are **integration events from rustfs's perspective** and remain **integration events from alknet's perspective**. This is the correct cross-boundary classification per ADR-032. + +#### Event Projection: `rustfs::BucketNotificationEvent` → `alknet::EventEnvelope` + +Suggested namespace and operation mapping: + +| rustfs EventName | alknet Namespace | alknet Operation | +|------------------|-----------------|-----------------| +| `s3:ObjectCreated:Put` | `storage.object` | `created.put` | +| `s3:ObjectCreated:Post` | `storage.object` | `created.post` | +| `s3:ObjectCreated:Copy` | `storage.object` | `created.copy` | +| `s3:ObjectCreated:CompleteMultipartUpload` | `storage.object` | `created.multipart-complete` | +| `s3:ObjectRemoved:Delete` | `storage.object` | `removed.delete` | +| `s3:ObjectRemoved:DeleteMarkerCreated` | `storage.object` | `removed.delete-marker-created` | +| `s3:ObjectAccessed:Get` | `storage.object` | `accessed.get` | +| `s3:ObjectAccessed:Head` | `storage.object` | `accessed.head` | +| `s3:BucketCreated:*` | `storage.bucket` | `created` | +| `s3:BucketRemoved:*` | `storage.bucket` | `removed` | + +The full `Event` record from rustfs should be preserved in the `EventEnvelope.payload` field for traceability, while a normalized `metadata` extraction provides fast-path access: + +```rust +// 
Pseudocode for mapping +fn project_rustfs_event(event: &rustfs_notify::Event) -> alknet::EventEnvelope { + let namespace = if event.event_name == EventName::BucketCreated || event.event_name == EventName::BucketRemoved { + "storage.bucket" + } else { + "storage.object" + }; + + let operation = event.event_name.as_str() // "s3:ObjectCreated:Put" + .strip_prefix("s3:") // "ObjectCreated:Put" + .unwrap_or("unknown") + .to_lowercase() + .replace(':', "."); + + EventEnvelope { + id: uuid::Uuid::new_v4(), + namespace: namespace.into(), + operation: operation.into(), // e.g., "objectcreated.put"; the shorter "created.put" form in the table needs an explicit mapping + timestamp: event.event_time, + source: "rustfs".into(), + metadata: json!({ + "bucket": event.s3.bucket.name, + "key": event.s3.object.key, + "size": event.s3.object.size, + "eTag": event.s3.object.e_tag, + "versionId": event.s3.object.version_id, + "sequencer": event.s3.object.sequencer, + "principalId": event.user_identity.principal_id, + }), + payload: serde_json::to_value(event).ok(), + } +} +``` + +### 8.2 Subscription Architecture + +#### Option A: In-Process Live Event Stream (Recommended) + +Since alknet and rustfs share the same process, alknet can subscribe to the live event stream directly: + +```rust +// In alknet's initialization +let notification_system = rustfs_notify::notification_system().unwrap(); +let mut event_rx = notification_system.subscribe_live_events(); + +// In alknet's event loop +tokio::spawn(async move { + // Note: on a tokio broadcast receiver, a lagging consumer gets Err(Lagged), which ends this loop + while let Ok(event) = event_rx.recv().await { + let envelope = project_rustfs_event(&event); + alknet::honker::publish(envelope).await; + } +}); +``` + +**Advantages**: +- Minimal latency and no serialization overhead +- No network hop +- Direct access to `Arc<Event>` in-process +- alknet's Honker streams get events immediately + +**Considerations**: +- `has_live_listeners()` can be checked before performing expensive event construction +- The broadcast channel capacity is 1024; slow consumers will miss events (acceptable for integration events) +- 
`recent_live_events_since()` allows catch-up after reconnection + +#### Option B: External Target via Webhook/Kafka/etc. + +If alknet runs as a separate process, configure a webhook or Kafka target pointing to alknet's event ingestion endpoint: + +```json +{ + "notify_webhook": { + "1": { + "enable": true, + "endpoint": "https://alknet.internal/events/rustfs", + "auth_token": "Bearer alknet-secret" + } + } +} +``` + +**Advantages**: +- Decoupled deployment +- RustFS's queue store provides at-least-once delivery + +**Considerations**: +- Network latency and serialization overhead +- Need to handle deduplication (at-least-once means possible duplicates) +- Queue store provides durability if alknet is temporarily unavailable + +#### Option C: Hybrid: Live Stream + Webhook Fallback + +For maximum reliability: +1. In-process live stream for low-latency event propagation +2. Webhook/Kafka target as a fallback for events missed during restarts +3. Use `sequencer` ordering to detect gaps + +### 8.3 S3 Select → alknet Operations + +S3 Select can be exposed as an alknet operation: + +| alknet Operation | Description | +|-----------------|-------------| +| `storage.select` | Run an S3 Select SQL query on an object | +| `storage.select-status` | Check Select availability (optional) | + +```rust +// Example alknet call protocol operation +fn handle_storage_select(params: StorageSelectParams) -> Result { + // 1. Construct SelectObjectContentInput + // 2. Call existing rustfs SelectObjectContent handler + // 3. Stream results back through alknet call protocol +} +``` + +#### Use Cases for alknet + +1. **Metagraph Queries**: Query stored metagraph JSON/CSV objects without downloading them entirely + ```sql + SELECT s.name, s.version FROM S3Object s WHERE s.type = 'service' + ``` + +2. **Log Analytics**: Query structured log data stored in S3 + ```sql + SELECT COUNT(*) as cnt, s.level FROM S3Object s WHERE s.timestamp > '2026-01-01' GROUP BY s.level + ``` + +3.
**Ad-hoc Data Exploration**: Quick data inspection without full downloads + ```sql + SELECT * FROM S3Object s LIMIT 100 + ``` + +4. **Aggregation Pipelines**: Pre-process data before moving to alknet's internal stores + +### 8.4 ADR-032 Implications: Cross-Boundary Event Flow + +Per ADR-032, rustfs events are **integration events**: they represent facts about state changes that have already happened in the storage system boundary. When alknet consumes them: + +``` +┌─────────────┐ ┌─────────────┐ +│ rustfs │ │ alknet │ +│ (bounded │ integration │ (bounded │ +│ context) │───── event ─────────▶│ context) │ +│ │ │ │ +│ S3 Object │ EventEnvelope │ Honker │ +│ Created/ │ namespace: │ Stream │ +│ Removed/ │ "storage.object" │ Subscriber │ +│ Accessed │ operation: │ │ +│ │ "created.put" │ Call │ +│ │ │ Protocol │ +│ S3 Select │ storage.select │ Operation │ +│ Results │◀──── call ──────────│ │ +└─────────────┘ └─────────────┘ +``` + +Key points: +1. **Events flow inward**: rustfs → alknet (integration events entering alknet's boundary) +2. **Calls flow outward**: alknet → rustfs (alknet initiates S3 Select as a call) +3. **No shared domain model**: alknet shouldn't reference rustfs's `Event` struct directly in its domain; it projects into its own `EventEnvelope` format +4. **Eventual consistency**: rustfs notifications may arrive out of order; the `sequencer` field provides ordering within a bucket +5. **At-least-once delivery**: If using webhook/Kafka targets, duplicate events are possible; alknet must be idempotent +6. **No orchestration across boundaries**: alknet doesn't tell rustfs to emit events; it subscribes to events rustfs naturally produces + +### 8.5 Implementation Recommendations + +1. **Short-term**: Use the **in-process live event stream** to subscribe to rustfs events and re-emit them through alknet's Honker system. This gives immediate value with minimal integration work. + +2.
**Medium-term**: Add a **webhook notification target** pointing at an alknet HTTP endpoint for redundancy. Configure bucket notification rules via the S3 API (PutBucketNotificationConfiguration). + +3. **Long-term**: Consider implementing an **alknet NATS target** that directly publishes events into alknet's NATS infrastructure, bypassing the HTTP layer entirely for lower latency. + +4. **S3 Select**: Expose via alknet's call protocol as `storage.select`. The existing `execute_select_object_content` function can be called directly as a library function since alknet and rustfs share the same process. + +5. **Event schema versioning**: Store the `event_version` field from rustfs events in alknet's `EventEnvelope.metadata` to handle future schema evolution. + +--- + +## 9. References + +### Source Code Locations + +| Component | Path | +|-----------|------| +| Event structure | `/crates/notify/src/event.rs` | +| EventName enum | `/crates/s3-types/src/event_name.rs` | +| NotifyPipeline + LiveEventHistory | `/crates/notify/src/pipeline.rs` | +| EventNotifier + TargetList | `/crates/notify/src/notifier.rs` | +| NotificationSystem | `/crates/notify/src/integration.rs` | +| Rule engine | `/crates/notify/src/rule_engine.rs` | +| RulesMap | `/crates/notify/src/rules/rules_map.rs` | +| Bucket notification config | `/crates/notify/src/rules/config.rs` | +| XML notification config | `/crates/notify/src/rules/xml_config.rs` | +| Target trait + QueuedPayload | `/crates/targets/src/target/mod.rs` | +| Webhook target | `/crates/targets/src/target/webhook.rs` | +| Kafka target | `/crates/targets/src/target/kafka.rs` | +| AMQP target | `/crates/targets/src/target/amqp.rs` | +| NATS target | `/crates/targets/src/target/nats.rs` | +| Redis target | `/crates/targets/src/target/redis.rs` | +| MQTT target | `/crates/targets/src/target/mqtt.rs` | +| MySQL target | `/crates/targets/src/target/mysql.rs` | +| PostgreSQL target | `/crates/targets/src/target/postgres.rs` | +| Pulsar target | 
`/crates/targets/src/target/pulsar.rs` | +| ARN + TargetID | `/crates/targets/src/arn.rs` | +| ECStore event dispatch | `/crates/ecstore/src/event_notification.rs` | +| Server event init | `/rustfs/src/server/event.rs` | +| S3 Select handler | `/rustfs/src/app/select_object.rs` | +| S3 Select query engine | `/crates/s3select-query/src/` | +| S3 Select API | `/crates/s3select-api/src/` | +| S3 Select object store | `/crates/s3select-api/src/object_store.rs` | +| Config subsystem names | `/crates/config/src/notify/mod.rs` | + +### AWS S3 Documentation + +- [S3 Event Notification Configuration](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html) +- [S3 Select Documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/selecting-content-from-objects.html) + +### Internal References + +- `/workspace/@alkdev/alknet/docs/research/references/rustfs/rustfs-reference.md` — Companion document covering auth, architecture, and credential mapping \ No newline at end of file