feat(rtmp): FrameLifecycleListener for pooled-copy buffer management #2074

Closed
pird32 wants to merge 2 commits into pedroSG94:master from pird32:fork/rootencoder-rtmp-memory

Conversation

@pird32 pird32 commented Apr 16, 2026

Problem

At 30 fps video + 50 fps audio, the RTMP send path requires a defensive ByteBuffer.clone() before each sendMediaFrame() call because MediaCodec output buffers are reclaimed after the callback returns. This creates ~80 heap allocations per second, which causes GC pressure and monotonic heap growth under sustained load.

Callers currently have no way to know when the sender has finished reading the buffer, so they cannot safely implement a buffer pool — the recycled buffer might still be referenced by a queued frame.
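
For illustration, the current defensive-copy pattern looks roughly like this (a sketch, not the library's code; the allocate/put/flip sequence stands in for whatever clone() helper the caller uses, and outputBuffer, outputIndex, info, and codec come from the surrounding MediaCodec callback):

// One heap allocation per frame: the codec reclaims outputBuffer once released,
// so the frame must be copied before it is queued for sending.
val copy = ByteBuffer.allocate(outputBuffer.remaining())
copy.put(outputBuffer)
copy.flip()
rtmpSender.sendMediaFrame(MediaFrame(copy, info.toMediaFrameInfo(), MediaFrame.Type.VIDEO))
codec.releaseOutputBuffer(outputIndex, false)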

Solution

This PR (which builds on the shared contract from PR #N) exposes a per-sender lifecycle hook:

rtmpSender.frameLifecycleListener = FrameLifecycleListener { frame ->
    // called from the sender's IO thread AFTER network write
    myPool.release(frame.data)
}

notifyFrameConsumed(frame) is called from RtmpSender.onRun() after each frame's network write completes. The sender does not access frame.data after this point.
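
The timing contract, as a pseudocode sketch (not the actual RtmpSender.onRun() body; queue and writeToSocket are placeholders):

while (running) {
    val frame = queue.take()                           // next queued MediaFrame
    writeToSocket(frame)                               // placeholder for the protocol-specific network write
    frameLifecycleListener?.onFrameConsumed(frame)     // the sender never touches frame.data after this
}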

The error path also emits ConnectChecker.onTransportEvent(NetworkSendError) alongside the existing onConnectionFailed() call, enabling structured error handling.
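
On the caller side, the handler could look like this inside an existing ConnectChecker implementation (a sketch; the TransportEvent variants come from the commit notes below, and their payloads and the TAG constant are assumptions):

override fun onTransportEvent(event: TransportEvent) {
    when (event) {
        is TransportEvent.NetworkSendError -> Log.e(TAG, "network write failed, connection likely unhealthy")
        is TransportEvent.QueueOverflow -> Log.w(TAG, "send queue overflowed, consider lowering the bitrate")
    }
}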

Thread contract

onFrameConsumed is called on the sender's Dispatchers.IO coroutine. Implementations must not block.
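
If releasing a slot ever involves more than an O(1) pool return, one option is to hand the buffer off and drain it on a worker thread; a sketch, assuming frame.data is a ByteBuffer:

val released = java.util.concurrent.ConcurrentLinkedQueue<ByteBuffer>()   // drained elsewhere, off the IO coroutine

rtmpSender.frameLifecycleListener = FrameLifecycleListener { frame ->
    released.offer(frame.data)   // O(1), never blocks the sender
}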

Backward compatibility

frameLifecycleListener defaults to null. Existing senders behave identically with no listener registered. Callers that currently use clone() can continue to do so — this is purely additive.

Example: zero-allocation RTMP send

val pool = ByteBufferPool(capacity = 128)
rtmpSender.frameLifecycleListener = FrameLifecycleListener { frame ->
    pool.release(frame.data)
}

// In MediaCodec callback:
val buf = pool.acquire(videoBuffer) ?: return // pool exhausted = queue full, drop
rtmpSender.sendMediaFrame(MediaFrame(buf, info.toMediaFrameInfo(), MediaFrame.Type.VIDEO))
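
ByteBufferPool above is not part of this PR or the library; a minimal sketch of one possible implementation, assuming fixed-size slots large enough for a single encoded frame:

import java.nio.ByteBuffer
import java.util.concurrent.ArrayBlockingQueue

class ByteBufferPool(capacity: Int, private val slotSize: Int = 512 * 1024) {
    private val free = ArrayBlockingQueue<ByteBuffer>(capacity)

    init {
        repeat(capacity) { free.offer(ByteBuffer.allocateDirect(slotSize)) }
    }

    // Copies src into a pooled slot; returns null when the pool is exhausted
    // (i.e. too many frames are still queued), so the caller can drop the frame.
    fun acquire(src: ByteBuffer): ByteBuffer? {
        val slot = free.poll() ?: return null
        slot.clear()
        if (src.remaining() > slot.capacity()) {
            free.offer(slot)   // frame larger than a slot: give it back and bail out
            return null
        }
        slot.put(src)
        slot.flip()
        return slot
    }

    // Called from the sender's IO thread via the lifecycle listener.
    fun release(buf: ByteBuffer) {
        free.offer(buf)
    }
}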

pird32 added 2 commits April 16, 2026 17:51
- BaseSender: fix getCacheSize() to track actual capacity after resizeCache()
  (was always returning initial 400 even after resizeCache(128))
- BaseSender: add getQueueSnapshot() returning QueueSnapshot(capacity, items)
- BaseSender: add frameLifecycleListener for pooled-copy buffer recycling
- BaseSender: emit rate-limited ConnectChecker.onTransportEvent(QueueOverflow)
  when sendMediaFrame() drops a frame (cooldown: 1500 ms, no flooding)
- New: QueueSnapshot data class with usageRatio and summary()
- New: TransportEvent sealed class (QueueOverflow, NetworkSendError)
- New: FrameLifecycleListener fun interface for buffer pool integration
- ConnectChecker: add onTransportEvent(TransportEvent) default no-op method

Upstream-friendliness: onTransportEvent is a default method; existing
ConnectChecker implementors are not required to override it.

Made-with: Cursor
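
The queue-introspection API described in the commit above might look roughly like this (a sketch inferred from the bullet list; anything beyond capacity, items, usageRatio, and summary() is an assumption):

data class QueueSnapshot(val capacity: Int, val items: Int) {
    val usageRatio: Float get() = if (capacity == 0) 0f else items.toFloat() / capacity
    fun summary(): String = "$items/$capacity (${(usageRatio * 100).toInt()}% full)"
}

// Caller-side use, e.g. to surface backpressure in a debug overlay:
val snapshot = rtmpSender.getQueueSnapshot()
if (snapshot.usageRatio > 0.8f) Log.w(TAG, "send queue nearly full: ${snapshot.summary()}")
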
…all senders

All four protocol senders (RtmpSender, SrtSender, RtspSender, UdpSender) now:
- Call notifyFrameConsumed(mediaFrame) after the dispatch loop finishes
  processing each frame (after network write). Wired to frameLifecycleListener
  so buffer pools can release slots at the correct point in the lifecycle.
- Emit ConnectChecker.onTransportEvent(NetworkSendError) on send errors in
  addition to the existing onConnectionFailed(String) call (backward compatible).

Made-with: Cursor

pird32 commented Apr 16, 2026

Closing this PR to keep only one consolidated PR for easier review/testing, as requested. All changes are included in #2072.

@pird32 pird32 closed this Apr 16, 2026