Research: Honker — SQLite Pub/Sub, Queue, and Notification Extension
Key Findings
- Honker is a Rust-based SQLite extension that adds Postgres-style NOTIFY/LISTEN semantics plus durable pub/sub, task queues, and event streams entirely within SQLite. It eliminates the need for a separate message broker (Redis, Kafka) when SQLite is the primary datastore.
- Three core primitives: notify/listen (ephemeral pub/sub), stream (durable pub/sub with per-consumer offsets), and queue (at-least-once work queue with retries, priority, delayed jobs, and dead-letter handling). All three are SQL INSERTs inside your transaction, so the business write and its side effect commit or roll back together.
- Wake mechanism: Uses PRAGMA data_version polling at 1ms granularity to detect commits, achieving ~1-2ms median cross-process wake latency without requiring a daemon or broker. A single thread per database fans out to N subscribers via bounded channels.
- Single-machine, single-writer model: Designed for self-hosted deployments. Not distributed: there is no multi-node replication. This maps well to alknet's per-node architecture, where domain events are internal to a service boundary (ADR-032).
- Comprehensive SQL API: 30+ SQL scalar functions (honker_enqueue, honker_claim_batch, honker_ack_batch, honker_stream_publish, honker_stream_read_since, honker_stream_save_offset, notify, honker_lock_acquire, honker_rate_limit_try, honker_scheduler_register, etc.) registered as a loadable SQLite extension. Any language that can SELECT load_extension('honker') gets the same features.
- Rust core (honker-core): All SQL implementations live in a shared Rust crate consumed by the loadable extension, the PyO3 Python binding, the napi-rs Node binding, and other language wrappers. Having one source of truth for the SQL means no behavioral drift across bindings.
- License: Apache 2.0 / MIT dual-license. Fully permissive for integration.
Recommendation: Adopt honker's patterns directly in alknet-storage. The honker crate (or honker-core for a Rust-native integration) should be a dependency of alknet-storage. Honker's single-node model aligns with alknet's event boundary discipline — domain events stay within the service boundary, and cross-node events go through the call protocol. For production deployments that use Postgres instead of SQLite, the same patterns (queue/claim, stream/subscribe, notify/listen) can be replicated using Postgres features, but honker's built-in retry, visibility timeout, and scheduling would need to be reimplemented.
1. Architecture
What Is Honker?
Honker is a SQLite extension + language bindings that adds Postgres-style NOTIFY/LISTEN semantics to SQLite, with built-in durable pub/sub, task queues, and event streams — without requiring a client-polling loop, a daemon, or a separate broker.
Core idea: If SQLite is your primary datastore, your queue should live in the same file. INSERT INTO orders and queue.enqueue(...) commit in the same transaction. Rollback drops both.
Implementation language: Rust. The shared engine is honker-core, a plain Rust rlib crate. Language bindings (Python via PyO3, Node via napi-rs, Go via CGo, Ruby via C extension, .NET via P/Invoke, JVM via JNI, Kotlin wrapper, Elixir via NIF, C++ via header-only wrapper) are thin wrappers that expose the same SQL functions. Some link honker-core directly (PyO3, napi-rs), while others route through the loadable extension (for example Go, .NET, C++, and Ruby).
How it works as a SQLite extension: The honker-extension crate compiles to libhonker_ext.{so,dylib,dll}. Any SQLite 3.9+ client loads it:
```sql
.load ./libhonker_ext
SELECT honker_bootstrap();
```
Loading the extension registers all SQL scalar functions, and honker_bootstrap() creates the schema tables (_honker_live, _honker_dead, _honker_notifications, _honker_stream, _honker_stream_consumers, _honker_locks, _honker_rate_limits, _honker_scheduler_tasks, _honker_results). The extension and the language bindings share these tables, so a Python worker can claim jobs that any other language enqueued through the extension.
Crate Structure
```
honker-core/ # Rust rlib shared across all bindings (published on crates.io)
honker-extension/ # SQLite loadable extension (cdylib, published on crates.io)
packages/
honker/ # Python package (PyO3 cdylib + Queue/Stream/Outbox/Scheduler)
honker-node/ # napi-rs Node.js binding
honker-rs/ # Ergonomic Rust wrapper
honker-go/ # Go binding
honker-ruby/ # Ruby binding
honker-bun/ # Bun binding
honker-ex/ # Elixir binding
honker-cpp/ # C++ binding
honker-dotnet/ # .NET / C# binding
honker-jvm/ # JVM / Java-compatible binding
honker-kotlin/ # Kotlin convenience wrapper
```
Wake Path Architecture
The fundamental challenge for any SQLite-based pub/sub system: SQLite has no wire protocol or server-push. Consumers must initiate reads. Honker solves this with a single-digit-microsecond PRAGMA data_version read:
- One PRAGMA-poll thread per Database queries data_version every 1ms.
- A counter change fans out a tick to each subscriber's bounded channel (capacity 1, which coalesces redundant wakes).
- Each subscriber runs SELECT … WHERE id > last_seen against a partial index, yields rows, and returns to waiting.
- 100 subscribers = 1 poll thread. Idle listeners run zero SQL queries.
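The wake path can be sketched in Python with the standard sqlite3 module. This is a minimal illustration under stated assumptions, not honker's implementation. The class name UpdateWatcher, its methods, and the use of queue.Queue(maxsize=1) as the capacity-1 channel are all hypothetical choices made to mirror the description. A dedicated connection that never writes polls PRAGMA data_version, and each subscriber gets a one-slot queue, so several commits between reads collapse into a single pending wake.

```python
import queue
import sqlite3
import threading


class UpdateWatcher:
    """Hypothetical sketch of a PRAGMA data_version poller (not honker's code)."""

    def __init__(self, db_path):
        # This connection never writes: data_version only changes when
        # *another* connection commits to the same database file.
        self._conn = sqlite3.connect(db_path, isolation_level=None,
                                     check_same_thread=False)
        self._last = self._version()
        self._subs = {}
        self._next_id = 0
        self._lock = threading.Lock()

    def _version(self):
        # fetchall() runs the statement to completion so no read lock lingers.
        return self._conn.execute("PRAGMA data_version").fetchall()[0][0]

    def subscribe(self):
        # Capacity 1: one pending wake already means "go re-read state".
        with self._lock:
            sub_id = self._next_id
            self._next_id += 1
            rx = queue.Queue(maxsize=1)
            self._subs[sub_id] = rx
            return sub_id, rx

    def unsubscribe(self, sub_id):
        with self._lock:
            self._subs.pop(sub_id, None)

    def poll_once(self):
        """Fan out a wake and return True if another connection committed."""
        version = self._version()
        if version == self._last:
            return False
        self._last = version
        with self._lock:
            for rx in self._subs.values():
                try:
                    rx.put_nowait(None)
                except queue.Full:
                    pass  # a wake is already pending; coalesce
        return True

    def run(self, stop_event, interval_s=0.001):
        # One polling thread per database, however many subscribers exist.
        while not stop_event.is_set():
            self.poll_once()
            stop_event.wait(interval_s)
```

In use, run() would sit on a background threading.Thread. Each subscriber would block on rx.get(timeout=5.0) and treat a queue.Empty timeout as the paranoia poll.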
Idle cost: ~3.5µs per PRAGMA data_version query, or ~3.5ms per second in total at 1kHz (3.5µs × 1,000 polls/sec). A 5-second paranoia poll is kept only as a fallback, in case the update watcher fails to fire.
Three backend options (controlled by the WatcherBackend enum):
- Polling (default, stable): PRAGMA data_version every 1ms. Correct on all platforms.
- Kernel (experimental, kernel-watcher Cargo feature): Uses notify-rs filesystem events. Fires on every filesystem write. May produce spurious or missed wakes. Dead-man's switch for file replacement.
- SHM fast path (experimental, shm-fast-path Cargo feature): Memory-maps the -shm WAL index file and reads iChange at offset 8 at ~100µs cadence. WAL-mode only. Dead-man's switch for file replacement.
Dead-man's switch: All backends check file identity (dev, ino) / (volume_serial, file_index) every ~100ms. If the database file is replaced (atomic rename, litestream restore, volume remount), the watcher panics with a clear error message. Subscribers see an error from update_events() instead of hanging silently.
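Here is a minimal Python sketch of the file-identity check. It assumes that the (st_dev, st_ino) pair from os.stat is an adequate stand-in for honker's per-platform identity. FileIdentityGuard is a hypothetical name, and the sketch raises RuntimeError where honker's watcher panics.

```python
import os


class FileIdentityGuard:
    """Hypothetical sketch: detect that a database path now names a different file."""

    def __init__(self, path):
        self.path = path
        self.identity = self._identity()

    def _identity(self):
        st = os.stat(self.path)
        return (st.st_dev, st.st_ino)

    def check(self):
        # Meant to be called periodically (the text above says ~100ms).
        # An atomic rename, a restore, or a remount yields a new
        # (dev, ino) pair for the same path.
        current = self._identity()
        if current != self.identity:
            raise RuntimeError(
                f"database file {self.path!r} was replaced "
                f"(identity {self.identity} -> {current}); reopen the database"
            )
```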
SharedUpdateWatcher
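```rust
use std::{collections::HashMap, sync::{atomic::AtomicU64, mpsc::SyncSender, Arc, Mutex}};

pub struct SharedUpdateWatcher {
    watcher: Mutex<Option<UpdateWatcher>>, // background poll thread (honker-core type)
    senders: Arc<Mutex<HashMap<u64, SyncSender<()>>>>, // fan-out channels
    next_id: AtomicU64,
}
```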
- subscribe() → (u64, Receiver<()>): registers a channel with capacity 1.
- unsubscribe(id): removes the channel; the receiver sees Err(RecvError).
- close(): joins the poll thread and clears all subscribers.
- Wakes are idempotent "go re-read state" signals. Dropping redundant wakes never loses data.
2. Core Capabilities
2.1 Notify/Listen — Ephemeral Pub/Sub
What it is: Fire-and-forget notifications to channel subscribers. Like pg_notify but with table-backed persistence until explicitly pruned.
How it works:
- notify(channel, payload) is a SQL scalar function that INSERTs into _honker_notifications and returns the row id. It runs inside the caller's open transaction, so a rollback drops the notification.
- db.listen(channel) (db.updateEvents() in Node) registers a subscriber that wakes on any database commit, then filters by channel in the SELECT path.
- Listeners attach at the current MAX(id); history is not replayed. This is the key distinction from streams.
Schema:
```sql
CREATE TABLE _honker_notifications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
channel TEXT NOT NULL,
payload TEXT NOT NULL,
created_at INTEGER NOT NULL DEFAULT (unixepoch())
);
CREATE INDEX _honker_notifications_recent ON _honker_notifications(channel, id);
```
Key characteristics:
- Not auto-pruned. Call db.prune_notifications(older_than_s=…, max_keep=…) from a scheduled task.
- Over-triggering is by design: a data_version change wakes every subscriber on that database, not just the subscribers on the matching channel. Each wasted wake costs one indexed SELECT (microseconds); a missed wake would be a silent correctness bug.
- Payload must be valid JSON for cross-language compatibility.
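The notify/listen contract fits in a few lines of Python's standard sqlite3 module. This is a hedged sketch, not honker's binding. notify() and Listener are hypothetical helpers that issue the INSERT and the indexed SELECT described above. The schema uses strftime('%s', 'now') in place of unixepoch() so it also runs on SQLite builds older than 3.38.

```python
import json
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS _honker_notifications (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    channel TEXT NOT NULL,
    payload TEXT NOT NULL,
    created_at INTEGER NOT NULL DEFAULT (CAST(strftime('%s', 'now') AS INTEGER))
);
CREATE INDEX IF NOT EXISTS _honker_notifications_recent
    ON _honker_notifications(channel, id);
"""


def notify(conn, channel, payload):
    """INSERT inside whatever transaction the caller has open; returns the row id."""
    cur = conn.execute(
        "INSERT INTO _honker_notifications (channel, payload) VALUES (?, ?)",
        (channel, json.dumps(payload)),
    )
    return cur.lastrowid


class Listener:
    """Attaches at the current MAX(id), so earlier rows are never replayed."""

    def __init__(self, conn, channel):
        self.conn = conn
        self.channel = channel
        self.last_seen = conn.execute(
            "SELECT COALESCE(MAX(id), 0) FROM _honker_notifications"
        ).fetchall()[0][0]

    def drain(self):
        # Run on every wake; a wake caused by another channel returns [].
        rows = self.conn.execute(
            "SELECT id, payload FROM _honker_notifications "
            "WHERE channel = ? AND id > ? ORDER BY id",
            (self.channel, self.last_seen),
        ).fetchall()
        if rows:
            self.last_seen = rows[-1][0]
        return [json.loads(payload) for _, payload in rows]
```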
2.2 Queue — At-Least-Once Work Queue
What it is: Durable, at-least-once delivery work queue with retries, priority, delayed jobs, task expiration, dead-letter handling, named locks, and rate-limiting.
Schema (single-table hybrid):
```sql
CREATE TABLE _honker_live (
id INTEGER PRIMARY KEY AUTOINCREMENT,
queue TEXT NOT NULL,
payload TEXT NOT NULL,
state TEXT NOT NULL DEFAULT 'pending', -- 'pending' | 'processing'
priority INTEGER NOT NULL DEFAULT 0,
run_at INTEGER NOT NULL DEFAULT (unixepoch()), -- for delayed jobs
worker_id TEXT,
claim_expires_at INTEGER, -- visibility timeout
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
created_at INTEGER NOT NULL DEFAULT (unixepoch()),
expires_at INTEGER -- job expiration
);
CREATE INDEX _honker_live_claim
ON _honker_live(queue, priority DESC, run_at, id)
WHERE state IN ('pending', 'processing');
CREATE TABLE _honker_dead (
id INTEGER PRIMARY KEY,
queue TEXT NOT NULL,
payload TEXT NOT NULL,
priority INTEGER NOT NULL DEFAULT 0,
run_at INTEGER NOT NULL DEFAULT 0,
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
created_at INTEGER NOT NULL DEFAULT (unixepoch()),
died_at INTEGER NOT NULL DEFAULT (unixepoch())
);
```
Claim/ack/nack model:
| Operation | SQL | Notes |
|---|---|---|
| Enqueue | INSERT INTO _honker_live (queue, payload, run_at, priority, max_attempts, expires_at) VALUES (…) | Returns auto-increment id |
| Claim | UPDATE _honker_live SET state='processing', worker_id=?, claim_expires_at=unixepoch()+?, attempts=attempts+1 WHERE id IN (SELECT id FROM _honker_live WHERE queue=? AND state IN ('pending','processing') AND (expires_at IS NULL OR expires_at > unixepoch()) AND ((state='pending' AND run_at <= unixepoch()) OR (state='processing' AND claim_expires_at < unixepoch())) ORDER BY priority DESC, run_at ASC, id ASC LIMIT ?) RETURNING … | One UPDATE … RETURNING via partial index |
| Ack | DELETE FROM _honker_live WHERE id=? AND worker_id=? AND claim_expires_at >= unixepoch() RETURNING id | Returns 1 if claim still valid, 0 if expired |
| Retry | UPDATE _honker_live SET state='pending', run_at=unixepoch()+?, worker_id=NULL, claim_expires_at=NULL WHERE id=? + notify on queue channel | If attempts >= max_attempts, DELETE from _honker_live and INSERT into _honker_dead |
| Fail | DELETE FROM _honker_live WHERE id=? AND worker_id=? AND claim_expires_at >= unixepoch() RETURNING … + INSERT INTO _honker_dead | Unconditionally move to dead letter |
| Heartbeat | UPDATE _honker_live SET claim_expires_at=unixepoch()+? WHERE id=? AND worker_id=? AND state='processing' | Extend claim for long-running handlers |
| Cancel | DELETE FROM _honker_live WHERE id=? AND state IN ('pending', 'processing') | Idempotent |
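The enqueue, claim, and ack rows can be exercised with Python's sqlite3 module. The sketch is hedged in three ways. First, the helper names are hypothetical. Second, the current time is passed in as now rather than computed with unixepoch(), which keeps the example deterministic and lets it run on SQLite older than 3.38. Third, the claim is written as SELECT-then-UPDATE inside BEGIN IMMEDIATE instead of honker's single UPDATE … RETURNING, so it does not need SQLite 3.35. BEGIN IMMEDIATE takes the write lock before the SELECT runs, which stops two workers from picking the same rows.

```python
import json
import sqlite3

# Trimmed version of the _honker_live schema (only the columns used here).
SCHEMA = """
CREATE TABLE IF NOT EXISTS _honker_live (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    queue TEXT NOT NULL,
    payload TEXT NOT NULL,
    state TEXT NOT NULL DEFAULT 'pending',
    priority INTEGER NOT NULL DEFAULT 0,
    run_at INTEGER NOT NULL,
    worker_id TEXT,
    claim_expires_at INTEGER,
    attempts INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 3,
    expires_at INTEGER
);
CREATE INDEX IF NOT EXISTS _honker_live_claim
    ON _honker_live(queue, priority DESC, run_at, id)
    WHERE state IN ('pending', 'processing');
"""

CLAIMABLE = """
SELECT id FROM _honker_live
WHERE queue = ? AND state IN ('pending', 'processing')
  AND (expires_at IS NULL OR expires_at > ?)
  AND ((state = 'pending' AND run_at <= ?)
       OR (state = 'processing' AND claim_expires_at < ?))
ORDER BY priority DESC, run_at ASC, id ASC
LIMIT ?
"""


def enqueue(conn, queue, payload, now, delay_s=0, priority=0, max_attempts=3):
    cur = conn.execute(
        "INSERT INTO _honker_live (queue, payload, run_at, priority, max_attempts) "
        "VALUES (?, ?, ?, ?, ?)",
        (queue, json.dumps(payload), now + delay_s, priority, max_attempts),
    )
    return cur.lastrowid


def claim_batch(conn, queue, worker_id, n, timeout_s, now):
    """Claim up to n due jobs; claims whose timeout lapsed are reclaimable."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before selecting
    try:
        ids = [row[0] for row in conn.execute(CLAIMABLE, (queue, now, now, now, n))]
        conn.executemany(
            "UPDATE _honker_live SET state = 'processing', worker_id = ?, "
            "claim_expires_at = ?, attempts = attempts + 1 WHERE id = ?",
            [(worker_id, now + timeout_s, job_id) for job_id in ids],
        )
        conn.execute("COMMIT")
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    return ids


def ack(conn, job_id, worker_id, now):
    """1 if this worker's claim was still valid (row deleted), else 0."""
    cur = conn.execute(
        "DELETE FROM _honker_live "
        "WHERE id = ? AND worker_id = ? AND claim_expires_at >= ?",
        (job_id, worker_id, now),
    )
    return cur.rowcount
```

In the usage below, a worker that acks after its 300-second claim has lapsed gets 0, and the job goes to whichever worker claims next.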
Visibility timeout: Default 300 seconds (claim_expires_at = unixepoch() + 300). If a worker crashes mid-job, the claim expires and another worker reclaims. attempts increments. After max_attempts (default 3), the row moves to _honker_dead.
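The retry rule can be sketched as one transaction: flip the job back to pending, or move it to _honker_dead once attempts has reached max_attempts. This is an illustrative Python helper with a hypothetical name and a trimmed schema. It checks worker_id (the honker_retry signature takes one), passes now explicitly instead of using unixepoch(), and leaves out honker's notify on the queue channel.

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS _honker_live (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    queue TEXT NOT NULL,
    payload TEXT NOT NULL,
    state TEXT NOT NULL DEFAULT 'pending',
    run_at INTEGER NOT NULL DEFAULT 0,
    worker_id TEXT,
    claim_expires_at INTEGER,
    attempts INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 3
);
CREATE TABLE IF NOT EXISTS _honker_dead (
    id INTEGER PRIMARY KEY,
    queue TEXT NOT NULL,
    payload TEXT NOT NULL,
    attempts INTEGER NOT NULL DEFAULT 0,
    last_error TEXT,
    died_at INTEGER NOT NULL
);
"""


def retry(conn, job_id, worker_id, delay_s, error, now):
    """Return 'retried', 'dead', or None if this worker does not hold the claim."""
    conn.execute("BEGIN IMMEDIATE")
    try:
        rows = conn.execute(
            "SELECT attempts, max_attempts FROM _honker_live "
            "WHERE id = ? AND worker_id = ? AND state = 'processing'",
            (job_id, worker_id),
        ).fetchall()
        if not rows:
            outcome = None
        elif rows[0][0] >= rows[0][1]:
            # Retries exhausted: copy the row to the dead-letter table, then delete it.
            conn.execute(
                "INSERT INTO _honker_dead (id, queue, payload, attempts, last_error, died_at) "
                "SELECT id, queue, payload, attempts, ?, ? FROM _honker_live WHERE id = ?",
                (error, now, job_id),
            )
            conn.execute("DELETE FROM _honker_live WHERE id = ?", (job_id,))
            outcome = "dead"
        else:
            # Back to pending, claimable again once run_at passes.
            conn.execute(
                "UPDATE _honker_live SET state = 'pending', run_at = ?, "
                "worker_id = NULL, claim_expires_at = NULL WHERE id = ?",
                (now + delay_s, job_id),
            )
            outcome = "retried"
        conn.execute("COMMIT")
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    return outcome
```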
Priority: A higher priority value is claimed first. The partial index on (queue, priority DESC, run_at, id) keeps the claim path bounded by working-set size, not history size.
Delayed jobs: Set run_at to a future timestamp. Workers only claim rows where run_at <= unixepoch(). The run_at deadline also wakes sleeping workers through honker_queue_next_claim_at().
Task expiration: Set expires_at on enqueue. Expired jobs are filtered from the claim path. Call queue.sweep_expired() to move them to _honker_dead with last_error='expired'.
Named locks: honker_lock_acquire(name, owner, ttl_s) → 1 (got it) or 0 (held). honker_lock_release(name, owner) → 1 (released) or 0 (not yours). Uses _honker_locks table with TTL-based expiration. Primary use case: cron tasks that shouldn't overlap (leader election).
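TTL-based named locks can be sketched on top of a SQLite UPSERT (available since SQLite 3.24). Two details here are assumptions for illustration, because the document does not give the real schema: the _honker_locks column layout (name, owner, expires_at), and the rule that the current owner may re-acquire the lock to refresh its TTL, which is how a heartbeat would work. now is passed explicitly so the example is deterministic.

```python
import sqlite3

LOCK_SCHEMA = """
CREATE TABLE IF NOT EXISTS _honker_locks (
    name TEXT PRIMARY KEY,
    owner TEXT NOT NULL,
    expires_at INTEGER NOT NULL
)
"""


def lock_acquire(conn, name, owner, ttl_s, now):
    """1 if acquired (or refreshed by the same owner), 0 if someone else holds it."""
    cur = conn.execute(
        "INSERT INTO _honker_locks (name, owner, expires_at) VALUES (?, ?, ?) "
        "ON CONFLICT(name) DO UPDATE SET owner = excluded.owner, "
        "expires_at = excluded.expires_at "
        # Take over only if the old holder's TTL lapsed, or it is our own lock.
        "WHERE _honker_locks.expires_at <= ? OR _honker_locks.owner = excluded.owner",
        (name, owner, now + ttl_s, now),
    )
    return cur.rowcount


def lock_release(conn, name, owner):
    """1 if released, 0 if this owner does not hold the lock."""
    cur = conn.execute(
        "DELETE FROM _honker_locks WHERE name = ? AND owner = ?", (name, owner)
    )
    return cur.rowcount
```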
Rate limiting: honker_rate_limit_try(name, limit, per) → 1 (under limit) or 0 (at limit). Fixed-window counter. Rejected calls don't inflate the count.
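A fixed-window limiter can be sketched in the same style. The _honker_rate_limits layout (name, window_start, count) is an assumption. The important part is the ordering: the count is checked first and only incremented when the call is admitted, so rejected calls do not inflate the window.

```python
import sqlite3

RATE_SCHEMA = """
CREATE TABLE IF NOT EXISTS _honker_rate_limits (
    name TEXT NOT NULL,
    window_start INTEGER NOT NULL,
    count INTEGER NOT NULL,
    PRIMARY KEY (name, window_start)
)
"""


def rate_limit_try(conn, name, limit, per, now):
    """1 if the call fits in the current fixed window of `per` seconds, else 0."""
    window_start = now - now % per
    conn.execute("BEGIN IMMEDIATE")
    try:
        rows = conn.execute(
            "SELECT count FROM _honker_rate_limits WHERE name = ? AND window_start = ?",
            (name, window_start),
        ).fetchall()
        used = rows[0][0] if rows else 0
        if used >= limit:
            admitted = 0  # rejected calls leave the counter untouched
        else:
            conn.execute(
                "INSERT INTO _honker_rate_limits (name, window_start, count) "
                "VALUES (?, ?, 1) "
                "ON CONFLICT(name, window_start) DO UPDATE SET count = count + 1",
                (name, window_start),
            )
            admitted = 1
        conn.execute("COMMIT")
    except BaseException:
        conn.execute("ROLLBACK")
        raise
    return admitted
```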
Batch operations: honker_claim_batch(queue, worker_id, n, timeout_s) returns a JSON array of claimed jobs. honker_ack_batch('[1,2,3]', worker_id) acks multiple jobs. Batch acks are applied per transaction, and the return value is honest: it reports the number of jobs actually acked.
Task result storage: honker_enqueue() returns the job id. Workers can persist return values via honker_result_save(id, value, ttl_s). Callers await results with queue.wait_result(id, timeout). Opt-in (default save_result=False).
Claim iterator pattern:
```python
async for job in q.claim("worker-1"):
    try:
        send(job.payload)
        job.ack()
    except Exception as e:
        job.retry(delay_s=60, error=str(e))
```
Each iteration is claim_batch(worker_id, 1). The iterator wakes on a database update from any process, or when the next run_at or reclaim deadline arrives; the 5-second paranoia poll is the only fallback.
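The worker's sleep can be bounded by the next deadline, which is the job of honker_queue_next_claim_at(). In the sketch below, next_wait_s is a hypothetical helper, not a honker function. The +1 on claim_expires_at comes from the strict claim_expires_at < unixepoch() test in the claim SQL: with integer-second clocks, an expired claim becomes reclaimable one second after its expiry timestamp. The paranoia_s cap plays the role of the 5-second paranoia poll.

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS _honker_live (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    queue TEXT NOT NULL,
    state TEXT NOT NULL DEFAULT 'pending',
    run_at INTEGER NOT NULL,
    claim_expires_at INTEGER
)
"""

NEXT_DEADLINE = """
SELECT MIN(t) FROM (
    SELECT run_at AS t FROM _honker_live
    WHERE queue = ? AND state = 'pending' AND run_at > ?
    UNION ALL
    SELECT claim_expires_at + 1 AS t FROM _honker_live
    WHERE queue = ? AND state = 'processing' AND claim_expires_at + 1 > ?
)
"""


def next_wait_s(conn, queue, now, paranoia_s=5.0):
    """Seconds to block before re-checking the queue if no commit wake arrives."""
    (deadline,) = conn.execute(NEXT_DEADLINE, (queue, now, queue, now)).fetchall()[0]
    if deadline is None:
        return paranoia_s  # nothing scheduled: fall back to the paranoia poll
    return min(deadline - now, paranoia_s)
```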
Queue notifications: Each enqueue also fires a notification on honker:<queue> channel so workers wake immediately without waiting for the next poll cycle.
2.3 Stream — Durable Pub/Sub with Per-Consumer Offsets
What it is: Durable event stream where each named consumer tracks its own offset. Events persist until explicitly pruned. At-least-once delivery with configurable offset flush cadence.
Schema:
```sql
CREATE TABLE _honker_stream (
offset INTEGER PRIMARY KEY AUTOINCREMENT,
topic TEXT NOT NULL,
key TEXT,
payload TEXT NOT NULL,
created_at INTEGER NOT NULL DEFAULT (unixepoch())
);
CREATE INDEX _honker_stream_topic
ON _honker_stream(topic, offset);
CREATE TABLE _honker_stream_consumers (
name TEXT NOT NULL,
topic TEXT NOT NULL,
offset INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (name, topic)
);
```
API:
| Function | Returns | Notes |
|---|---|---|
| honker_stream_publish(topic, key_or_null, payload_json) | offset | INSERTs into _honker_stream + fires notification on honker:stream:<topic> |
| honker_stream_read_since(topic, offset, limit) | JSON array | Reads rows where offset > ? ordered by offset |
| honker_stream_save_offset(consumer, topic, offset) | 1 or 0 | Monotonic upsert; never rewinds. 1 = advanced, 0 = existing offset ≥ new |
| honker_stream_get_offset(consumer, topic) | offset or 0 | Returns saved offset for consumer/topic pair |
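The four stream functions map onto plain SQL over the schema above. This Python sketch uses hypothetical helper names. It quotes the column name "offset" because OFFSET is also an SQL keyword. The monotonic save is a conditional UPSERT (SQLite 3.24+) whose affected-row count gives the 1/0 result. The usage shows that a publish inside a rolled-back transaction never reaches readers.

```python
import json
import sqlite3

STREAM_SCHEMA = """
CREATE TABLE IF NOT EXISTS _honker_stream (
    "offset" INTEGER PRIMARY KEY AUTOINCREMENT,
    topic TEXT NOT NULL,
    key TEXT,
    payload TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS _honker_stream_topic ON _honker_stream(topic, "offset");
CREATE TABLE IF NOT EXISTS _honker_stream_consumers (
    name TEXT NOT NULL,
    topic TEXT NOT NULL,
    "offset" INTEGER NOT NULL DEFAULT 0,
    PRIMARY KEY (name, topic)
);
"""


def stream_publish(conn, topic, payload, key=None):
    cur = conn.execute(
        "INSERT INTO _honker_stream (topic, key, payload) VALUES (?, ?, ?)",
        (topic, key, json.dumps(payload)),
    )
    return cur.lastrowid


def stream_read_since(conn, topic, offset, limit):
    rows = conn.execute(
        'SELECT "offset", key, payload FROM _honker_stream '
        'WHERE topic = ? AND "offset" > ? ORDER BY "offset" LIMIT ?',
        (topic, offset, limit),
    ).fetchall()
    return [(off, key, json.loads(payload)) for off, key, payload in rows]


def stream_save_offset(conn, consumer, topic, offset):
    """Monotonic upsert: 1 if the stored offset advanced, 0 if it was already >= offset."""
    cur = conn.execute(
        'INSERT INTO _honker_stream_consumers (name, topic, "offset") VALUES (?, ?, ?) '
        'ON CONFLICT(name, topic) DO UPDATE SET "offset" = excluded."offset" '
        'WHERE excluded."offset" > _honker_stream_consumers."offset"',
        (consumer, topic, offset),
    )
    return cur.rowcount


def stream_get_offset(conn, consumer, topic):
    rows = conn.execute(
        'SELECT "offset" FROM _honker_stream_consumers WHERE name = ? AND topic = ?',
        (consumer, topic),
    ).fetchall()
    return rows[0][0] if rows else 0
```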
Python binding:
```python
stream = db.stream("user-events")
stream.publish({"user_id": uid, "change": "name"}, tx=tx)
async for event in stream.subscribe(consumer="dashboard"):
    await push_to_browser(event)
```
Subscribe behavior:
- Replay rows with offset > saved_offset in batches (default 1000 rows)
- Transition to live delivery on commit wake
- Auto-save offset at most every 1000 events or every 1 second (whichever comes first)
- At-least-once: a crash re-delivers the in-flight events processed since the last flushed offset
- Override auto-save with save_every_n= / save_every_s=; set both to 0 for manual control
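The auto-save rule is small enough to sketch on its own: flush after N events or S seconds, whichever comes first, with 0/0 meaning manual control. OffsetFlusher is a hypothetical class. Its save callback stands in for something like honker_stream_save_offset, and the clock is injectable so the behavior can be tested. Unlike a real subscriber, this sketch only checks the time threshold when an event is processed.

```python
import time


class OffsetFlusher:
    """Hypothetical sketch of count-or-time offset flushing for a stream consumer."""

    def __init__(self, save, save_every_n=1000, save_every_s=1.0, clock=time.monotonic):
        self.save = save
        self.save_every_n = save_every_n
        self.save_every_s = save_every_s
        self.clock = clock
        self._pending = None
        self._since_flush = 0
        self._last_flush = clock()

    def processed(self, offset):
        """Record a delivered event; flush if either threshold is reached."""
        self._pending = offset
        self._since_flush += 1
        if self.save_every_n == 0 and self.save_every_s == 0:
            return False  # manual control: the caller decides when to flush()
        due_by_count = self.save_every_n > 0 and self._since_flush >= self.save_every_n
        due_by_time = (self.save_every_s > 0
                       and self.clock() - self._last_flush >= self.save_every_s)
        if due_by_count or due_by_time:
            self.flush()
            return True
        return False

    def flush(self):
        if self._pending is not None:
            self.save(self._pending)
            self._pending = None
        self._since_flush = 0
        self._last_flush = self.clock()
```

Everything processed after the last save() call is exactly what a crash would re-deliver, which is why the flush thresholds bound the at-least-once window.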
Transaction coupling: stream.publish(payload, tx=tx) inserts into _honker_stream inside the caller's transaction. Rollback drops the event. This is the transactional outbox pattern without a separate dispatch table.
2.4 Scheduler — Time-Triggered Cron Tasks
Schema:
```sql
CREATE TABLE _honker_scheduler_tasks (
name TEXT PRIMARY KEY,
queue TEXT NOT NULL,
cron_expr TEXT NOT NULL,
payload TEXT NOT NULL,
priority INTEGER NOT NULL DEFAULT 0,
expires_s INTEGER,
next_fire_at INTEGER NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1
);
```
API:
```sql
SELECT honker_scheduler_register('nightly', 'backups', '0 3 * * *', '"go"', 0, NULL);
SELECT honker_scheduler_tick(unixepoch()); -- JSON: fires due
SELECT honker_scheduler_soonest(); -- min next_fire_at
SELECT honker_scheduler_unregister('nightly'); -- 1 = deleted
SELECT honker_scheduler_pause('nightly'); -- 1 = paused
SELECT honker_scheduler_resume('nightly'); -- 1 = resumed
SELECT honker_scheduler_list(); -- JSON array of all schedules
SELECT honker_scheduler_update('nightly', '0 4 * * *', NULL, NULL, NULL, 0);
```
Supports: 5-field cron, 6-field cron (with seconds), @every <n><unit> interval expressions.
Leader election via named lock: db.lock('honker-scheduler', ttl=60). Two scheduler processes can't both fire. The lock is heartbeat-refreshed every 30s.
Missed-fire catch-up: If the scheduler was down for 4 hours with an hourly schedule, the first iteration fires all 4 missed boundaries (with expires= to drop stale ones).
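Missed-fire catch-up is easiest to see with an @every interval. The sketch below is hypothetical in three respects. The accepted units (s, m, h, d) are an assumption. Parsing of 5- and 6-field cron expressions is out of scope. Instead of dropping stale fires itself, it attaches an expires_at to each job it would enqueue, so that the claim path filters stale jobs, in keeping with the "fires = enqueue" model.

```python
import re

UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}  # assumed unit set


def parse_every(expr):
    """'@every 1h' -> 3600 (seconds)."""
    match = re.fullmatch(r"@every (\d+)([smhd])", expr.strip())
    if match is None:
        raise ValueError(f"not an @every expression: {expr!r}")
    seconds = int(match.group(1)) * UNIT_SECONDS[match.group(2)]
    if seconds <= 0:
        raise ValueError("interval must be positive")
    return seconds


def catch_up(next_fire_at, interval_s, now, expires_s=None):
    """Return ([(fire_at, expires_at), ...], new_next_fire_at) for every missed boundary."""
    fires = []
    t = next_fire_at
    while t <= now:
        expires_at = None if expires_s is None else t + expires_s
        fires.append((t, expires_at))  # each entry would become one enqueue
        t += interval_s
    return fires, t
```

For an hourly schedule that was down for four hours, catch_up yields four fires. With expires_s=600, only the most recent one is still claimable when the scheduler comes back.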
Fires = enqueue: The scheduler never runs handlers. It enqueues into the task queue. Regular workers consume.
2.5 Outbox Pattern
The outbox is a convenience wrapper around the Queue primitive:
```python
db.outbox("emails", delivery=send_email)
db.outbox("emails").enqueue({"to": "alice@example.com"}, tx=tx)
db.outbox("emails").run_worker("worker-1")
```
Failures retry with exponential backoff (base_backoff_s * 2^(attempts-1)) up to max_attempts, then land in _honker_dead.
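The backoff formula is easy to check by hand. This is a standalone sketch: the function names are hypothetical, and base_backoff_s=5 in the usage is an arbitrary example value, not a documented default. It assumes the queue's retry rule, where a failure with attempts >= max_attempts goes to _honker_dead instead of being delayed.

```python
def backoff_delay(base_backoff_s, attempts):
    """Delay before the next try: base_backoff_s * 2^(attempts - 1), attempts >= 1."""
    if attempts < 1:
        raise ValueError("attempts is at least 1 once a job has been claimed")
    return base_backoff_s * 2 ** (attempts - 1)


def backoff_schedule(base_backoff_s, max_attempts):
    """Delays after each failed attempt; the final failure goes to _honker_dead instead."""
    return [backoff_delay(base_backoff_s, a) for a in range(1, max_attempts)]
```

With max_attempts=3 and a 5-second base, a job is tried three times, with a 5s delay after the first failure and a 10s delay after the second. The third failure moves it to _honker_dead.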
3. Persistence and Reliability
Durability Guarantees
- Atomic commit: Business write + side-effect enqueue/event/notify commit together or roll back together. This is SQLite ACID — the transactional outbox pattern is built into the primitives, not bolted on.
- SIGKILL safety: Verified in tests/test_crash_recovery.py. After a subprocess is killed before COMMIT, PRAGMA integrity_check returns 'ok', there are zero in-flight rows and no stale write lock, and a queue round-trip works.
- Worker crash recovery: If a worker crashes mid-job, the claim expires after visibility_timeout_s (default 300s) and another worker reclaims the job. attempts increments on each claim. After max_attempts (default 3), the row moves to _honker_dead.
- Stream at-least-once: Offsets auto-flush every 1000 events or 1 second. A crash re-delivers the events processed since the last flushed offset, so the re-delivery window is bounded by the flush thresholds.
- Notify has no replay: Listeners attach at MAX(id). Pruned events are gone. For durable replay, use streams.
WAL Mode
WAL (journal_mode = WAL) is the recommended default. It allows concurrent readers alongside one writer and batches fsyncs efficiently (wal_autocheckpoint = 10000). Other journal modes work but lose WAL's concurrent-read-while-writing property. Wake detection (PRAGMA data_version) works in all journal modes.
What Happens on Crash
| Scenario | Result |
|---|---|
| Process SIGKILL mid-TRANSACTION | SQLite atomic-commit rollback. In-flight write did not land. Fresh process can acquire write lock immediately. |
| Worker process crash mid-job | Claim expires after visibility_timeout. Another worker reclaims. attempts increments. |
| Stream consumer crash | Resumes from last auto-saved offset (at-least-once). Pending offset is lost. |
| Database file replaced (litestream restore) | Watcher panics with clear error message. All subscribers see error from update_events(). Must reopen database. |
What Honker Does NOT Provide
- Multi-writer replication: SQLite's locking is designed for a single host. Two servers writing one .db over NFS will corrupt it. Shard by file or switch to Postgres.
- In-memory database support: :memory: creates a separate database per connection, which splits the writer, readers, and watchers apart. Use a temporary file-backed .db for tests.
- Cross-node distribution: Honker is single-machine, with no built-in mechanism for distributing events across nodes. (This is intentional; see alknet relevance below.)
- Task pipelines/chains/groups/chords. Deliberately not built.
- Workflow orchestration with DAGs. Deliberately not built.
- Ordering guarantees across queues. Each queue is independent.
- Exactly-once delivery. Honker provides at-least-once. Idempotent handlers are the user's responsibility.
4. Performance
Benchmarks (M-series, release build, median of 3)
| Operation | Throughput |
|---|---|
| enqueue (1/tx) | ~8,000/sec |
| enqueue (100/tx) | ~110,000/sec |
| claim + ack (individual) | ~4,500/sec |
| claim_batch + ack_batch (32) | ~75,000/sec |
| claim_batch + ack_batch (128) | ~110,000/sec |
| async iter end-to-end | ~6,500/sec |
| stream replay | ~1,000,000/sec |
| stream live e2e p50 | 0.24ms |
| stream live e2e p99 | 8ms |
Cross-Process Wake Latency
Median ~1-2ms on M-series, bounded by the 1ms PRAGMA data_version poll cadence. 600-second soak test under sustained ~75 commits/sec showed zero missed wakes, zero drift, PRAGMA integrity_check = ok.
Claim Performance at Scale
Claim throughput with and without 100,000 rows in _honker_dead:
| Rows in _honker_dead | Claim + ack throughput |
|---|---|
| 0 (fresh DB) | ~4,000/sec |
| 100,000 | ~3,500/sec |
The partial index (queue, priority DESC, run_at, id) WHERE state IN ('pending','processing') keeps the claim hot path bounded by working-set size, not history size.
How It Compares to Polling
Without honker's wake mechanism, the usual alternative is application-level polling (e.g., SELECT … WHERE id > last_seen every N seconds). Honker replaces this with a single-digit-microsecond PRAGMA read, and 100 subscribers still share 1 poll thread. The over-triggering trade-off (waking all subscribers on any commit) is chosen deliberately over the risk of missing a wake.
5. SQLite Integration
Loading the Extension
```sql
-- Any SQLite 3.9+ client
.load ./libhonker_ext
SELECT honker_bootstrap();
```
honker_bootstrap() is idempotent — it runs CREATE TABLE IF NOT EXISTS and CREATE INDEX IF NOT EXISTS for all schema tables.
Compile/Load Flags
For Rust integration via rusqlite:
```toml
[dependencies]
honker-core = "0.2.3"
rusqlite = { version = "0.39.0", features = ["functions", "hooks"] }
```
Then in Rust:
```rust
use honker_core::{attach_notify, attach_honker_functions, bootstrap_honker_schema, open_conn};
let conn = open_conn("app.db", true)?; // true = install notify
attach_honker_functions(&conn)?;
bootstrap_honker_schema(&conn)?;
```
For the loadable extension:
```shell
cargo build --release -p honker-extension
# Produces: target/release/libhonker_ext.so (or .dylib, .dll)
```
Rust Crate Usage
```rust
use std::sync::mpsc::RecvTimeoutError;
use std::time::Duration;

use honker_core::SharedUpdateWatcher;

let watcher = SharedUpdateWatcher::new(db_path.clone());
let (sub_id, rx) = watcher.subscribe();
loop {
    match rx.recv_timeout(Duration::from_secs(5)) {
        Ok(()) => { /* re-read state from SQLite */ }
        Err(RecvTimeoutError::Timeout) => { /* paranoia poll: re-read anyway */ }
        Err(RecvTimeoutError::Disconnected) => break, // watcher died: reopen
    }
}
watcher.unsubscribe(sub_id);
watcher.close()?;
```
Using with ORM Connections
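Load libhonker_ext on the ORM's connection and run honker_bootstrap() when the connection opens. Then call honker functions such as honker_enqueue() inside the ORM's transaction, so they commit together with the business write: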
```python
# SQLAlchemy (assumes `engine` and an `Order` model are defined elsewhere)
import honker
from sqlalchemy import event, text
from sqlalchemy.orm import Session


@event.listens_for(engine, "connect")
def _load_honker(conn, _):
    honker.load_extension(conn)
    conn.execute("SELECT honker_bootstrap()")


with Session(engine) as s, s.begin():
    s.add(Order(user_id=42))
    s.execute(text("SELECT honker_enqueue(:q, :p, NULL, NULL, 0, 3, NULL)"),
              {"q": "emails", "p": '{"to":"alice"}'})
```
PRAGMA Defaults
Applied on every connection opened via open_conn:
```sql
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL; -- fsync WAL at checkpoint, not every commit
PRAGMA busy_timeout = 5000; -- wait up to 5s for writer lock
PRAGMA foreign_keys = ON;
PRAGMA cache_size = -32000; -- 32MB page cache (default was 2MB)
PRAGMA temp_store = MEMORY; -- temp B-trees in RAM
PRAGMA wal_autocheckpoint = 10000; -- fsync every 10k WAL pages
```
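Here is a Python analogue of this connection setup, using only the standard sqlite3 module. open_conn is a hypothetical helper that applies the listed PRAGMAs; it is not honker_core::open_conn. journal_mode = WAL persists in the database file, but the other settings are per connection and must be reapplied on every open. That is why they belong in the open helper.

```python
import sqlite3

PRAGMAS = (
    "PRAGMA journal_mode = WAL",
    "PRAGMA synchronous = NORMAL",
    "PRAGMA busy_timeout = 5000",
    "PRAGMA foreign_keys = ON",
    "PRAGMA cache_size = -32000",
    "PRAGMA temp_store = MEMORY",
    "PRAGMA wal_autocheckpoint = 10000",
)


def open_conn(path):
    """Open a connection with the PRAGMA defaults listed above (hypothetical helper)."""
    # Autocommit mode: transactions are opened explicitly with BEGIN.
    conn = sqlite3.connect(path, isolation_level=None)
    for pragma in PRAGMAS:
        conn.execute(pragma).fetchall()  # some PRAGMAs return a row; drain it
    return conn
```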
6. Complete API Surface
Notification Functions
| Function | Signature | Returns | Notes |
|---|---|---|---|
| notify(channel, payload) | Scalar, 2 args | rowid | INSERTs into _honker_notifications, returns auto-generated id |
Queue Functions
| Function | Signature | Returns | Notes |
|---|---|---|---|
| honker_bootstrap() | 0 args | 1 | Creates all schema tables/indexes. Idempotent. |
| honker_enqueue(queue, payload, run_at_or_null, delay_or_null, priority, max_attempts, expires_or_null) | 7 args | id | INSERTs job. Delay overrides run_at. |
| honker_claim_batch(queue, worker_id, n, timeout_s) | 4 args | JSON array | Claims up to n jobs. Each gets claim_expires_at = now + timeout_s. |
| honker_ack_batch(ids_json, worker_id) | 2 args | count | ACKs (DELETEs) claimed jobs. ids_json is [1,2,3]. |
| honker_ack(job_id, worker_id) | 2 args | 1 or 0 | Single-job ack. Returns 0 if claim expired. |
| honker_retry(job_id, worker_id, delay_s, error) | 4 args | 1 or 0 | Retries (flips back to pending) or fails to dead if attempts >= max_attempts. |
| honker_fail(job_id, worker_id, error) | 3 args | 1 or 0 | Unconditionally moves to _honker_dead. |
| honker_heartbeat(job_id, worker_id, extend_s) | 3 args | 1 or 0 | Extends claim for long-running handlers. |
| honker_cancel(job_id) | 1 arg | 1 or 0 | Removes pending/processing row. Idempotent. |
| honker_get_job(job_id) | 1 arg | JSON or "" | Read job state. Pure read. |
| honker_sweep_expired(queue) | 1 arg | count | Moves expired pending jobs to _honker_dead. |
| honker_queue_next_claim_at(queue) | 1 arg | unix_ts or 0 | Earliest future deadline (run_at or claim_expires_at + 1). |
Stream Functions
| Function | Signature | Returns | Notes |
|---|---|---|---|
| honker_stream_publish(topic, key_or_null, payload_json) | 3 args | offset | INSERTs event + fires notification |
| honker_stream_read_since(topic, offset, limit) | 3 args | JSON array | Reads events with offset > ? |
| honker_stream_save_offset(consumer, topic, offset) | 3 args | 1 or 0 | Monotonic upsert. 0 = existing offset ≥ new |
| honker_stream_get_offset(consumer, topic) | 2 args | offset or 0 | Returns saved offset |
Lock Functions
| Function | Signature | Returns | Notes |
|---|---|---|---|
| honker_lock_acquire(name, owner, ttl_s) | 3 args | 1 or 0 | 1 = acquired, 0 = held |
| honker_lock_release(name, owner) | 2 args | 1 or 0 | 1 = released, 0 = not yours |
Rate Limit Functions
| Function | Signature | Returns | Notes |
|---|---|---|---|
| honker_rate_limit_try(name, limit, per) | 3 args | 1 or 0 | 1 = under limit, 0 = at limit |
| honker_rate_limit_sweep(older_than_s) | 1 arg | count | Prunes expired windows |
Scheduler Functions
| Function | Signature | Returns | Notes |
|---|---|---|---|
| honker_scheduler_register(name, queue, cron_expr, payload, priority, expires_s_or_null) | 6 args | 1 | Upserts task. Computes next_fire_at. |
| honker_scheduler_unregister(name) | 1 arg | 0 or 1 | Deletes task. |
| honker_scheduler_tick(now_unix) | 1 arg | JSON array | Fires due tasks, enqueues payloads, advances next_fire_at. |
| honker_scheduler_soonest() | 0 args | unix_ts or 0 | Earliest next_fire_at for sleep duration calculation. |
| honker_scheduler_pause(name) | 1 arg | 0 or 1 | Sets enabled = 0. |
| honker_scheduler_resume(name) | 1 arg | 0 or 1 | Sets enabled = 1. |
| honker_scheduler_list() | 0 args | JSON array | Returns all schedules with state. |
| honker_scheduler_update(name, cron_expr_or_null, payload_or_null, priority_or_null, expires_s_or_null, touch_expires) | 6 args | 0 or 1 | Mutates schedule fields. Recomputes next_fire_at if cron_expr changed. |
| honker_cron_next_after(expr, from_unix) | 2 args | unix_ts | Pure deterministic function. 5-field, 6-field, or @every <n><unit>. |
Result Functions
| Function | Signature | Returns | Notes |
|---|---|---|---|
| honker_result_save(job_id, value_json, ttl_s) | 3 args | 1 | UPSERTs result. ttl_s=0 = no expiration. |
| honker_result_get(job_id) | 1 arg | value or NULL | Returns result or NULL if expired/missing. |
| honker_result_sweep() | 0 args | count | Prunes expired result rows. |
Watcher Functions (Extension ABI)
| Function | Signature | Returns | Notes |
|---|---|---|---|
| honker_update_watcher_open(db_path, backend) | 2 SQL args | id | Opens a watcher handle. For Elixir and extension consumers. |
| honker_update_watcher_wait(id, timeout_ms) | 2 SQL args | 1/0/-1 | 1 = update observed, 0 = timeout, -1 = disconnected |
| honker_update_watcher_close(id) | 1 SQL arg | 1 | Closes watcher handle. |
C ABI (for Go, .NET, C++, Ruby bindings that route through the extension):
| Function | Signature | Returns | Notes |
|---|---|---|---|
| honker_watcher_open(db_path, backend, err_buf, err_buf_len) | C ABI | *mut HonkerWatcherHandle | Opens a core-backed update watcher. |
| honker_watcher_wait(handle, timeout_ms) | C ABI | 1/0/-1/-2 | 1 = update, 0 = timeout, -1 = closed, -2 = panic |
| honker_watcher_close(handle) | C ABI | void | Closes and frees the handle. |
Tables
| Table | Purpose |
|---|---|
| _honker_live | Pending + processing jobs. Partial index for fast claims. |
| _honker_dead | Terminal jobs (retry-exhausted or explicitly failed). Never scanned by claim path. |
| _honker_notifications | Ephemeral notify/listen messages. Not auto-pruned. |
| _honker_stream | Durable stream events with auto-incrementing offsets. |
| _honker_stream_consumers | Per-consumer stream offsets. Monotonic upsert. |
| _honker_locks | Named advisory locks with TTL expiration. |
| _honker_rate_limits | Fixed-window rate limit counters. |
| _honker_scheduler_tasks | Cron/schedule task definitions with next_fire_at. |
| _honker_results | Task result storage with TTL expiration. |
7. Comparison to Postgres (pg_notify)
| Feature | Honker | pg_notify |
|---|---|---|
| Delivery model | Table-backed INSERT in transaction | In-memory NOTIFY with LISTEN callback |
| Persistence | Rows survive restart. Not auto-pruned. | Ephemeral: lost on restart, not replayed. |
| Transactional coupling | notify(channel, payload) inside BEGIN IMMEDIATE; INSERT; COMMIT, atomic with business write | NOTIFY fires at COMMIT inside the same transaction. Atomic with business write. |
| Retry / visibility timeout | Queue has claim_expires_at, attempts, max_attempts, dead-letter. | No retry. No visibility timeout. |
| Delayed delivery | run_at for scheduled delivery. Jobs only claimable after deadline. | No scheduling. |
| Cross-process wake | PRAGMA data_version polling at ~1ms cadence. SharedUpdateWatcher fans out to N subscribers. | Postgres notifies listeners via its inter-process communication. |
| Priority | Queue priority via partial index (queue, priority DESC, run_at, id). | No priority. |
| Rate limiting | Built-in fixed-window rate_limit_try. | No rate limiting. |
| Named locks | TTL-based advisory locks in _honker_locks. | pg_advisory_lock (similar concept, different implementation). |
| Cron scheduling | Built-in scheduler with 5-field/6-field cron + @every intervals. | Needs pg-boss/Oban/cron extension. |
| Stream offsets | Per-consumer tracked offsets with monotonic upsert. | No built-in stream offsets. |
| Process/writer model | Single-machine, single-writer (multiple local processes can read and wake). | Multi-process, multi-writer natively. |
| Durability | SQLite ACID. WAL mode for concurrent readers. | Postgres ACID. Full write-ahead logging. |
What honker gives you that pg_notify alone doesn't:
- Retry with exponential backoff: automatic re-delivery on failure
- Visibility timeout: crashed workers don't permanently lose messages
- Dead-letter queue: exhausted retries land in _honker_dead for inspection
- Delayed jobs: run_at for future delivery
- Prioritization: priority column in claim index
- Transactional outbox: business write + enqueue/event in one transaction, without adding Redis/Celery
- Task result storage: workers can persist return values; callers can await results
- Durable streams: per-consumer offsets with at-least-once delivery
- Cron scheduling: built-in periodic tasks with leader election
- Named locks and rate limiting: built-in coordination primitives
What you'd need to add if you used Postgres instead: pg-boss, Oban, and similar job libraries provide many of these features, but they are designed primarily around Postgres as the job store. Honker exists for the case where SQLite is already the primary datastore.
8. Comparison to Other Message Systems
| Feature | Honker | Redis Pub/Sub | NATS | Kafka |
|---|---|---|---|---|
| Persistence | SQLite tables (disk) | None: Pub/Sub messages are never stored (RDB/AOF persist the keyspace only) | In-memory (JetStream adds persistence) | Persistent log |
| Transactional coupling | Business write + enqueue in one tx | Not atomic with business data | Not atomic with business data | Not atomic with business data |
| Delivery guarantee | At-least-once | At-most-once (fire-and-forget) | At-most-once (core); at-least-once (JetStream) | At-least-once with consumer offsets |
| Retry/visibility | Built-in (claim timeout, retry, dead-letter) | None (messages disappear if no consumer) | None (core); redelivery (JetStream) | Consumer group offsets |
| Priority | Yes (partial index) | No | No | No |
| Delayed delivery | Yes (`run_at`) | No (requires a sorted-set workaround) | No | No (requires time-based logic) |
| Single-node complexity | Zero: just a `.db` file | Requires Redis server | Requires NATS server | Requires Kafka cluster |
| Cross-process wake latency | 1-2ms | ~0.1ms | ~0.1ms | ~1-5ms |
| Cross-node distribution | None (single machine) | Pub/Sub is fan-out to connected clients | JetStream supports clustering | Built for distributed |
| Dependency | SQLite (already in your stack) | Additional server | Additional server | Additional cluster |
| Schema coupling | Same file as business data: dual-write impossible | Separate system: dual-write risk | Separate system: dual-write risk | Separate system: dual-write risk |
| Language support | Python, Node, Rust, Go, Ruby, Bun, Elixir, C++, .NET, JVM, Kotlin | Many (but protocol, not SQL) | 40+ client libraries | Many client libraries |
| Dead-letter queue | Built-in `_honker_dead` | None | No built-in DLQ (JetStream emits max-delivery advisories) | No built-in DLQ (Kafka Connect offers DLQ configuration) |
When honker is the right choice: SQLite is already your primary datastore, and you need pub/sub + queue + scheduling without introducing Redis/Celery/NATS. The dual-write problem between your business tables and the queue disappears.
When honker is NOT the right choice: Multi-node deployments, multi-writer sharding, need for cross-datacenter replication, or workloads exceeding single-machine throughput.
9. Relevance to Alknet
9.1 Alignment with Event Boundary Discipline (ADR-032)
ADR-032 defines three communication layers:
```
Call Protocol (Layer 3, external, JSON)
└── irpc Service (Layer 3, internal, postcard)
    └── Honker Streams (Domain events, within service boundary)
```
Honker's single-machine model is exactly right for the bottom layer. Domain events in alknet are internal to the service that owns that data — nodes:created, edges:deleted, accounts:updated. These never cross the service boundary without projection into a call protocol EventEnvelope.
The integration plan (Phase 2.2) explicitly lists honker integration patterns for alknet-storage:
| Feature | Use Case |
|---|---|
| `stream_publish` / `subscribe` | Durable pub/sub for node/edge/membership changes |
| `notify` / `listen` | Ephemeral pub/sub for real-time control channel events |
| `queue` / `claim` / `ack` | Task queue for async operations |
9.2 Patterns from Honker for alknet-storage Adoption
Map honker's primitives to alknet-storage's internal events:
| Alknet Domain Event | Honker Primitive | Stream Name |
|---|---|---|
| Node created | `stream.publish("nodes:created", ...)` | `nodes:created` |
| Node updated | `stream.publish("nodes:updated", ...)` | `nodes:updated` |
| Node deleted | `stream.publish("nodes:deleted", ...)` | `nodes:deleted` |
| Edge created | `stream.publish("edges:created", ...)` | `edges:created` |
| Account updated | `stream.publish("accounts:updated", ...)` | `accounts:updated` |
| ACL rule changed | `stream.publish("acl:changed", ...)` | `acl:changed` |
Map honker's task queue to alknet's async operations:
| Alknet Async Task | Honker Queue |
|---|---|
| Key rotation | queue("key-rotation") |
| Certificate renewal | queue("cert-renewal") |
| Audit log archival | queue("audit-archival") |
| Node encryption/decryption | queue("node-crypto") |
Map honker's notify/listen to real-time events:
| Alknet Real-Time Event | Honker Channel |
|---|---|
| SSH connection opened | notify("ssh:connected", ...) |
| Config reload triggered | notify("config:reload", ...) |
| Forwarding rule activated | notify("forwarding:activated", ...) |
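The three mapping tables above fix a small vocabulary of stream, queue, and channel names. One way to keep these names consistent in alknet-storage's Rust code is a set of enums whose exhaustive `match` maps each variant to its name. This is a hedged sketch: the enum and variant names are hypothetical, and only the string values come from the tables.

```rust
/// Domain events published to honker streams (names from the first table).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DomainEvent {
    NodeCreated,
    NodeUpdated,
    NodeDeleted,
    EdgeCreated,
    AccountUpdated,
    AclChanged,
}

impl DomainEvent {
    fn stream(self) -> &'static str {
        // Exhaustive match: adding a variant without a stream name fails to compile.
        match self {
            DomainEvent::NodeCreated => "nodes:created",
            DomainEvent::NodeUpdated => "nodes:updated",
            DomainEvent::NodeDeleted => "nodes:deleted",
            DomainEvent::EdgeCreated => "edges:created",
            DomainEvent::AccountUpdated => "accounts:updated",
            DomainEvent::AclChanged => "acl:changed",
        }
    }
}

/// Background work routed to honker queues (second table).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AsyncTask {
    KeyRotation,
    CertRenewal,
    AuditArchival,
    NodeCrypto,
}

impl AsyncTask {
    fn queue(self) -> &'static str {
        match self {
            AsyncTask::KeyRotation => "key-rotation",
            AsyncTask::CertRenewal => "cert-renewal",
            AsyncTask::AuditArchival => "audit-archival",
            AsyncTask::NodeCrypto => "node-crypto",
        }
    }
}

/// Ephemeral signals sent with notify (third table).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Signal {
    SshConnected,
    ConfigReload,
    ForwardingActivated,
}

impl Signal {
    fn channel(self) -> &'static str {
        match self {
            Signal::SshConnected => "ssh:connected",
            Signal::ConfigReload => "config:reload",
            Signal::ForwardingActivated => "forwarding:activated",
        }
    }
}
```

The compiler then enforces that every event, task, and signal has exactly one name. Call sites pass `DomainEvent::NodeCreated.stream()` rather than retyping a string literal.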
9.3 Replicating Honker Patterns with Postgres for Production
If alknet-storage is backed by Postgres in production deployments (the storage spec mentions rusqlite but leaves room for alternative backends), the following Postgres equivalents would be needed:
| Honker Primitive | Postgres Equivalent | What's Lost |
|---|---|---|
| `notify`/`listen` | `pg_notify` + `LISTEN` | Postgres NOTIFY is ephemeral (lost on restart). Honker's table-backed notifications persist. Need to add a `_notifications` table and polling. |
| `stream_publish`/`subscribe` | `pg_notify` + consumer offset table | No built-in per-consumer offset tracking. Would need a `_stream_consumers` table and polling/cursor logic. |
| `queue`/`claim`/`ack` | pg-boss / Oban | These exist and are production-quality. Honker's simplicity (one table, partial index) is lost, and a dependency on pg-boss (Node.js) or Oban (Elixir) is added. |
| `run_at` (delayed jobs) | Oban's `scheduled_at` / pg-boss's `startAfter` | Available in both. |
| `claim_expires_at` (visibility timeout) | pg-boss's `expireInSeconds` / Oban's Lifeline plugin (`rescue_after`) | Available in both. |
| `honker_lock_acquire`/`release` | `pg_advisory_lock` | Built-in, similar concept. |
| `honker_rate_limit_try` | Custom table or Redis | Postgres has no built-in rate limiting. |
| Transactional coupling | Same transaction | Nothing lost: `INSERT INTO orders ...` and the job-table `INSERT` commit in the same Postgres transaction. |
| Scheduler | pg-boss `schedule()` / Oban's `Oban.Plugins.Cron` | Available in both. |
What would be lost switching to Postgres + pg-boss/Oban:
- Schema simplicity: Honker uses 2 tables for 90% of queue operations. pg-boss spreads its state across several tables (jobs, schedules, subscriptions, and others). Oban keeps jobs in a single `oban_jobs` table, but only for Elixir applications.
- Zero-dependency: Honker is a SQLite extension. No Redis, no Celery, no broker. pg-boss requires Postgres + Node.js. Oban requires Postgres + Elixir.
- Cross-language transparency: Any SQLite client can `SELECT load_extension('honker')` and get the same features. pg-boss and Oban are each tied to one language runtime.
- File-based deployment: Copy the `.db` file. Done. Postgres requires a server.
Recommendation for alknet-storage: Start with honker on SQLite for self-hosted/edge deployments. For production Postgres deployments, create an abstraction layer in alknet-storage that implements the same `EventStream`, `TaskQueue`, and `NotificationChannel` traits against both backends. The honker-on-SQLite implementation is the reference. The Postgres implementation uses `pg_notify`, offset tables, and a pg-boss/Oban-style job table. Because pg-boss and Oban target Node.js and Elixir, a Rust backend would reimplement their pattern rather than depend on them.
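The recommendation names the traits but not their methods. Below is a minimal sketch of `EventStream`. Its publish/read/save-offset shape is modeled on honker's `honker_stream_publish`, `honker_stream_read_since`, and `honker_stream_save_offset` functions, but the Rust signatures are hypothetical. The in-memory implementation doubles as a test fixture. Its `save_offset` shows the monotonic per-consumer offset behavior from the comparison table: an offset only ever moves forward.

```rust
use std::collections::HashMap;

/// Backend-neutral stream trait (hypothetical signatures).
trait EventStream {
    /// Append an event to `topic` and return its offset.
    fn publish(&mut self, topic: &str, payload: Vec<u8>) -> u64;
    /// Up to `limit` events on `topic` whose offset is greater than `after`.
    fn read_since(&self, topic: &str, after: u64, limit: usize) -> Vec<(u64, Vec<u8>)>;
    /// Save a consumer's offset; returns true only if it moved forward.
    fn save_offset(&mut self, consumer: &str, topic: &str, offset: u64) -> bool;
    /// Last saved offset for a consumer, 0 if none.
    fn offset(&self, consumer: &str, topic: &str) -> u64;
}

/// In-memory test double; SQLite/honker and Postgres backends would implement the same trait.
#[derive(Default)]
struct MemoryStream {
    events: HashMap<String, Vec<(u64, Vec<u8>)>>,
    offsets: HashMap<(String, String), u64>,
    next_offset: u64,
}

impl EventStream for MemoryStream {
    fn publish(&mut self, topic: &str, payload: Vec<u8>) -> u64 {
        self.next_offset += 1;
        self.events.entry(topic.to_string()).or_default().push((self.next_offset, payload));
        self.next_offset
    }

    fn read_since(&self, topic: &str, after: u64, limit: usize) -> Vec<(u64, Vec<u8>)> {
        self.events
            .get(topic)
            .map(|evs| evs.iter().filter(|(o, _)| *o > after).take(limit).cloned().collect::<Vec<_>>())
            .unwrap_or_default()
    }

    fn save_offset(&mut self, consumer: &str, topic: &str, offset: u64) -> bool {
        let current = self.offsets.entry((consumer.to_string(), topic.to_string())).or_insert(0);
        if offset > *current {
            *current = offset; // monotonic upsert: never move backwards
            true
        } else {
            false
        }
    }

    fn offset(&self, consumer: &str, topic: &str) -> u64 {
        self.offsets.get(&(consumer.to_string(), topic.to_string())).copied().unwrap_or(0)
    }
}
```

`TaskQueue` and `NotificationChannel` would follow the same pattern: a trait defined by alknet-storage, with the honker implementation as reference and the Postgres one written against it.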
9.4 Honker's Queue/Claim Model and alknet's Call Protocol
The call protocol's EventEnvelope frames are the integration boundary (ADR-033). When a domain event needs to cross node boundaries, it must be projected:
```
Honker stream event (internal)
  → Projection function
  → EventEnvelope frame (external, call protocol)
  → Transported over SSH/QUIC/DNS
  → Received by remote node
  → May trigger local Honker stream event on remote node
```
The queue/claim model maps to async call protocol operations:
- `call.requested` → Honker `queue.enqueue({"operation": "/head/auth/verify", "input": {...}})`
- Worker claims the job → like a worker process picking up a call request
- `job.ack()` → `call.responded` with the result
- `job.retry()` → call timeout / retry logic (but this is at the transport layer, not the queue)
- Job fails → `_honker_dead` → dead-letter equivalent for failed call protocol operations
The key difference: alknet's call protocol is synchronous request-response at the transport layer, while honker's queue is async at-least-once. They serve different purposes:
- Call protocol: "I need you to verify this pubkey NOW" (synchronous, cross-node)
- Honker queue: "Process this key rotation in the background" (asynchronous, within-node)
For cross-node task distribution, honker's queue should NOT be the transport. Instead:
- A domain event (honker stream) in the storage service triggers a projection.
- The projection creates an `EventEnvelope` frame.
- The call protocol delivers it to remote nodes.
- Remote nodes may enqueue it into their own honker queues for local processing.
9.5 Cross-Node Event Distribution
Honker is single-node by design. This is correct for alknet's architecture because:
- Domain events stay within the service boundary (ADR-032). Honker streams are for internal state reconstruction, not cross-node distribution.
- Integration events cross boundaries via the call protocol. When a domain event in the storage service needs to be communicated to another node, it's projected into an `EventEnvelope` frame and sent over the wire.
- Each node has its own `.db` file with its own honker streams. This is a feature, not a limitation: it enforces the event boundary discipline.
The bridge pattern:
```
Node A (storage service):
  1. Business write INSERTs into SQLite
  2. stream.publish("nodes:created", {node_id: 42}) in same tx
  3. A local subscriber detects the event
  4. Projects it into EventEnvelope {operation: "/head/nodes/created", data: {node_id: 42}}
  5. Sends via call protocol over SSH/QUIC/DNS to Node B

Node B (receiver):
  1. Receives EventEnvelope via call protocol
  2. Enqueues locally: queue("incoming-events").enqueue({source: "node-A", event: ...})
  3. Or publishes locally: stream.publish("remote:nodes:created", {node_id: 42})
```
This preserves the three-layer model while respecting honker's single-machine design.
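The bridge pattern reduces to two pure functions: a projection on Node A and an inverse mapping on Node B. This is a sketch under stated assumptions. `StreamEvent` and the two-field `EventEnvelope` below are simplified hypothetical types; the real frame format belongs to the call protocol (ADR-033). Payloads are carried as JSON strings. The `EXPORTED` allow-list is an illustrative policy for which topics may leave the service boundary.

```rust
/// Hypothetical internal event as read from a honker stream.
#[derive(Debug, Clone, PartialEq)]
struct StreamEvent {
    topic: String,   // e.g. "nodes:created"
    payload: String, // JSON text
}

/// Hypothetical, simplified call-protocol envelope (only the fields used above).
#[derive(Debug, Clone, PartialEq)]
struct EventEnvelope {
    operation: String, // e.g. "/head/nodes/created"
    data: String,
}

/// Node A: project "entity:action" into "/head/entity/action" if the entity is exported.
fn project(ev: &StreamEvent) -> Option<EventEnvelope> {
    const EXPORTED: &[&str] = &["nodes", "edges"]; // assumed allow-list
    let (entity, action) = ev.topic.split_once(':')?;
    if !EXPORTED.contains(&entity) {
        return None; // stays inside the service boundary
    }
    Some(EventEnvelope { operation: format!("/head/{entity}/{action}"), data: ev.payload.clone() })
}

/// Node B: map a received envelope onto a locally namespaced "remote:" stream topic.
fn receive(env: &EventEnvelope) -> Option<StreamEvent> {
    let rest = env.operation.strip_prefix("/head/")?;
    let (entity, action) = rest.split_once('/')?;
    Some(StreamEvent { topic: format!("remote:{entity}:{action}"), payload: env.data.clone() })
}
```

Making projection an explicit allow-list means a new internal topic stays local until someone decides to export it. This matches the boundary discipline of ADR-032.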
9.6 Honker Patterns and Integration Plan Mapping
The integration plan (Phase 2.2, alknet-storage) references these honker patterns. Here's the direct mapping:
| Plan Reference | Honker Primitive (Python binding shown) | Implementation Notes |
|---|---|---|
| `stream_publish`/`subscribe` | `db.stream("topic").publish(data, tx=tx)` + `async for event in stream.subscribe(consumer="name")` | Used for domain events within alknet-storage. Each metagraph change (node/edge created/updated/deleted) publishes to a stream. Consumers (local reactive logic, SSE endpoints) subscribe. |
| `notify`/`listen` | `tx.notify("channel", data)` + `async for n in db.listen("channel")` | Used for ephemeral real-time signals: SSH connection events, config reload triggers, forwarding rule activation. No persistence needed. |
| `queue`/`claim` | `queue.enqueue(data, tx=tx)` + `async for job in queue.claim(worker_id)` | Used for background tasks: key rotation, certificate renewal, audit log archival, batch operations. The `tx=tx` parameter ensures atomicity with business writes. |
Implementation approach for alknet-storage (Rust):
Use `honker-core` directly (not the Python binding). The Rust crate exposes:
- `open_conn(path, install_notify)`: open a connection with PRAGMA defaults
- `attach_honker_functions(&conn)`: register all SQL functions
- `bootstrap_honker_schema(&conn)`: create tables
- `SharedUpdateWatcher::new(db_path)`: the wake listener
Or load the extension via rusqlite:
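```rust
use rusqlite::{Connection, LoadExtensionGuard};
let conn = Connection::open("alknet.db")?;
// Needs rusqlite's `load_extension` feature; loading native code is `unsafe`.
unsafe {
    let _guard = LoadExtensionGuard::new(&conn)?; // enables loading, disables on drop
    conn.load_extension("libhonker_ext", None::<&str>)?;
}
conn.query_row("SELECT honker_bootstrap()", [], |_| Ok(()))?; // SELECT returns a row
```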
9.7 Rust Integration: honker-core vs honker-rs
Two options for Rust integration in alknet-storage:
Option A: Use honker-core directly
The honker-core crate provides:
- `attach_notify(&conn)`: `_honker_notifications` table + `notify()` SQL function
- `attach_honker_functions(&conn)`: all `honker_*` SQL functions
- `bootstrap_honker_schema(&conn)`: all table/index creation
- `SharedUpdateWatcher`: the wake mechanism
- `open_conn(path, install_notify)`: connection factory with PRAGMA defaults
This gives you raw SQL access. You call conn.query_row("SELECT honker_enqueue(…)") etc. Maximum control, minimum abstraction.
Option B: Use honker-rs ergonomic wrapper
The packages/honker-rs crate provides:
- `Database::open(path)`: opens a database file (e.g., `system.db`)
- `db.queue("name")`: `Queue` handle with `.enqueue()`, `.claim_batch()`, `.ack_batch()`
- `db.stream("name")`: `Stream` handle with `.publish()`, `.subscribe()`
- `db.listen("channel")`: async listener
- `db.outbox("name", delivery_fn)`: outbox pattern
- `db.lock("name", owner, ttl)`: named lock
- `db.scheduler()`: cron scheduler
Recommendation: Start with honker-core + direct SQL. The schema and functions are stable and well-tested. Wrap in application-level methods as needed. honker-rs may not expose all features (e.g., the scheduler pause/resume/list/update functions added in Phase Mantle). Using honker-core gives maximum flexibility while maintaining a single source of truth for SQL behavior.
10. Open Questions for Alknet
1. Should alknet-storage bundle honker as a Rust crate dependency, or load the extension at runtime?
   - Bundling `honker-core` gives compile-time verification. Loading the extension requires shipping `libhonker_ext.so`/`.dylib`/`.dll` alongside the binary.
   - Recommendation: Bundle `honker-core` as a crate dependency for the Rust implementation. Extension loading is for language bindings that can't link Rust code directly.
2. Should the `alknet-storage` crate depend on `honker` (the Python package) or `honker-core` (the Rust rlib)?
   - `honker-core` (Rust rlib) is the correct choice for a Rust crate. `honker` is the Python binding.
   - The crate dependency in storage.md currently lists `honker = "0.x"`. This should be `honker-core = "0.2"`.
3. How does the Rust `SharedUpdateWatcher` integrate with tokio?
   - `SharedUpdateWatcher::subscribe()` returns a `std::sync::mpsc::Receiver<()>`, which is blocking. For tokio integration, wrap it in `tokio::task::spawn_blocking` or bridge it into a `tokio::sync::mpsc` channel.
   - Alternatively, use `UpdateWatcher::spawn()` directly and convert ticks to tokio notifications.
4. Should alknet-storage abstract over honker-specific table names?
   - Honker prefixes all internal tables with `_honker_` (e.g., `_honker_live`, `_honker_stream`). Alknet-storage should treat these as honker's internal schema and not query them directly for application logic.
   - Application-level tables (like `nodes`, `edges`, `accounts`) should use their own namespacing convention. Honker's tables coexist in the same `.db` file.
5. Multi-tenant support: Honker queues and streams are identified by name strings (e.g., `"emails"`, `"user-events"`). For alknet's multi-tenant model (system DB vs tenant DB), each tenant gets its own `.db` file with its own honker tables. Cross-tenant events must go through the call protocol, never by direct honker stream subscription across database files.
6. Database file management: Alknet-storage's system DB (`system.db`) and tenant DBs (`tenant-{orgId}.db`) should each have their own honker instance. The `SharedUpdateWatcher` is per-database, so 100 active tenants means 100 poll threads. This is fine for the expected alknet deployment size, but thread count is worth monitoring in large deployments.
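For the tokio question above, one possible bridge uses a dedicated OS thread. The thread blocks on the watcher's `Receiver<()>`, coalesces each burst of ticks, and calls a wake function once per burst. The sketch assumes only what is stated above, namely that `subscribe()` yields a `std::sync::mpsc::Receiver<()>`. The function name `bridge_ticks` is hypothetical, and a plain std channel stands in for the watcher. In a tokio program, `wake` would typically call `notify_one()` on an `Arc<tokio::sync::Notify>`. That method is synchronous, and if no task is waiting yet it stores a permit, so a tick is not lost.

```rust
use std::sync::mpsc::Receiver;
use std::thread::{self, JoinHandle};

/// Forward blocking watcher ticks to a wake callback (hypothetical helper).
fn bridge_ticks<F>(ticks: Receiver<()>, wake: F) -> JoinHandle<()>
where
    F: Fn() + Send + 'static,
{
    thread::spawn(move || {
        // `recv` blocks this dedicated thread, never an async worker thread.
        // It returns Err once the sending side (the watcher) is dropped, ending the loop.
        while ticks.recv().is_ok() {
            // Drain any ticks that piled up: one wake-up per burst is enough,
            // because subscribers re-read their tables after waking anyway.
            while ticks.try_recv().is_ok() {}
            wake();
        }
    })
}
```

A dedicated thread fits better than `spawn_blocking` here, because the loop runs for the lifetime of the watcher. tokio's documentation reserves the blocking pool for work that eventually finishes and points indefinitely running blocking work at ordinary threads.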
11. License and Maturity
- License: Apache 2.0 OR MIT (dual-licensed). Fully permissive for integration.
- Maturity: Alpha software (noted in README). Better than experimental but not beta-quality yet.
- Status: Active development. Regular commits. Cross-language interop tests. 180+ Python tests, 12+ Rust tests. Crash recovery verified. 600-second soak test under sustained writes.
- Breaking changes risk: The project is pre-1.0. The CHANGELOG still references historical table names from "joblite" and "litenotify"; current names use the `_honker_` prefix. The API surface is stabilizing but may change.
- Recommendation: Pin to a specific `honker-core` version in `alknet-storage`'s `Cargo.toml`. The schema migration path (seen in `bootstrap_honker_schema`'s `ALTER TABLE` for the `enabled` column) shows the project handles migrations.
References
- Honker GitHub Repository — Primary source for all code and documentation
- Honker README — Feature overview, quick start, architecture, performance
- Honker BINDINGS.md — Language binding support matrix
- Honker ROADMAP.md — Future work phases, planned features (singleton/dedup, state events, queue stats, per-queue config)
- Honker CHANGELOG.md — Detailed history of all changes, performance passes, and architecture decisions
- Honker honker-core/src/lib.rs — Core Rust implementation: Writer, Readers, UpdateWatcher, SharedUpdateWatcher, schema, PRAGMAs
- Honker honker-core/src/honker_ops.rs — All SQL function implementations: enqueue, claim, ack, retry, stream, lock, rate limit, scheduler
- Honker honker-extension/src/lib.rs — Loadable extension entry point and C ABI for watcher
- alknet ADR-032: Event Boundary Discipline — Domain events stay within service boundary
- alknet Integration Plan — Phase 2.2: alknet-storage honker integration
- alknet Storage Spec — alknet-storage crate design and honker integration table