feat(moq-data): add metadata-over-MoQ helpers (set + json)#1727
feat(moq-data): add metadata-over-MoQ helpers (set + json)#1727kixelated wants to merge 10 commits into
Conversation
Add a new `moq-data` crate and `@moq/data` package providing helpers for sending metadata over MoQ tracks, mirroring the snapshot/delta machinery of moq-json. - `set`: a HashSet-like collection synced over a track with `+`/`-` delta encoding. Generic over any binary item via a small `Item` trait (Rust) or `Codec<T>` (JS); `String`/`Vec<u8>`/`Bytes` and `stringCodec`/`bytesCodec` are provided. Each group is self-contained: frame 0 is an atomic snapshot (u32 count + length-prefixed items) and later frames are single ops (`+`/`-` byte then item bytes), so a late joiner reconstructs from the newest group alone. Deltas are on by default. The wire format uses big-endian u32 length prefixes so Rust and JS stay byte-compatible. - `json`: re-exports moq-json for now; JSON will migrate here over time. The motivating use case is a `tracks.set` track listing a broadcast's track names. Wiring that into the hang catalog is a follow-up (a catalog/wire change targeting `dev`); this crate is the generic, additive building block. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughTwo new parallel packages are introduced: 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (5)
js/data/src/set/set.test.ts (2)
69-82: ⚡ Quick winConsider calling
finish()for resource cleanup.While the test correctly verifies live consumption, it doesn't close the track or producer. For completeness and to avoid potential resource leaks, consider adding:
producer.remove("video"); expect(await consumer.next()).toEqual(set("audio")); + + producer.finish(); });This makes the cleanup explicit and matches the pattern in other tests.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@js/data/src/set/set.test.ts` around lines 69 - 82, The test "live consumer sees each change" creates Track and Producer objects but does not clean up these resources after the test completes, which can lead to resource leaks. Add a call to finish() at the end of the test after the last assertion to explicitly close and clean up the track and producer resources. Follow the same cleanup pattern used in other tests in this file to ensure consistency.
35-44: ⚡ Quick winTest name implies structure verification but only checks final state.
The test name "deltas off: a snapshot group per change" suggests verifying that each change creates a separate snapshot group, but the assertion only checks the final reconstructed set. Consider adding a structure assertion to match the test name:
producer.insert("audio"); producer.finish(); + expect(await structure(track)).toEqual([1, 1]); // two groups, one frame each - expect((await drain(track)).at(-1)).toEqual(set("video", "audio"));Alternatively, rename the test to focus on the end state rather than the internal structure.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@js/data/src/set/set.test.ts` around lines 35 - 44, The test "deltas off: a snapshot group per change" has a name that implies verification of internal snapshot group structure, but the assertion only checks the final reconstructed set. Either add an assertion to verify that drain(track) contains multiple snapshot groups (one per insert operation) to match the test name, or rename the test to accurately reflect that it only verifies the final state after inserts (for example, "deltas off: inserts produce correct final set").js/data/src/json.ts (1)
1-2: 💤 Low valueRephrase comment to focus on current state, not future plans.
The comment "will migrate here over time" describes future intent rather than current state. Based on learnings, comments should reflect the current state of the code, not its history or future. Consider rephrasing to explain why the re-export exists today without forecasting future changes.
📝 Suggested rephrase
-// Snapshot/delta JSON publishing, re-exported from `@moq/json`. JSON lives in its own package today -// and will migrate here over time. +// Snapshot/delta JSON publishing, re-exported from `@moq/json`. export * from "`@moq/json`";🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@js/data/src/json.ts` around lines 1 - 2, The comment at the beginning of the json.ts file contains forward-looking language about future migration ("will migrate here over time") instead of explaining the current state. Rephrase the comment to describe why the re-export from `@moq/json` exists in its current state today, removing the future-oriented language. Focus on the present reason for the re-export structure rather than forecasting planned changes.Source: Learnings
js/data/src/set/consumer.ts (1)
14-14: ⚡ Quick winConsider importing
Codecdirectly for clarity.Using
Config<T>["codec"]creates an indirect dependency on the shape ofConfig. IfConfigchanges or is renamed, this type reference breaks. ImportingCodec<T>directly from./codec.tsmakes the dependency explicit and the code more resilient to refactoring.♻️ Suggested refactor
-import type { Config } from "./producer.ts"; +import type { Codec } from "./codec.ts"; import { decodeDelta, decodeSnapshot, INSERT, keyOf, REMOVE } from "./wire.ts";export class Consumer<T> { `#track`: Moq.Track; - `#codec`: Config<T>["codec"]; + `#codec`: Codec<T>; `#group`?: Moq.Group; // Keyed by encoded bytes so items dedupe by value, not reference. `#current` = new Map<string, T>(); `#framesRead` = 0; - constructor(track: Moq.Track, config: Config<T>) { + constructor(track: Moq.Track, config: { codec: Codec<T> }) { this.#track = track; this.#codec = config.codec; }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@js/data/src/set/consumer.ts` at line 14, The `#codec` field currently uses Config<T>["codec"] as its type, creating an indirect dependency on the Config type that makes the code fragile to refactoring. Replace this indexed type reference by directly importing Codec<T> from ./codec.ts and using it as the type annotation for the `#codec` field instead of Config<T>["codec"]. This makes the dependency explicit and resilient to Config changes or renames.js/data/src/set/producer.ts (1)
99-103: ⚡ Quick winConsider caching encoded bytes to avoid re-encoding in snapshots.
#snapshot()re-encodes every value on each call, even though the map is already keyed by the encoded bytes (viakeyOf(bytes)). Storing theUint8Arrayalongside eachTwould eliminate redundant encoding when rolling snapshots, especially for large sets or expensive codecs.⚡ Suggested optimization
Change
#itemsto store{ value: T, bytes: Uint8Array }:- `#items` = new Map<string, T>(); + `#items` = new Map<string, { value: T; bytes: Uint8Array }>();Then update
insert,remove,has,values, and#snapshot:insert(value: T): boolean { const bytes = this.#codec.encode(value); const key = keyOf(bytes); if (this.#items.has(key)) return false; - this.#items.set(key, value); + this.#items.set(key, { value, bytes }); this.#publish(INSERT, bytes); return true; }remove(value: T): boolean { const bytes = this.#codec.encode(value); const key = keyOf(bytes); if (!this.#items.has(key)) return false; this.#items.delete(key); this.#publish(REMOVE, bytes); return true; }values(): IterableIterator<T> { - return this.#items.values(); + return (function* (items) { + for (const { value } of items.values()) yield value; + })(this.#items); }`#snapshot`(): Uint8Array { - const items: Uint8Array[] = []; - for (const value of this.#items.values()) items.push(this.#codec.encode(value)); - return encodeSnapshot(items); + const items = Array.from(this.#items.values(), ({ bytes }) => bytes); + return encodeSnapshot(items); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@js/data/src/set/producer.ts` around lines 99 - 103, The `#snapshot()` method re-encodes every value each time it is called, which is inefficient since encoding has already been performed. Refactor the `#items` data structure to store both the original value and its pre-encoded bytes as an object `{ value: T, bytes: Uint8Array }` instead of just the value `T`. Then update the `#snapshot()` method to use the cached `bytes` field directly instead of calling `this.#codec.encode(value)` in the loop. Additionally, update the `insert`, `remove`, `has`, and `values` methods to work with this new structure by accessing the value property where needed and storing the encoded bytes when inserting new items.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@js/data/src/set/producer.ts`:
- Around line 6-7: Replace the em dash ("—") used in the comment block with
appropriate punctuation following style guidelines. Either use a period to
separate the two related thoughts into distinct sentences, or use a
comma/parenthesis to connect them as a single continuous thought. The comment
describes Maximum frames behavior and should be reworded to remove the em dash
while maintaining clarity.
In `@js/data/src/set/wire.ts`:
- Around line 45-55: The snapshot decoders currently accept trailing bytes after
processing all declared items, which can allow malformed snapshots to parse
successfully and silently drop intended state. In js/data/src/set/wire.ts (lines
45-55), after the decode loop completes, add a check that requires offset to
equal frame.length, throwing an error if there are extra bytes remaining. In
rs/moq-data/src/set.rs (lines 379-390), after the decode loop completes, add a
check that requires the frame has no remaining data using frame.has_remaining(),
returning Error::Malformed if there are trailing bytes. Both files must validate
that all bytes in the snapshot have been consumed by the declared items.
In `@rs/moq-data/src/set.rs`:
- Around line 16-19: The snapshot format documentation in the comment block
describes the encoding using varint notation (varint(count) and varint(len)),
but the actual implementation uses big-endian u32 encoding. Update lines 16-17
in the snapshot documentation to accurately reflect that the format uses
big-endian u32 for both the count and length fields instead of variadic integer
encoding, ensuring the docs match the actual wire contract implemented in the
module.
- Around line 376-379: The code preallocates a HashSet using capacity from
untrusted wire data (the count obtained from frame.get_u32()) without first
validating it, which can allow an attacker to cause memory exhaustion. Remove
the capacity preallocation by replacing HashSet::with_capacity(count as usize)
with HashSet::new(), and let the HashSet grow naturally as items are inserted.
This ensures structural validation of the frame happens before any resource
allocation based on untrusted data.
---
Nitpick comments:
In `@js/data/src/json.ts`:
- Around line 1-2: The comment at the beginning of the json.ts file contains
forward-looking language about future migration ("will migrate here over time")
instead of explaining the current state. Rephrase the comment to describe why
the re-export from `@moq/json` exists in its current state today, removing the
future-oriented language. Focus on the present reason for the re-export
structure rather than forecasting planned changes.
In `@js/data/src/set/consumer.ts`:
- Line 14: The `#codec` field currently uses Config<T>["codec"] as its type,
creating an indirect dependency on the Config type that makes the code fragile
to refactoring. Replace this indexed type reference by directly importing
Codec<T> from ./codec.ts and using it as the type annotation for the `#codec`
field instead of Config<T>["codec"]. This makes the dependency explicit and
resilient to Config changes or renames.
In `@js/data/src/set/producer.ts`:
- Around line 99-103: The `#snapshot()` method re-encodes every value each time
it is called, which is inefficient since encoding has already been performed.
Refactor the `#items` data structure to store both the original value and its
pre-encoded bytes as an object `{ value: T, bytes: Uint8Array }` instead of just
the value `T`. Then update the `#snapshot()` method to use the cached `bytes`
field directly instead of calling `this.#codec.encode(value)` in the loop.
Additionally, update the `insert`, `remove`, `has`, and `values` methods to work
with this new structure by accessing the value property where needed and storing
the encoded bytes when inserting new items.
In `@js/data/src/set/set.test.ts`:
- Around line 69-82: The test "live consumer sees each change" creates Track and
Producer objects but does not clean up these resources after the test completes,
which can lead to resource leaks. Add a call to finish() at the end of the test
after the last assertion to explicitly close and clean up the track and producer
resources. Follow the same cleanup pattern used in other tests in this file to
ensure consistency.
- Around line 35-44: The test "deltas off: a snapshot group per change" has a
name that implies verification of internal snapshot group structure, but the
assertion only checks the final reconstructed set. Either add an assertion to
verify that drain(track) contains multiple snapshot groups (one per insert
operation) to match the test name, or rename the test to accurately reflect that
it only verifies the final state after inserts (for example, "deltas off:
inserts produce correct final set").
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: bad80947-5197-4023-b7c0-82e37a99ed9e
⛔ Files ignored due to path filters (2)
Cargo.lockis excluded by!**/*.lockbun.lockis excluded by!**/*.lock
📒 Files selected for processing (18)
CLAUDE.mdCargo.tomljs/data/README.mdjs/data/package.jsonjs/data/src/index.tsjs/data/src/json.tsjs/data/src/set/codec.tsjs/data/src/set/consumer.tsjs/data/src/set/index.tsjs/data/src/set/producer.tsjs/data/src/set/set.test.tsjs/data/src/set/wire.tsjs/data/tsconfig.jsonpackage.jsonrs/moq-data/Cargo.tomlrs/moq-data/README.mdrs/moq-data/src/lib.rsrs/moq-data/src/set.rs
…iew nits - Remove the `moq-data` entry from `[workspace.dependencies]`: nothing consumes the crate yet, so cargo-shear flagged it as unused and failed CI. It stays a workspace member, so it still builds. - decode_snapshot (Rust + JS): reject trailing bytes after the declared items, and bound the item count by the remaining frame size before allocating so a malformed frame can't request a huge HashSet capacity. - Fix the set.rs module doc to describe the u32 length prefixes (was varint). - Consumer: import `Codec` directly instead of `Config<T>["codec"]`. - Drop forward-looking "will migrate" wording from the json re-export comments (kept in the READMEs); strengthen the set tests. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Change `set::Item::encode` to write directly into the frame's `bytes::BufMut` instead of returning a fresh `Bytes`, with a new `size()` for the length prefix. A string or byte-vector item was copied twice (into an intermediate `Bytes`, then into the frame); now it's a single copy straight into the frame buffer. `decode` keeps taking `bytes::Bytes` so an item can hold a zero-copy slice of the frame. The wire format is unchanged (big-endian u32 length prefix + raw item bytes), so the JS implementation and cross-language interop are unaffected. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Make `set::Item::decode` take `&mut impl Buf` instead of an owned `Bytes`, mirroring the `BufMut`-based `encode`. A custom item can now read its fields straight off the buffer (`get_u16`, ...), and `buf.copy_to_bytes(remaining)` stays zero-copy on the `Bytes`-backed frame for items that want the raw slice. The caller hands `decode` a buffer holding exactly the item's bytes (the snapshot loop splits each item off; a delta is the rest of the frame), so the wire format is unchanged and JS interop is unaffected. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Encode snapshots and deltas directly into the frame via `GroupProducer:: create_frame` + the `FrameProducer` `BufMut`, instead of building an intermediate `BytesMut` and handing it to `write_frame`. A frame is a single pre-sized buffer and `write_frame` memcpys into it, so the old path copied each item twice (item -> BytesMut -> FrameBuf); now it's one copy (item -> FrameBuf). This is what `Item::size()` enables: `create_frame` needs the total frame size up front, and summing `size()` gives it with no scratch buffer. `should_snapshot` now sizes the snapshot arithmetically rather than building it just to measure. `insert`/`remove` pick snapshot-vs-delta from sizes while they still hold the item reference, so a delta is written straight from `&item` with no clone. Wire format is unchanged, so JS interop is unaffected. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Drop the required `size()` method from `set::Item`. Computing a frame's size up front no longer needs a dedicated trait method: the new `encode_size` defaults to running `encode` against a counting `BufMut` (moq-net's `Sizer`), so a custom item only has to implement `encode`/`decode`. Items whose length is known directly (`String`, `Vec<u8>`, `Bytes`) override `encode_size` to return `self.len()`. Export `Sizer` from moq-net (it was already `pub` inside the private `coding` module). This is a Rust-only encoding helper, no wire or JS change, so no cross-package sync is needed. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…-net Revert the `moq_net::Sizer` re-export and copy the counting `BufMut` into moq-data as a private `sizer` module instead, so the encoding helper doesn't widen moq-net's public surface. `Item::encode_size`'s default uses the local copy. No behavior or wire change. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
rs/moq-data/src/set.rs (1)
190-220:⚠️ Potential issue | 🟠 Major | ⚡ Quick winPreserve
self.currenton publish failures.Line 191 and Line 208 commit local state before
write_snapshotorwrite_deltasucceeds. If publishing fails, retries become no-ops while the track may never have observed the change.🐛 Proposed fix
@@ - if self.should_snapshot(delta_size, snapshot_size) { - self.current.insert(item); - self.write_snapshot()?; + if self.should_snapshot(delta_size, snapshot_size) { + self.current.insert(item.clone()); + if let Err(err) = self.write_snapshot() { + self.current.remove(&item); + return Err(err); + } } else { // Write the delta straight from the reference, then move the item into the set. self.write_delta(INSERT, &item)?; self.current.insert(item); } @@ - if self.should_snapshot(delta_size, snapshot_size) { - self.write_snapshot()?; - } else { - self.write_delta(REMOVE, &removed)?; - } + let publish = if self.should_snapshot(delta_size, snapshot_size) { + self.write_snapshot() + } else { + self.write_delta(REMOVE, &removed) + }; + if let Err(err) = publish { + self.current.insert(removed); + return Err(err); + } Ok(true) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rs/moq-data/src/set.rs` around lines 190 - 220, The insert and remove methods modify self.current (local state) before ensuring that the write operations succeed. In the insert method, move the self.current.insert(item) call to occur only after both the write_snapshot() and write_delta() calls have completed successfully. In the remove method, since the item is taken from self.current before the write attempt, restore the removed item back into self.current if write_snapshot() or write_delta() fails, so that retries will attempt to publish the change again rather than becoming no-ops.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rs/moq-data/src/set.rs`:
- Around line 262-263: Update the documentation comment to reference the correct
method name. The doc comment currently references a non-existent trait method
`Item::size`, which should be changed to `Item::encode_size`. Locate the doc
comment describing the byte size computation and replace the outdated method
reference to point to the correct method that is currently used for sizing
calculations.
---
Outside diff comments:
In `@rs/moq-data/src/set.rs`:
- Around line 190-220: The insert and remove methods modify self.current (local
state) before ensuring that the write operations succeed. In the insert method,
move the self.current.insert(item) call to occur only after both the
write_snapshot() and write_delta() calls have completed successfully. In the
remove method, since the item is taken from self.current before the write
attempt, restore the removed item back into self.current if write_snapshot() or
write_delta() fails, so that retries will attempt to publish the change again
rather than becoming no-ops.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ba4419ef-89a3-4a83-8ab4-624fc7f8b1cd
📒 Files selected for processing (3)
rs/moq-data/README.mdrs/moq-data/src/set.rsrs/moq-net/src/lib.rs
✅ Files skipped from review due to trivial changes (1)
- rs/moq-data/README.md
`insert`/`remove` mutated `self.current` before the frame write succeeded, so a failed publish (e.g. the track was closed) left the local set disagreeing with what the track actually saw. Roll the change back on error: the snapshot path of `insert` removes the just-inserted item, and `remove` re-inserts the taken one. The delta paths already write before mutating. Adds a regression test. Also fix a stale doc reference (`Item::size` -> `Item::encode_size`). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…l set
The set consumer now yields an `Update { added, removed }` per change instead of
the whole reconstructed set, so a watcher (e.g. tracks.set) can react to
individual items appearing and leaving. The full set is still available via the
new `Consumer::current()`.
A delta maps to a single add or remove. A snapshot is diffed against the current
set (the consumer no longer clears state when switching groups), so a
snapshot-only stream still produces per-item changes and a group roll never
re-reports the whole set. No-op frames are skipped, so an `Update` is never
empty. Same change applied to `@moq/data`. Wire format unchanged.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…-ad0814 # Conflicts: # bun.lock
Summary
Adds a new
moq-datacrate and@moq/datapackage: helpers for sending metadata over MoQ tracks, built on the same snapshot/delta machinery asmoq-json. Two modules, each behind a feature (Rust) / subpath export (JS):set— aHashSet-like collection synced over a track with+/-delta encoding.json— re-exportsmoq-jsonfor now (moq_data::json/@moq/data/json); JSON will migrate here over time.The motivating use case is a
tracks.settrack listing all available track names in a broadcast, but the crate is kept generic enough to sync any binary data.The
setmoduleItems are arbitrary binary data, not serde-bound. A type plugs in via a tiny
Itemtrait (Rust:encode -> Bytes/decode(Bytes) -> Self) orCodec<T>(JS:encode/decodetoUint8Array).String,Vec<u8>,BytesandstringCodec/bytesCodecare provided.Wire format (cross-language, byte-compatible)
Each group is self-contained, mirroring
moq-json:u32(count)then, per item,u32(len)+ item bytes.+=0x2Binsert,-=0x2Dremove) + item bytes to end of frame.A consumer jumps to the newest group, decodes the snapshot, replays the deltas, and yields the full set after each frame. Deltas are on by default (
delta_ratio/deltaRatio= 2); a new snapshot group rolls once the deltas outgrow the ratio budget or hitMAX_DELTA_FRAMES(256). Lengths are big-endianu32(not QUIC varints) so the Rust and JS encoders stay trivially byte-compatible without depending on moq-net internals.Scope / branch targeting
This is an additive new crate/package, so it targets
main. The actualtracks.setwiring into the hang catalog is a follow-up — that's a catalog/wire change and belongs ondev. This PR ships only the generic building block, with a string-set test demonstrating the scenario.Public API surface (all new, nothing breaking)
rs/moq-data:set::{Producer, Consumer, Config, Item, Error, Result}+Itemimpls forString/Vec<u8>/Bytes;jsonre-export.Configis#[non_exhaustive]with aDefault;Erroris#[non_exhaustive].@moq/data:set::{Producer, Consumer, Config, Codec, stringCodec, bytesCodec};@moq/data/jsonre-export.Test plan
cargo test -p moq-data --all-features— 8 tests passcargo fmt/cargo clippy -p moq-data --all-featuresclean (via nix)json-only,set-only, no-default-featuresbun testinjs/data— 6 tests passtsc --noEmitandbiome check js/datacleanCross-package sync
Added a row to the CLAUDE.md sync table:
rs/moq-dataset wire/API ↔js/data(shared wire format, must stay byte-compatible).🤖 Generated with Claude Code
(Written by Claude)