feat(moq-net): add OriginDynamic for unannounced fallback broadcasts#1772
Merged
Conversation
Adds the origin-level analogue of BroadcastDynamic. Where BroadcastDynamic serves tracks on demand within a broadcast and TrackDynamic serves uncached groups within a track, OriginDynamic serves whole broadcasts on demand within an origin. A consumer calls the new OriginConsumer::request_broadcast(path), which first tries the existing announced lookup and, if nothing is announced, registers a fallback request for an OriginProducer::dynamic() handler to pick up via requested_broadcast(). The handler accepts with a BroadcastDynamic to serve the broadcast's tracks (e.g. a relay proxying upstream). Dynamically served broadcasts are deliberately never announced: they exist only as a fallback when a live announcement is absent, so OriginConsumer::announced never observes them. The request queue lives in a shared OriginDynamicState off to the side of the announce tree, mirroring the dynamic/requests/request_order shape of the broadcast and track models (ref-counted handlers, coalesced requests, reject-on-last-drop). Purely additive in-process API, no wire change. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… through it Reworks request_broadcast to return a kio::Pending future (like TrackConsumer::fetch_group) instead of a half-pending BroadcastConsumer: - request_broadcast(path) -> Result<kio::Pending<BroadcastRequested>, Error>. Registration is synchronous (so a handler sees the request immediately); the future resolves to the announced broadcast at once, or once an OriginDynamic handler accepts (live broadcast) or rejects (error), or NotFound if every handler drops first. - BroadcastRequest::accept now takes the broadcast to serve (impl Consume<BroadcastConsumer>) and resolves all awaiting requesters; reject resolves them with an error. The result rides a one-shot kio channel; kio polls the value before the closed flag, so the final result is observed without a close race. - Coalescing is unchanged: concurrent requests for the same queued path share one handler request. Routes the publisher serve paths through request_broadcast instead of get_broadcast: lite recv_subscribe / run_track_info / run_fetch and the ietf subscribe handler. With no OriginDynamic handler registered this is identical to before (immediate announced lookup or NotFound); a registered handler gains the dynamic fallback. get_broadcast stays as the synchronous announced-only peek for tests and external callers (libmoq, moq-rtc). announced_broadcast is unchanged (still the race-free await-for-announce path). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Adds dynamic broadcast requesting to the FFI layers so callers aren't gated to only-announced broadcasts. request_broadcast resolves the announced broadcast immediately, falls back to an OriginDynamic handler if the origin has one, or errors; unlike the announce-wait paths it does not block for a future announce. - rs/moq-ffi: MoqOriginConsumer::request_broadcast (async). - rs/libmoq: moq_origin_request + moq_origin_request_close (callback style, reusing the consume-task slab). Sits between moq_origin_consume (announced-only, fails fast) and moq_origin_consume_announced (waits indefinitely). - Wrappers: py (OriginConsumer + Client), swift (OriginConsumer), go (OriginConsumer + Client). kt gets it for free via the uniffi typealias. - Docs: py README + doc/lib/py, and the libmoq callback-function list in doc/lib/c. The go/swift/kt doc pages are prose quickstarts that don't enumerate per-method APIs, so request_broadcast flows through their generated/typealiased API docs instead. libmoq's existing get_broadcast-based `consume` is intentionally left as-is: it must stay a synchronous announced-only peek so a stale announce/unannounce can't trigger a dynamic fetch. Carrying the announced BroadcastConsumer in libmoq's announce buffer (to make the announce->consume handoff race-free) is a separate follow-up; it needs a free lifecycle on that slab and doesn't regress today's path. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…free moq_origin_announced hands its callback a fresh announce ID for every announce/unannounce event, but there was no way to release one, so they accumulated for the life of the listener. Adds moq_origin_announced_free (and Origin::announced_free) to drop a single delivered record once read. Explicit free rather than auto-cleanup on unannounce: an unannounce is its own delivered record, and auto-freeing the prior one would race a caller still reading its info (the path pointer borrows the record's storage). This is per-record and distinct from moq_origin_announced_close, which stops the listener. Adds an announced_free_lifecycle test and documents the free in doc/lib/c. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…gin_request Removes the synchronous moq_origin_consume (and Origin::consume): it was a point-in-time announced-only peek that raced announcement gossip. moq_origin_request covers the same case (it resolves an already-announced broadcast immediately) without the racy footgun, and additionally supports the dynamic fallback. Callers reacting to an announcement should use the broadcast delivered by moq_origin_announced; callers that want a specific path use moq_origin_request. Migrates the libmoq tests to a request_broadcast helper over moq_origin_request, and updates the README signature list and doc cross-references. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
…to request_broadcast get_broadcast is a synchronous announced-only peek that races announcement gossip. request_broadcast is now the public lookup (announced-now resolves immediately, with a dynamic fallback), so the racy peek no longer needs to be public surface. - moq-net: drop `pub` from OriginConsumer::get_broadcast; it stays as the internal helper backing request_broadcast (and the tree-state test assertions). - moq-rtc: WHIP/WHEP gateways look up the broadcast via request_broadcast. With no dynamic handler this is identical (resolve-if-announced, else fail fast); a handler would add the fallback. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
cargo doc -D warnings (rustdoc::private_intra_doc_links) rejected the public announced_broadcast / request_broadcast doc comments linking to Self::get_broadcast after it was made private. Point them at request_broadcast / drop the link. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Adds
OriginDynamic, the origin-level analogue of the existingBroadcastDynamic/TrackDynamicon-demand handlers, makesrequest_broadcastthe one public broadcast-lookup, and reworks the FFI surface around it.OriginDynamic(new)BroadcastDynamicTrackDynamicrequest_broadcast(the unified lookup)OriginConsumer::request_broadcast(path) -> Result<kio::Pending<BroadcastRequested>, Error>— akio::Pendingfuture you await (not anasync fn), mirroringTrackConsumer::fetch_group. Registration is synchronous (a handler sees the request immediately); the future then resolves:OriginDynamic→ once the handleraccepts (live broadcast) orrejects (error), coalescing concurrent requests for the same path;NotFound(Droppedonce the origin is gone).Not announced, by design
Dynamically served broadcasts are never announced —
OriginConsumer::announcednever observes them. The request queue lives in a sharedOriginDynamicStateoff to the side of the announce tree (ref-counted handlers, coalesced requests, reject-on-last-drop). The result rides a one-shot kio channel; kio checks the value before the closed flag, so the outcome is observed without a close race.Handler side
OriginProducer::dynamic() -> OriginDynamic;requested_broadcast()yields aBroadcastRequest.accept(broadcast)resolves every awaiting requester with the supplied live broadcast (the handler keeps producing into it, e.g. a relay proxying upstream);reject(err)resolves them with an error.get_broadcastretired from the public APIOriginConsumer::get_broadcastwas a synchronous announced-only peek that races announcement gossip. It's now private — the internal helper backingrequest_broadcast's announced case (and the tree-state test assertions). Every caller moved torequest_broadcast:recv_subscribe,run_track_info,run_fetch, ietf subscribe.With no
OriginDynamicregistered this is behavior-identical (resolve-if-announced, else fail fast); a registered handler adds the fallback.announced_broadcastis unchanged (race-free await-for-announce).FFI: dynamic broadcast requesting
Neither libmoq nor moq-ffi could previously request a broadcast that hadn't been announced. Now:
MoqOriginConsumer::request_broadcast(async).moq_origin_request+moq_origin_request_close(callback style). Removes the synchronousmoq_origin_consume(breaking): it was a racy announced-only peek;moq_origin_requestresolves an already-announced broadcast immediately (same common case, no race) and adds the dynamic fallback. Callers reacting to an announcement use the broadcast delivered bymoq_origin_announced.OriginConsumer+Client), swift (OriginConsumer), go (OriginConsumer+Client); kt via the uniffi typealias.doc/lib/py; libmoq README +doc/lib/c.FFI: free delivered announcements
moq_origin_announcedhanded its callback a fresh announce ID per event with no way to release one (they accumulated). Addsmoq_origin_announced_freeto drop a delivered record once read. Explicit free, not auto-on-unannounce: an unannounce is its own record, and auto-freeing the prior one would race a caller still reading its borrowedpathpointer.Scope / follow-ups
dev(new public surface inrs/moq-net+rs/moq-ffi;get_broadcastprivatized;moq_origin_consumeremoved).OriginDynamicin the relay/cluster to actually fetch from upstream on a fallback request;js/netmirror; carry theBroadcastConsumerin libmoq's announced record (+ accessor) so announce→consume is fully race-free (now unblocked bymoq_origin_announced_free).Public API changes
rs/moq-net: addedOriginProducer::dynamic,OriginConsumer::request_broadcast,OriginDynamic,BroadcastRequest,BroadcastRequested;OriginConsumer::get_broadcastis now private.rs/moq-ffi: addedMoqOriginConsumer::request_broadcast.rs/libmoq: addedmoq_origin_request,moq_origin_request_close,moq_origin_announced_free; removedmoq_origin_consume.Test plan
announced_free_lifecycle; 7 roundtrip tests migrated ontomoq_origin_requestcargo test -p moq-ffi -p libmoq(24 + 25); moq-rtc / moq-relay buildcargo fmt --check+clippy --all-targetsclean for moq-net / moq-ffi / libmoq / moq-rtc (via nix)(Written by Claude)