Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
0785276
moq-mux: decouple the opus importer from the broadcast catalog
claude Jun 16, 2026
23f3bcb
moq-mux: decouple the H.264 importer from the broadcast catalog
claude Jun 16, 2026
e315189
moq-boy: fix track() call after moq-video Producer::track() lost its …
claude Jun 16, 2026
433e8ea
moq-mux: port the remaining importers off the broadcast catalog
claude Jun 16, 2026
dc37cea
moq-mux: extract the H.264 byte->frame Split; add the FrameDecode path
claude Jun 16, 2026
f752b7b
moq-mux: make the H.264 Split dumb; move config into the importer
claude Jun 16, 2026
ba7a406
moq-mux: Published owns decode+sync; drop manual sync calls
claude Jun 16, 2026
7f10a4e
moq-mux: replace lenient_start with a MissingKeyframe error
claude Jun 16, 2026
f631821
moq-mux: extract the H.265 byte->frame Split
claude Jun 16, 2026
249ff25
moq-mux: extract the AV1 byte->frame Split
claude Jun 16, 2026
f4a278a
moq-mux: fix stale lenient-start comments in the h264 importer
claude Jun 16, 2026
fd6e459
Merge branch 'dev' into claude/moq-mux-import-export-api-dtkqdp
claude Jun 16, 2026
db6e812
moq-mux: drop the fixed-track concept; config updates in place
claude Jun 16, 2026
de2d201
moq-mux: convert the importers off anyhow to thiserror
claude Jun 16, 2026
b71966f
moq-mux: make the video importers pure frame publishers
claude Jun 17, 2026
fe95e63
moq-mux: slim Split to a pure stream splitter (decode + flush)
claude Jun 17, 2026
1ee9658
moq-mux: drop the unused Split::decode_from
claude Jun 17, 2026
e58d075
moq-mux: remove Split::seed; feed init bytes as the stream
claude Jun 17, 2026
de7cfb3
moq-mux: fold publish into import; add moq_net::TrackDemand
kixelated Jun 17, 2026
5de839c
moq-mux: importers self-catalog; frame-only decode; shared clock
kixelated Jun 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 4 additions & 14 deletions rs/libmoq/src/publish.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::str::FromStr;

use bytes::Buf;
use moq_mux::import;

use crate::{Error, Id, NonZeroSlab};
Expand Down Expand Up @@ -52,29 +51,20 @@ impl Publish {
Ok(())
}

