---
id: core-operators-tests
name: Write tests for all stream operators
status: pending
depends_on: []
scope: moderate
risk: low
impact: component
level: implementation
---

## Description

The `operators.ts` module exports 13 operators (`filter`, `map`, `pipe`, `take`, `reduce`, `toArray`, `batch`, `dedupe`, `window`, `flat`, `groupBy`, `chain`, `join`). None of them have tests. Every operator accepts `AsyncIterable` input; the Repeater-returning operators (`filter`, `map`) provide backpressure-aware push semantics.

Three operators are adapted from graphql-yoga (`filter`, `map`, `pipe`). The other ten come from the async-utility reference (`take`, `reduce`, `toArray`, `batch`, `dedupe`, `window`, `flat`, `groupBy`, `chain`, `join`).

## Acceptance Criteria

- [ ] `test/operators.test.ts` exists and passes
- [ ] `filter` — filters items by predicate; type-narrowing overload works
- [ ] `filter` — supports async predicates
- [ ] `map` — transforms items; supports async mappers
- [ ] `pipe` — composes 1-5 functions
- [ ] `pipe` — composes with `subscribe`: `pipe(pubsub.subscribe("myEvent", id), filter(...), map(...))`
- [ ] `take` — yields the first N items, then stops
- [ ] `reduce` — reduces to a single value
- [ ] `toArray` — collects all items into an array
- [ ] `batch` — groups items into arrays of `size`
- [ ] `batch` — yields the remaining items when they do not fill a full batch
- [ ] `dedupe` — yields only unique items
- [ ] `window` — sliding window of `size` items, advancing by `step`
- [ ] `flat` — flattens a nested `AsyncIterable` into a flat `AsyncIterable`
- [ ] `groupBy` — groups items by key into a `Map` (terminal operation)
- [ ] `chain` — concatenates multiple async iterables
- [ ] `join` — streaming join between two sources on matching keys

## References

- docs/architecture/api-surface.md (Operators section)
- src/operators.ts

## Notes

> To be filled by implementation agent

## Summary

> To be filled on completion
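
## Example test sketch

The `take` and `batch` criteria above could be checked along the following lines. This is only a sketch under stated assumptions: the `take` and `batch` functions below are hypothetical stand-ins written so the example runs on its own, not the implementations in `src/operators.ts`, and the curried call shape (`operator(args)(source)`) is assumed from the `pipe(...)` usage in the criteria. The helpers `fromArray`, `collect`, `assertDeepEqual`, and `runSketch` are likewise illustrative names. The real `test/operators.test.ts` would import the operators and use the project's test runner instead.

```typescript
// Turn a plain array into an AsyncIterable test source.
async function* fromArray<T>(items: readonly T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}

// Drain an AsyncIterable into an array so results can be compared.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) out.push(item);
  return out;
}

// Hypothetical stand-in for `take`: yields the first `count` items, then stops.
function take(count: number) {
  return async function* taken<T>(source: AsyncIterable<T>): AsyncGenerator<T> {
    if (count <= 0) return;
    let seen = 0;
    for await (const item of source) {
      yield item;
      seen += 1;
      if (seen >= count) return; // leaving the loop also closes the source
    }
  };
}

// Hypothetical stand-in for `batch`: groups items into arrays of `size`
// and yields any leftover items as a final, shorter batch.
function batch(size: number) {
  return async function* batched<T>(source: AsyncIterable<T>): AsyncGenerator<T[]> {
    let current: T[] = [];
    for await (const item of source) {
      current.push(item);
      if (current.length === size) {
        yield current;
        current = [];
      }
    }
    if (current.length > 0) yield current;
  };
}

// Minimal runner-independent assertion; a real test would use the
// project's own matcher (for example a deep-equality expectation).
function assertDeepEqual(actual: unknown, expected: unknown, label: string): void {
  const a = JSON.stringify(actual);
  const e = JSON.stringify(expected);
  if (a !== e) throw new Error(`${label}: expected ${e}, got ${a}`);
}

async function runSketch(): Promise<void> {
  // take: yields the first N items, then stops
  assertDeepEqual(await collect(take(2)(fromArray([1, 2, 3, 4]))), [1, 2], "take");

  // batch: full batches first, then the remaining items
  assertDeepEqual(
    await collect(batch(2)(fromArray([1, 2, 3, 4, 5]))),
    [[1, 2], [3, 4], [5]],
    "batch",
  );
}

void runSketch();
```

Feeding each operator from a small array-backed source keeps the expected output easy to read, and the same `fromArray`/`collect` pair can be reused for the remaining criteria.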