pub fn media_ordered(&mut self, broadcast: Id, format: &str, mut init: &[u8]) -> Result<Id, Error> {
pub fn media_ordered(&mut self, broadcast: Id, format: &str, init: &[u8]) -> Result<Id, Error> {
let (broadcast, catalog) = self.broadcasts.get(broadcast).ok_or(Error::BroadcastNotFound)?;

let format = import::FramedFormat::from_str(format).map_err(|_| Error::UnknownFormat(format.to_string()))?;
let decoder = import::Framed::new(broadcast.clone(), catalog.clone(), format, &mut init)?;
let decoder = import::Framed::new(broadcast.clone(), catalog.clone(), format, init)?;

let id = self.media.insert(decoder)?;
Ok(id)
}

pub fn media_frame(
&mut self,
media: Id,
mut data: &[u8],
timestamp: hang::container::Timestamp,
) -> Result<(), Error> {
pub fn media_frame(&mut self, media: Id, data: &[u8], timestamp: hang::container::Timestamp) -> Result<(), Error> {
let media = self.media.get_mut(media).ok_or(Error::MediaNotFound)?;

media.decode_frame(&mut data, Some(timestamp))?;

if data.has_remaining() {
return Err(Error::BufferNotConsumed);
}
media.decode(data, Some(timestamp))?;

Ok(())
}
Expand Down
10 changes: 5 additions & 5 deletions rs/moq-boy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ pub struct Config {
/// track monitors, and async tasks.
struct Session {
video_encoder: video::VideoEncoder,
video_track: moq_net::TrackProducer,
audio_track: moq_net::TrackProducer,
video_track: moq_net::TrackDemand,
audio_track: moq_net::TrackDemand,

/// Whether anyone is subscribed to the video/audio tracks.
video_active: AtomicBool,
Expand All @@ -109,7 +109,7 @@ struct Session {
impl Session {
/// Monitor a single track's subscription state.
/// Sets the flag when a viewer subscribes, clears it when all unsubscribe.
async fn run_track_monitor(&self, name: &str, track: &moq_net::TrackProducer, flag: &AtomicBool) {
async fn run_track_monitor(&self, name: &str, track: &moq_net::TrackDemand, flag: &AtomicBool) {
loop {
if track.used().await.is_err() {
break;
Expand Down Expand Up @@ -246,8 +246,8 @@ async fn run(config: &Config) -> Result<()> {

let audio_encoder = audio::AudioEncoder::new(broadcast.clone(), catalog.clone(), 44100)?;

let video_track = video_encoder.track.clone();
let audio_track = audio_encoder.track().clone();
let video_track = video_encoder.demand.clone();
let audio_track = audio_encoder.track().demand();

let status_publisher = status::StatusPublisher::new(&mut broadcast)?;

Expand Down
8 changes: 4 additions & 4 deletions rs/moq-boy/src/video.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use crate::emulator::{HEIGHT, WIDTH};
/// Frames are submitted via `try_frame()` (non-blocking, drops if full).
pub struct VideoEncoder {
tx: tokio::sync::mpsc::Sender<EncoderMsg>,
/// Clone of the video track producer, for monitoring used/unused.
pub track: moq_net::TrackProducer,
/// Watch-only handle to the video track, for monitoring used/unused.
pub demand: moq_net::TrackDemand,
force_keyframe: Arc<AtomicBool>,
/// Latest encode duration in microseconds.
encode_duration: Arc<AtomicU64>,
Expand All @@ -36,7 +36,7 @@ impl VideoEncoder {
pub fn spawn(broadcast: moq_net::BroadcastProducer, catalog: moq_mux::catalog::Producer) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(4);
let producer = moq_video::encode::Producer::new(broadcast, catalog).expect("failed to create avc3 producer");
let track = producer.track().expect("avc3 track is eagerly created").clone();
let demand = producer.demand();

let force_keyframe = Arc::new(AtomicBool::new(false));
let encode_duration = Arc::new(AtomicU64::new(0));
Expand All @@ -49,7 +49,7 @@ impl VideoEncoder {

Self {
tx,
track,
demand,
force_keyframe,
encode_duration,
_thread: thread,
Expand Down
32 changes: 27 additions & 5 deletions rs/moq-cli/src/publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ pub struct CaptureArgs {
}

enum PublishDecoder {
Avc3(Box<moq_mux::codec::h264::Import>),
Avc3 {
split: moq_mux::codec::h264::Split,
import: Box<moq_mux::codec::h264::Import>,
},
Fmp4(Box<fmp4::Import>),
Ts(Box<ts::Import>),
Flv(Box<flv::Import>),
Expand All @@ -91,13 +94,27 @@ impl PublishDecoder {
/// Decode a chunk of bytes from stdin (Avc3, Fmp4, Ts, or Flv).
fn decode_buf(&mut self, buffer: &mut bytes::BytesMut) -> anyhow::Result<()> {
match self {
Self::Avc3(d) => Ok(d.decode_stream(buffer, None)?),
Self::Avc3 { split, import } => {
let frames = split.decode(buffer, None)?;
import.decode(frames)?;
Ok(())
}
Self::Fmp4(d) => Ok(d.decode(buffer)?),
Self::Ts(d) => Ok(d.decode(buffer)?),
Self::Flv(d) => Ok(d.decode(buffer)?),
Self::Hls(_) => unreachable!(),
}
}

/// Flush any in-flight access unit at end of stream. The avc3 split holds the
/// final AU until the next start code, so stdin EOF must flush it.
fn finish(&mut self) -> anyhow::Result<()> {
if let Self::Avc3 { split, import } = self {
let tail = split.flush(None)?;
import.decode(tail)?;
}
Ok(())
}
}

// Exactly one Source exists per process, so the size gap between the small
Expand Down Expand Up @@ -130,9 +147,13 @@ impl Publish {

let source = match format {
PublishFormat::Avc3 => {
let avc3 = moq_mux::codec::h264::Import::new(broadcast.clone(), catalog.clone())
.with_mode(moq_mux::codec::h264::Mode::Avc3)?;
Source::Stream(PublishDecoder::Avc3(Box::new(avc3)))
let track = moq_mux::import::unique_track(&mut broadcast, ".avc3")?;
let import = moq_mux::codec::h264::Import::new(track, catalog.clone());
let split = moq_mux::codec::h264::Split::new();
Source::Stream(PublishDecoder::Avc3 {
split,
import: Box::new(import),
})
}
PublishFormat::Fmp4 => {
let fmp4 = fmp4::Import::new(broadcast.clone(), catalog.clone());
Expand Down Expand Up @@ -179,6 +200,7 @@ impl Publish {
loop {
let n = tokio::io::AsyncReadExt::read_buf(&mut stdin, &mut buffer).await?;
if n == 0 {
decoder.finish()?;
return Ok(());
}
decoder.decode_buf(&mut buffer)?;
Expand Down
50 changes: 16 additions & 34 deletions rs/moq-ffi/src/producer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::str::FromStr;
use std::sync::Arc;

use bytes::Buf;

use crate::consumer::{MoqBroadcastConsumer, MoqGroupConsumer, MoqTrackConsumer};
use crate::error::MoqError;
use crate::ffi::Task;
Expand All @@ -16,7 +14,7 @@ pub(crate) struct BroadcastProducer {

struct MediaProducer {
decoder: moq_mux::import::Framed,
track: moq_net::TrackProducer,
demand: moq_net::TrackDemand,
}

struct MediaStreamProducer {
Expand Down Expand Up @@ -127,21 +125,15 @@ impl MoqBroadcastProducer {
let format = moq_mux::import::FramedFormat::from_str(&format)
.map_err(|_| MoqError::Codec(format!("unknown format: {format}")))?;

let mut buf = init.as_slice();
let decoder = moq_mux::import::Framed::new(state.broadcast.clone(), state.catalog.clone(), format, &mut buf)
let decoder = moq_mux::import::Framed::new(state.broadcast.clone(), state.catalog.clone(), format, &init)
.map_err(|err| MoqError::Codec(format!("init failed: {err}")))?;

if buf.has_remaining() {
return Err(MoqError::Codec("init failed: trailing bytes".into()));
}

let track = decoder
.track()
.map_err(|err| MoqError::Codec(format!("track unavailable: {err}")))?
.clone();
let demand = decoder
.demand()
.map_err(|err| MoqError::Codec(format!("track unavailable: {err}")))?;

Ok(Arc::new(MoqMediaProducer {
inner: std::sync::Mutex::new(Some(MediaProducer { decoder, track })),
inner: std::sync::Mutex::new(Some(MediaProducer { decoder, demand })),
}))
}

Expand All @@ -167,22 +159,17 @@ impl MoqBroadcastProducer {
guard.as_ref().ok_or_else(|| MoqError::Closed)?.clone()
};

let mut buf = init.as_slice();
let decoder =
moq_mux::import::Framed::new_with_track(track_clone.clone(), state.catalog.clone(), format, &mut buf)
moq_mux::import::Framed::new_with_track(track_clone.clone(), state.catalog.clone(), format, &init)
.map_err(|err| MoqError::Codec(format!("init failed: {err}")))?;

if buf.has_remaining() {
return Err(MoqError::Codec("init failed: trailing bytes".into()));
}

let mut guard = track.inner.lock().unwrap();
guard.take().ok_or_else(|| MoqError::Closed)?;

Ok(Arc::new(MoqMediaProducer {
inner: std::sync::Mutex::new(Some(MediaProducer {
decoder,
track: track_clone,
demand: track_clone.demand(),
})),
}))
}
Expand Down Expand Up @@ -393,20 +380,20 @@ impl MoqMediaProducer {
let _guard = crate::ffi::RUNTIME.enter();
let guard = self.inner.lock().unwrap();
let media = guard.as_ref().ok_or_else(|| MoqError::Closed)?;
Ok(media.track.name().to_string())
Ok(media.demand.name().to_string())
}

/// Wait until this media track has at least one active consumer.
pub async fn used(&self) -> Result<(), MoqError> {
let track = self
let demand = self
.inner
.lock()
.unwrap()
.as_ref()
.ok_or(MoqError::Closed)?
.track
.demand
.clone();
match crate::ffi::RUNTIME.spawn(async move { track.used().await }).await {
match crate::ffi::RUNTIME.spawn(async move { demand.used().await }).await {
Ok(result) => result.map_err(Into::into),
Err(e) if e.is_cancelled() => Err(MoqError::Cancelled),
Err(e) => Err(MoqError::Task(e)),
Expand All @@ -415,15 +402,15 @@ impl MoqMediaProducer {

/// Wait until this media track has no active consumers.
pub async fn unused(&self) -> Result<(), MoqError> {
let track = self
let demand = self
.inner
.lock()
.unwrap()
.as_ref()
.ok_or(MoqError::Closed)?
.track
.demand
.clone();
match crate::ffi::RUNTIME.spawn(async move { track.unused().await }).await {
match crate::ffi::RUNTIME.spawn(async move { demand.unused().await }).await {
Ok(result) => result.map_err(Into::into),
Err(e) if e.is_cancelled() => Err(MoqError::Cancelled),
Err(e) => Err(MoqError::Task(e)),
Expand All @@ -439,16 +426,11 @@ impl MoqMediaProducer {
let media = guard.as_mut().ok_or_else(|| MoqError::Closed)?;

let timestamp = hang::container::Timestamp::from_micros(timestamp_us)?;
let mut data = payload.as_slice();
media
.decoder
.decode_frame(&mut data, Some(timestamp))
.decode(&payload, Some(timestamp))
.map_err(|err| MoqError::Codec(format!("decode failed: {err}")))?;

if data.has_remaining() {
return Err(MoqError::Codec("buffer was not fully consumed".into()));
}

Ok(())
}

Expand Down
38 changes: 17 additions & 21 deletions rs/moq-gst/src/sink/imp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,32 +448,32 @@ fn handle_caps(runtime: &mut RuntimeState, pad_name: String, caps: gst::Caps) ->
let structure = caps.structure(0).context("empty caps")?;
let decoder: moq_mux::import::Framed = match structure.name().as_str() {
"video/x-h264" => {
let mut bytes = Bytes::new();
new_decoder(runtime, moq_mux::import::FramedFormat::Avc3, &mut bytes)?
let bytes = Bytes::new();
new_decoder(runtime, moq_mux::import::FramedFormat::Avc3, &bytes)?
}
"video/x-h265" => {
let mut bytes = Bytes::new();
new_decoder(runtime, moq_mux::import::FramedFormat::Hev1, &mut bytes)?
let bytes = Bytes::new();
new_decoder(runtime, moq_mux::import::FramedFormat::Hev1, &bytes)?
}
"video/x-av1" => {
let mut bytes = Bytes::new();
new_decoder(runtime, moq_mux::import::FramedFormat::Av01, &mut bytes)?
let bytes = Bytes::new();
new_decoder(runtime, moq_mux::import::FramedFormat::Av01, &bytes)?
}
"video/x-vp8" => {
let mut bytes = Bytes::new();
new_decoder(runtime, moq_mux::import::FramedFormat::Vp8, &mut bytes)?
let bytes = Bytes::new();
new_decoder(runtime, moq_mux::import::FramedFormat::Vp8, &bytes)?
}
"video/x-vp9" => {
let mut bytes = Bytes::new();
new_decoder(runtime, moq_mux::import::FramedFormat::Vp9, &mut bytes)?
let bytes = Bytes::new();
new_decoder(runtime, moq_mux::import::FramedFormat::Vp9, &bytes)?
}
"audio/mpeg" => {
let codec_data = structure
.get::<gst::Buffer>("codec_data")
.context("AAC caps missing codec_data")?;
let map = codec_data.map_readable().context("failed to map codec_data")?;
let mut data = Bytes::copy_from_slice(map.as_slice());
new_decoder(runtime, moq_mux::import::FramedFormat::Aac, &mut data)?
let data = Bytes::copy_from_slice(map.as_slice());
new_decoder(runtime, moq_mux::import::FramedFormat::Aac, &data)?
}
"audio/x-opus" => {
let channels: i32 = structure.get("channels").unwrap_or(2);
Expand All @@ -486,7 +486,8 @@ fn handle_caps(runtime: &mut RuntimeState, pad_name: String, caps: gst::Caps) ->
sample_rate,
channel_count,
};
moq_mux::codec::opus::Import::new(runtime.broadcast.clone(), runtime.catalog.clone(), config)?.into()
let track = moq_mux::import::unique_track(&mut runtime.broadcast, ".opus")?;
moq_mux::codec::opus::Import::new(track, runtime.catalog.clone(), config)?.into()
}
other => anyhow::bail!("unsupported caps: {}", other),
};
Expand All @@ -504,18 +505,13 @@ fn handle_caps(runtime: &mut RuntimeState, pad_name: String, caps: gst::Caps) ->
fn new_decoder(
runtime: &mut RuntimeState,
format: moq_mux::import::FramedFormat,
buf: &mut Bytes,
buf: &[u8],
) -> Result<moq_mux::import::Framed> {
let decoder = moq_mux::import::Framed::new(runtime.broadcast.clone(), runtime.catalog.clone(), format, buf)?;
Ok(decoder)
}

fn handle_buffer(
runtime: &mut RuntimeState,
pad_name: String,
mut data: Bytes,
pts: Option<gst::ClockTime>,
) -> Result<()> {
fn handle_buffer(runtime: &mut RuntimeState, pad_name: String, data: Bytes, pts: Option<gst::ClockTime>) -> Result<()> {
let pad = runtime.pads.get_mut(&pad_name).context("pad not configured")?;

let ts = pts.and_then(|pts| {
Expand All @@ -524,5 +520,5 @@ fn handle_buffer(
hang::container::Timestamp::from_micros(relative.nseconds() / 1000).ok()
});

pad.decoder.decode_frame(&mut data, ts).map_err(|e| anyhow::anyhow!(e))
pad.decoder.decode(&data, ts).map_err(|e| anyhow::anyhow!(e))
}
2 changes: 2 additions & 0 deletions rs/moq-mux/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ mod format;
mod producer;
mod stream;
mod target;
mod tracks;

pub use consumer::Consumer;
pub use filter::{Filter, FilterAudio, FilterVideo};
pub use format::*;
pub use producer::{Guard, Producer};
pub use stream::Stream;
pub use target::{Target, TargetAudio, TargetVideo};
pub use tracks::{AudioTrack, VideoTrack};
Loading