From 9020d5e3a6bfaffcd067cdbc972f932a4db685e3 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 27 May 2026 16:49:29 -0400 Subject: [PATCH 01/19] feat: add initial support for frame processor usage directly on tracks --- packages/livekit-rtc/src/audio_stream.ts | 28 ++++- packages/livekit-rtc/src/frame_processor.ts | 4 + packages/livekit-rtc/src/room.ts | 8 ++ packages/livekit-rtc/src/track.ts | 130 ++++++++++++++++++++ packages/livekit-rtc/tsconfig.json | 3 +- 5 files changed, 169 insertions(+), 4 deletions(-) diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index 97cea8b6..d3251251 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -13,6 +13,12 @@ import type { Track } from './track.js'; export interface AudioStreamOptions { noiseCancellation?: NoiseCancellationOptions | FrameProcessor; + /** + * If true and `noiseCancellation` is a {@link FrameProcessor}, leaves the + * processor open when the stream closes so the same processor can be reused + * with another {@link AudioStream}. Defaults to `false`. + */ + noiseCancellationLeaveOpen?: boolean; sampleRate?: number; numChannels?: number; frameSizeMs?: number; @@ -31,19 +37,23 @@ class AudioStreamSource implements UnderlyingSource { private sampleRate: number; private numChannels: number; private legacyNcOptions?: NoiseCancellationOptions; - private frameProcessor?: FrameProcessor; + private frameProcessor: FrameProcessor | null = null; + private leaveProcessorOpen = false; private frameSizeMs?: number; + private track: Track; constructor( track: Track, sampleRateOrOptions?: number | AudioStreamOptions, numChannels?: number, ) { + this.track = track; if (sampleRateOrOptions !== undefined && typeof sampleRateOrOptions !== 'number') { this.sampleRate = sampleRateOrOptions.sampleRate ?? 48000; this.numChannels = sampleRateOrOptions.numChannels ?? 1; if (isFrameProcessor(sampleRateOrOptions.noiseCancellation)) { this.frameProcessor = sampleRateOrOptions.noiseCancellation; + this.leaveProcessorOpen = sampleRateOrOptions.noiseCancellationLeaveOpen ?? false; } else { this.legacyNcOptions = sampleRateOrOptions.noiseCancellation; } @@ -77,6 +87,12 @@ class AudioStreamSource implements UnderlyingSource { this.ffiHandle = new FfiHandle(res.stream!.handle!.id!); FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onEvent); + track.registerAudioStream(this); + } + + /** @internal */ + get processor(): FrameProcessor | null { + return this.frameProcessor; } private onEvent = (ev: FfiEvent) => { @@ -113,8 +129,11 @@ class AudioStreamSource implements UnderlyingSource { // while buffered frames are still in the ReadableStream queue. if (!this.disposed) { this.disposed = true; + this.track.unregisterAudioStream(this); this.ffiHandle.dispose(); - this.frameProcessor?.close(); + if (this.frameProcessor && !this.leaveProcessorOpen) { + this.frameProcessor.close(); + } } break; } @@ -128,10 +147,13 @@ class AudioStreamSource implements UnderlyingSource { FfiClient.instance.off(FfiClientEvent.FfiEvent, this.onEvent); if (!this.disposed) { this.disposed = true; + this.track.unregisterAudioStream(this); this.ffiHandle.dispose(); // Also close the frame processor on cancel for symmetry with the EOS path, // so resources are released regardless of how the stream ends. - this.frameProcessor?.close(); + if (this.frameProcessor && !this.leaveProcessorOpen) { + this.frameProcessor.close(); + } } } } diff --git a/packages/livekit-rtc/src/frame_processor.ts b/packages/livekit-rtc/src/frame_processor.ts index 629d883a..8feeaa2b 100644 --- a/packages/livekit-rtc/src/frame_processor.ts +++ b/packages/livekit-rtc/src/frame_processor.ts @@ -43,7 +43,11 @@ export abstract class FrameProcessor { // eslint-disable-next-line @typescript-eslint/no-unused-vars onStreamInfoUpdated(_info: FrameProcessorStreamInfo): void {} // eslint-disable-next-line @typescript-eslint/no-unused-vars + onStreamInfoCleared(): void {} + // eslint-disable-next-line @typescript-eslint/no-unused-vars onCredentialsUpdated(_credentials: FrameProcessorCredentials): void {} + // eslint-disable-next-line @typescript-eslint/no-unused-vars + onCredentialsCleared(): void {} abstract process(frame: Frame): Frame; abstract close(): void; diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 7bc1f0c5..7ffed15b 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -557,9 +557,15 @@ export class Room extends (EventEmitter as new () => TypedEmitter } } else if (ev.case == 'localTrackPublished') { const publication = this.localParticipant.trackPublications.get(ev.value.trackSid!); + if (publication?.track) { + publication.track.setRoom(this); + } this.emit(RoomEvent.LocalTrackPublished, publication!, this.localParticipant); } else if (ev.case == 'localTrackUnpublished') { const publication = this.localParticipant.trackPublications.get(ev.value.publicationSid!); + if (publication?.track) { + publication.track.setRoom(null); + } this.localParticipant.trackPublications.delete(ev.value.publicationSid!); this.emit(RoomEvent.LocalTrackUnpublished, publication!, this.localParticipant!); } else if ((ev.case as string) == 'localTrackRepublished') { @@ -620,6 +626,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter } else if (trackInfo.kind == TrackKind.KIND_AUDIO) { publication.track = new RemoteAudioTrack(ownedTrack); } + publication.track?.setRoom(this); this.emit(RoomEvent.TrackSubscribed, publication.track!, publication, participant); } catch (e: unknown) { @@ -632,6 +639,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter ev.value.trackSid!, ); const track = publication.track!; + track.setRoom(null); publication.track = undefined; publication.subscribed = false; this.emit(RoomEvent.TrackUnsubscribed, track, publication, participant); diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index 1a25dce0..a51587ca 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -10,10 +10,18 @@ import type { TrackKind, } from '@livekit/rtc-ffi-bindings'; import { CreateAudioTrackRequest, CreateVideoTrackRequest } from '@livekit/rtc-ffi-bindings'; +import type { AudioFrame } from './audio_frame.js'; import type { AudioSource } from './audio_source.js'; import { FfiClient, FfiHandle } from './ffi_client.js'; +import type { FrameProcessor } from './frame_processor.js'; +import type { Room } from './room.js'; import type { VideoSource } from './video_source.js'; +/** @internal */ +export interface AudioStreamLike { + readonly processor: FrameProcessor | null; +} + export abstract class Track { /** @internal */ info?: TrackInfo; @@ -21,9 +29,131 @@ export abstract class Track { /** @internal */ ffi_handle: FfiHandle; + private roomRef: WeakRef | null = null; + private audioStreams: Set> = new Set(); + private streamFinalizationRegistry: FinalizationRegistry>; + private onRoomTokenRefreshed = () => { + const room = this.resolveRoom(); + if (!room || !room.token || !room.serverUrl) return; + for (const stream of this.iterateStreams()) { + const processor = stream.processor; + if (!processor) continue; + processor.onCredentialsUpdated({ token: room.token, url: room.serverUrl }); + } + }; + constructor(owned: OwnedTrack) { this.info = owned.info; this.ffi_handle = new FfiHandle(owned.handle!.id!); + this.streamFinalizationRegistry = new FinalizationRegistry>((ref) => { + this.audioStreams.delete(ref); + }); + } + + /** @internal */ + resolveRoom(): Room | null { + return this.roomRef?.deref() ?? null; + } + + /** @internal */ + setRoom(room: Room | null): void { + const oldRoom = this.resolveRoom(); + if (oldRoom !== room) { + if (oldRoom) { + oldRoom.off('tokenRefreshed', this.onRoomTokenRefreshed); + } + if (room) { + room.on('tokenRefreshed', this.onRoomTokenRefreshed); + } + } + this.roomRef = room ? new WeakRef(room) : null; + for (const stream of this.iterateStreams()) { + this.pushProcessorMetadataToStream(stream, room); + } + } + + /** @internal */ + registerAudioStream(stream: AudioStreamLike): void { + const ref = new WeakRef(stream); + this.audioStreams.add(ref); + this.streamFinalizationRegistry.register(stream, ref); + const room = this.resolveRoom(); + if (room) { + this.pushProcessorMetadataToStream(stream, room); + } + } + + /** @internal */ + unregisterAudioStream(stream: AudioStreamLike): void { + for (const ref of this.audioStreams) { + if (ref.deref() === stream) { + this.audioStreams.delete(ref); + return; + } + } + } + + private *iterateStreams(): Generator { + const dead: Array> = []; + for (const ref of this.audioStreams) { + const stream = ref.deref(); + if (stream) { + yield stream; + } else { + dead.push(ref); + } + } + for (const ref of dead) { + this.audioStreams.delete(ref); + } + } + + private pushProcessorMetadataToStream(stream: AudioStreamLike, room: Room | null): void { + const processor = stream.processor; + if (!processor) return; + + if (!room) { + processor.onStreamInfoCleared(); + processor.onCredentialsCleared(); + return; + } + + let identity = ''; + let publicationSid = ''; + const trackSid = this.sid; + if (trackSid) { + let found = false; + for (const participant of room.remoteParticipants.values()) { + const publication = participant.trackPublications.get(trackSid); + if (publication) { + identity = participant.identity; + publicationSid = publication.sid ?? ''; + found = true; + break; + } + } + if (!found) { + const local = room.localParticipant; + if (local) { + for (const publication of local.trackPublications.values()) { + if (publication.sid === trackSid) { + identity = local.identity; + publicationSid = publication.sid ?? ''; + break; + } + } + } + } + } + + processor.onStreamInfoUpdated({ + roomName: room.name ?? '', + participantIdentity: identity, + publicationSid, + }); + if (room.token && room.serverUrl) { + processor.onCredentialsUpdated({ token: room.token, url: room.serverUrl }); + } } get sid(): string | undefined { diff --git a/packages/livekit-rtc/tsconfig.json b/packages/livekit-rtc/tsconfig.json index 21aeb92d..0c1a2ac4 100644 --- a/packages/livekit-rtc/tsconfig.json +++ b/packages/livekit-rtc/tsconfig.json @@ -2,7 +2,8 @@ "extends": "../../tsconfig.json", "compilerOptions": { "outDir": "dist", - "declarationDir": "dist" + "declarationDir": "dist", + "lib": ["es2015", "es2021.weakref"] }, "include": ["src/**/*.ts"], "exclude": ["src/**/*.test.ts", "vite.config.ts"] From 3623d33549531bb793194022137827c42d9a1dc8 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Thu, 28 May 2026 12:35:26 -0400 Subject: [PATCH 02/19] fix: add optional chaining call to new *Cleared FrameProcessor methods --- packages/livekit-rtc/src/track.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index a51587ca..dfcc690f 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -113,8 +113,11 @@ export abstract class Track { if (!processor) return; if (!room) { - processor.onStreamInfoCleared(); - processor.onCredentialsCleared(); + // Guard with optional-call: plugins built against an older @livekit/rtc-node + // inherit a FrameProcessor base class that doesn't define these methods, + // so they could be undefined on the prototype chain. + processor.onStreamInfoCleared?.(); + processor.onCredentialsCleared?.(); return; } From 7fea123dd80fead7422f32bb6de747a6042954df Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Thu, 28 May 2026 13:32:48 -0400 Subject: [PATCH 03/19] fix: add missing changeset --- .changeset/proud-pianos-joke.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/proud-pianos-joke.md diff --git a/.changeset/proud-pianos-joke.md b/.changeset/proud-pianos-joke.md new file mode 100644 index 00000000..06ece037 --- /dev/null +++ b/.changeset/proud-pianos-joke.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Add initial support for frame processor usage directly on tracks From 1b10da8b45a2d2a631c4eea6c2f9509391fbe060 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Thu, 28 May 2026 13:37:25 -0400 Subject: [PATCH 04/19] fix: get rid of AudioStreamLike --- packages/livekit-rtc/src/audio_stream.ts | 2 +- packages/livekit-rtc/src/track.ts | 24 +++++++++--------------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index d3251251..1e4083d2 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -30,7 +30,7 @@ export interface NoiseCancellationOptions { options: Record; } -class AudioStreamSource implements UnderlyingSource { +export class AudioStreamSource implements UnderlyingSource { private controller?: ReadableStreamDefaultController; private ffiHandle: FfiHandle; private disposed = false; diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index dfcc690f..d6b66681 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -10,17 +10,11 @@ import type { TrackKind, } from '@livekit/rtc-ffi-bindings'; import { CreateAudioTrackRequest, CreateVideoTrackRequest } from '@livekit/rtc-ffi-bindings'; -import type { AudioFrame } from './audio_frame.js'; import type { AudioSource } from './audio_source.js'; import { FfiClient, FfiHandle } from './ffi_client.js'; -import type { FrameProcessor } from './frame_processor.js'; import type { Room } from './room.js'; import type { VideoSource } from './video_source.js'; - -/** @internal */ -export interface AudioStreamLike { - readonly processor: FrameProcessor | null; -} +import type { AudioStreamSource } from './audio_stream.js'; export abstract class Track { /** @internal */ @@ -30,8 +24,8 @@ export abstract class Track { ffi_handle: FfiHandle; private roomRef: WeakRef | null = null; - private audioStreams: Set> = new Set(); - private streamFinalizationRegistry: FinalizationRegistry>; + private audioStreams: Set> = new Set(); + private streamFinalizationRegistry: FinalizationRegistry>; private onRoomTokenRefreshed = () => { const room = this.resolveRoom(); if (!room || !room.token || !room.serverUrl) return; @@ -45,7 +39,7 @@ export abstract class Track { constructor(owned: OwnedTrack) { this.info = owned.info; this.ffi_handle = new FfiHandle(owned.handle!.id!); - this.streamFinalizationRegistry = new FinalizationRegistry>((ref) => { + this.streamFinalizationRegistry = new FinalizationRegistry>((ref) => { this.audioStreams.delete(ref); }); } @@ -73,7 +67,7 @@ export abstract class Track { } /** @internal */ - registerAudioStream(stream: AudioStreamLike): void { + registerAudioStream(stream: AudioStreamSource): void { const ref = new WeakRef(stream); this.audioStreams.add(ref); this.streamFinalizationRegistry.register(stream, ref); @@ -84,7 +78,7 @@ export abstract class Track { } /** @internal */ - unregisterAudioStream(stream: AudioStreamLike): void { + unregisterAudioStream(stream: AudioStreamSource): void { for (const ref of this.audioStreams) { if (ref.deref() === stream) { this.audioStreams.delete(ref); @@ -93,8 +87,8 @@ export abstract class Track { } } - private *iterateStreams(): Generator { - const dead: Array> = []; + private *iterateStreams(): Generator { + const dead: Array> = []; for (const ref of this.audioStreams) { const stream = ref.deref(); if (stream) { @@ -108,7 +102,7 @@ export abstract class Track { } } - private pushProcessorMetadataToStream(stream: AudioStreamLike, room: Room | null): void { + private pushProcessorMetadataToStream(stream: AudioStreamSource, room: Room | null): void { const processor = stream.processor; if (!processor) return; From 9e496835aa5ed43fac90e6fbec048d71395e649d Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Thu, 28 May 2026 13:44:16 -0400 Subject: [PATCH 05/19] fix: run prettier --- packages/livekit-rtc/src/track.ts | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index d6b66681..946c6e0c 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -11,10 +11,10 @@ import type { } from '@livekit/rtc-ffi-bindings'; import { CreateAudioTrackRequest, CreateVideoTrackRequest } from '@livekit/rtc-ffi-bindings'; import type { AudioSource } from './audio_source.js'; +import type { AudioStreamSource } from './audio_stream.js'; import { FfiClient, FfiHandle } from './ffi_client.js'; import type { Room } from './room.js'; import type { VideoSource } from './video_source.js'; -import type { AudioStreamSource } from './audio_stream.js'; export abstract class Track { /** @internal */ @@ -31,7 +31,9 @@ export abstract class Track { if (!room || !room.token || !room.serverUrl) return; for (const stream of this.iterateStreams()) { const processor = stream.processor; - if (!processor) continue; + if (!processor) { + continue; + } processor.onCredentialsUpdated({ token: room.token, url: room.serverUrl }); } }; @@ -39,9 +41,11 @@ export abstract class Track { constructor(owned: OwnedTrack) { this.info = owned.info; this.ffi_handle = new FfiHandle(owned.handle!.id!); - this.streamFinalizationRegistry = new FinalizationRegistry>((ref) => { - this.audioStreams.delete(ref); - }); + this.streamFinalizationRegistry = new FinalizationRegistry>( + (ref) => { + this.audioStreams.delete(ref); + }, + ); } /** @internal */ @@ -104,7 +108,9 @@ export abstract class Track { private pushProcessorMetadataToStream(stream: AudioStreamSource, room: Room | null): void { const processor = stream.processor; - if (!processor) return; + if (!processor) { + return; + } if (!room) { // Guard with optional-call: plugins built against an older @livekit/rtc-node From 793b649befb1dc2edd7ca272fc0d8a57d1aef5ec Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Fri, 29 May 2026 10:17:27 -0400 Subject: [PATCH 06/19] fix: address devin issue --- packages/livekit-rtc/src/track.ts | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index 946c6e0c..621f4067 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -56,13 +56,12 @@ export abstract class Track { /** @internal */ setRoom(room: Room | null): void { const oldRoom = this.resolveRoom(); - if (oldRoom !== room) { - if (oldRoom) { - oldRoom.off('tokenRefreshed', this.onRoomTokenRefreshed); - } - if (room) { - room.on('tokenRefreshed', this.onRoomTokenRefreshed); - } + if (oldRoom && oldRoom !== room) { + oldRoom.off('tokenRefreshed', this.onRoomTokenRefreshed); + } + if (room) { + room.off('tokenRefreshed', this.onRoomTokenRefreshed); + room.on('tokenRefreshed', this.onRoomTokenRefreshed); } this.roomRef = room ? new WeakRef(room) : null; for (const stream of this.iterateStreams()) { From 479f8bc18ec68d320f999fcb3b47fcf8df93b0bb Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 24 Jun 2026 14:40:52 -0400 Subject: [PATCH 07/19] feat: port in updates from https://github.com/livekit/python-sdks/pull/679 --- packages/livekit-rtc/src/audio_stream.ts | 17 +++++++++-------- packages/livekit-rtc/src/participant.ts | 9 +++++++++ packages/livekit-rtc/src/room.ts | 23 +++++++++++++++++++++-- packages/livekit-rtc/src/track.ts | 7 +++++++ 4 files changed, 46 insertions(+), 10 deletions(-) diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index 1e4083d2..f537f577 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -14,11 +14,12 @@ import type { Track } from './track.js'; export interface AudioStreamOptions { noiseCancellation?: NoiseCancellationOptions | FrameProcessor; /** - * If true and `noiseCancellation` is a {@link FrameProcessor}, leaves the - * processor open when the stream closes so the same processor can be reused - * with another {@link AudioStream}. Defaults to `false`. + * When the audio stream closes, whether to run the {@link FrameProcessor}'s + * `close()` method. If `false`, the processor is left open so it can be + * reused with another {@link AudioStream}. Only relevant when + * `noiseCancellation` is a {@link FrameProcessor}. Defaults to `true`. */ - noiseCancellationLeaveOpen?: boolean; + autoCloseNoiseCancellation?: boolean; sampleRate?: number; numChannels?: number; frameSizeMs?: number; @@ -38,7 +39,7 @@ export class AudioStreamSource implements UnderlyingSource { private numChannels: number; private legacyNcOptions?: NoiseCancellationOptions; private frameProcessor: FrameProcessor | null = null; - private leaveProcessorOpen = false; + private autoCloseProcessor = true; private frameSizeMs?: number; private track: Track; @@ -53,7 +54,7 @@ export class AudioStreamSource implements UnderlyingSource { this.numChannels = sampleRateOrOptions.numChannels ?? 1; if (isFrameProcessor(sampleRateOrOptions.noiseCancellation)) { this.frameProcessor = sampleRateOrOptions.noiseCancellation; - this.leaveProcessorOpen = sampleRateOrOptions.noiseCancellationLeaveOpen ?? false; + this.autoCloseProcessor = sampleRateOrOptions.autoCloseNoiseCancellation ?? true; } else { this.legacyNcOptions = sampleRateOrOptions.noiseCancellation; } @@ -131,7 +132,7 @@ export class AudioStreamSource implements UnderlyingSource { this.disposed = true; this.track.unregisterAudioStream(this); this.ffiHandle.dispose(); - if (this.frameProcessor && !this.leaveProcessorOpen) { + if (this.frameProcessor && this.autoCloseProcessor) { this.frameProcessor.close(); } } @@ -151,7 +152,7 @@ export class AudioStreamSource implements UnderlyingSource { this.ffiHandle.dispose(); // Also close the frame processor on cancel for symmetry with the EOS path, // so resources are released regardless of how the stream ends. - if (this.frameProcessor && !this.leaveProcessorOpen) { + if (this.frameProcessor && this.autoCloseProcessor) { this.frameProcessor.close(); } } diff --git a/packages/livekit-rtc/src/participant.ts b/packages/livekit-rtc/src/participant.ts index d57a2f19..4a6a2ce4 100644 --- a/packages/livekit-rtc/src/participant.ts +++ b/packages/livekit-rtc/src/participant.ts @@ -767,6 +767,15 @@ export class LocalParticipant extends Participant { const pub = this.trackPublications.get(trackSid); if (pub) { + // Clear the processor's room context here too: this path races the + // localTrackUnpublished room event, and whichever loses finds the + // publication already gone and skips its own setRoom(null). Calling it + // from both paths guarantees the processor is cleared (and the + // tokenRefreshed listener detached); setRoom(null) is idempotent, so a + // double-clear when this path wins is safe. + if (pub.track) { + pub.track.setRoom(null); + } pub.track = undefined; } this.trackPublications.delete(trackSid); diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 7ffed15b..85efa85d 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -563,11 +563,21 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.emit(RoomEvent.LocalTrackPublished, publication!, this.localParticipant); } else if (ev.case == 'localTrackUnpublished') { const publication = this.localParticipant.trackPublications.get(ev.value.publicationSid!); - if (publication?.track) { - publication.track.setRoom(null); + const track = publication?.track; + if (track) { + track.setRoom(null); } this.localParticipant.trackPublications.delete(ev.value.publicationSid!); + // Emit while `publication.track` is still set, preserving the pre-existing + // payload for callbacks. The handler is synchronous, so nulling the track + // right after still completes before any other turn can observe it. this.emit(RoomEvent.LocalTrackUnpublished, publication!, this.localParticipant!); + // Mirror trackUnsubscribed: drop the publication's track reference. This + // also makes unpublishTrack's own setRoom(null) a no-op when it loses the + // race (its `pub.track` guard short-circuits), avoiding a redundant clear. + if (track && publication) { + publication.track = undefined; + } } else if ((ev.case as string) == 'localTrackRepublished') { const value = (ev as any).value; const previousSid: string = value.previousSid!; @@ -577,6 +587,15 @@ export class Room extends (EventEmitter as new () => TypedEmitter publication.updateInfo(newInfo); this.localParticipant.trackPublications.delete(previousSid); this.localParticipant.trackPublications.set(publication.sid!, publication); + if (publication.track?.info) { + // Keep the local-track invariant (track.sid == publication.sid, set at + // publishTrack) intact across republish, then re-push metadata so any + // attached FrameProcessor learns the new publication SID / credentials. + // setRoom with the same room is a no-op for the tokenRefreshed listener + // but re-fans the metadata to every registered AudioStream. + publication.track.info.sid = publication.sid; + publication.track.setRoom(this); + } this.emit(RoomEvent.LocalTrackRepublished, publication, previousSid, this.localParticipant); } else { log.warn(`RoomEvent.LocalTrackRepublished: previous publication not found: ${previousSid}`); diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index 621f4067..b5ca1193 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -56,6 +56,13 @@ export abstract class Track { /** @internal */ setRoom(room: Room | null): void { const oldRoom = this.resolveRoom(); + if (!oldRoom && !room) { + // Already roomless — nothing to detach and nothing to re-clear. Without + // this guard a second setRoom(null) (e.g. the unpublishTrack / + // localTrackUnpublished race calling it from both paths) would re-fire + // onStreamInfoCleared / onCredentialsCleared on every registered processor. + return; + } if (oldRoom && oldRoom !== room) { oldRoom.off('tokenRefreshed', this.onRoomTokenRefreshed); } From 1fea4b10e8ae6f62d1308bbf3d9400f14568c11e Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 24 Jun 2026 14:45:30 -0400 Subject: [PATCH 08/19] feat: add audio stream tests to exercise new frame processor paths --- .../src/audio_stream_room_lifecycle.test.ts | 670 ++++++++++++++++++ 1 file changed, 670 insertions(+) create mode 100644 packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts diff --git a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts new file mode 100644 index 00000000..11b395dd --- /dev/null +++ b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts @@ -0,0 +1,670 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { OwnedTrack } from '@livekit/rtc-ffi-bindings'; +import { describe, expect, it, vi } from 'vitest'; +import type { AudioFrame } from './audio_frame.js'; +import type { AudioStreamSource } from './audio_stream.js'; +import { FfiClient } from './ffi_client.js'; +import { + FrameProcessor, + type FrameProcessorCredentials, + type FrameProcessorStreamInfo, +} from './frame_processor.js'; +import { LocalParticipant } from './participant.js'; +import { Room } from './room.js'; +import { RemoteAudioTrack, type Track } from './track.js'; +import { LocalTrackPublication } from './track_publication.js'; + +class RecordingProcessor extends FrameProcessor { + enabled = false; + streamInfoCalls: Array = []; + credentialsCalls: Array = []; + streamInfoClearedCount = 0; + credentialsClearedCount = 0; + closeCount = 0; + isEnabled(): boolean { + return this.enabled; + } + setEnabled(v: boolean): void { + this.enabled = v; + } + onStreamInfoUpdated(info: FrameProcessorStreamInfo): void { + this.streamInfoCalls.push(info); + } + onStreamInfoCleared(): void { + this.streamInfoClearedCount += 1; + } + onCredentialsUpdated(c: FrameProcessorCredentials): void { + this.credentialsCalls.push(c); + } + onCredentialsCleared(): void { + this.credentialsClearedCount += 1; + } + process(f: AudioFrame): AudioFrame { + return f; + } + close(): void { + this.closeCount += 1; + } +} + +function makeRoom(opts: { name: string; token?: string; serverUrl?: string }): Room { + const room = new Room(); + // The Room constructor doesn't accept name/token/url; for unit tests we set them directly + // on the private backing fields, the same way the FFI message handler would. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const r = room as any; + r.info = { name: opts.name }; + r._token = opts.token; + r._serverUrl = opts.serverUrl; + return room; +} + +interface StubParticipant { + identity: string; + trackPublications: Map; +} + +function attachRemoteParticipant( + room: Room, + identity: string, + publications: Array<{ publicationSid: string; trackSid: string }>, +): void { + const map = new Map(); + for (const pub of publications) { + map.set(pub.trackSid, { sid: pub.publicationSid }); + } + const participant: StubParticipant = { identity, trackPublications: map }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + room.remoteParticipants.set(identity, participant as any); +} + +function makeTrack(sid: string): RemoteAudioTrack { + const owned = { + info: { sid }, + handle: { id: BigInt(0) }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } as any as OwnedTrack; + return new RemoteAudioTrack(owned); +} + +function makeStream(processor: FrameProcessor | null): AudioStreamSource { + // Minimal stub exercising only the surface the Track touches: the `processor` + // getter and a no-op `cancel()`. Keeping cancel inert isolates the + // metadata-push assertions from the real teardown path, which is covered + // separately via simulateStreamClose. + return { processor, cancel: () => {} } as unknown as AudioStreamSource; +} + +function makeLocalParticipant(identity: string): LocalParticipant { + // Bypass the FFI-touching constructor; set only the fields the lifecycle + // paths read (identity getter + trackPublications map). + const p = Object.create(LocalParticipant.prototype) as LocalParticipant; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (p as any).info = { identity }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (p as any).trackPublications = new Map(); + return p; +} + +function makeLocalPublication(sid: string, track: Track | undefined): LocalTrackPublication { + const pub = Object.create(LocalTrackPublication.prototype) as LocalTrackPublication; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (pub as any).info = { sid }; + pub.track = track; + return pub; +} + +/** + * Attach a real Room's localParticipant holding one local publication whose + * `track` is the given Track (mirrors publish_track: track.sid == publication.sid). + */ +function attachLocalTrack(room: Room, identity: string, sid: string, track: Track): void { + const local = makeLocalParticipant(identity); + local.trackPublications.set(sid, makeLocalPublication(sid, track)); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (room as any).localParticipant = local; +} + +/** + * Drive Room.processFfiEvent with a synthetic room event, satisfying the + * roomHandle / connected guards via injected private fields. Mirrors the + * Python tests dispatching through `room._on_room_event`. + */ +async function dispatchRoomEvent( + room: Room, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + message: { case: string; value: any }, +): Promise { + const handle = BigInt(1); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const r = room as any; + r.ffiHandle = { handle }; + if (!r.localParticipant) { + r.localParticipant = makeLocalParticipant('agent'); + } + await r.processFfiEvent({ + message: { case: 'roomEvent', value: { roomHandle: handle, message } }, + }); +} + +/** + * Simulates the cleanup path that AudioStreamSource runs on `cancel()` / `eos`: + * unregister from track, then close the processor when `autoClose` is set. + * Mirrors the Python tests' `_make_closeable_stream` helper. + */ +function simulateStreamClose( + track: RemoteAudioTrack, + stream: AudioStreamSource, + autoClose: boolean, +): void { + track.unregisterAudioStream(stream); + if (stream.processor && autoClose) { + stream.processor.close(); + } +} + +const TRACK_SID = 'TR_1'; + +describe('AudioStream room lifecycle', () => { + it('processor receives lifecycle callbacks on room attach', () => { + const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + attachRemoteParticipant(room, 'alice', [ + { publicationSid: 'PUB_1', trackSid: TRACK_SID }, + ]); + const track = makeTrack(TRACK_SID); + const proc = new RecordingProcessor(); + const stream = makeStream(proc); + + track.registerAudioStream(stream); + track.setRoom(room); + + expect(proc.streamInfoCalls).toEqual([ + { roomName: 'room-a', participantIdentity: 'alice', publicationSid: 'PUB_1' }, + ]); + expect(proc.credentialsCalls).toEqual([{ token: 'tok-a', url: 'wss://a' }]); + expect(proc.streamInfoClearedCount).toBe(0); + expect(proc.credentialsClearedCount).toBe(0); + }); + + it('processor callbacks refire on track room change', () => { + const roomA = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + attachRemoteParticipant(roomA, 'alice', [ + { publicationSid: 'PUB_1', trackSid: TRACK_SID }, + ]); + const roomB = makeRoom({ name: 'room-b', token: 'tok-b', serverUrl: 'wss://b' }); + attachRemoteParticipant(roomB, 'bob', [ + { publicationSid: 'PUB_2', trackSid: TRACK_SID }, + ]); + + const track = makeTrack(TRACK_SID); + const proc = new RecordingProcessor(); + track.registerAudioStream(makeStream(proc)); + + track.setRoom(roomA); + track.setRoom(roomB); + + expect(proc.streamInfoCalls.length).toBe(2); + expect(proc.streamInfoCalls[1]).toEqual({ + roomName: 'room-b', + participantIdentity: 'bob', + publicationSid: 'PUB_2', + }); + expect(proc.credentialsCalls.length).toBe(2); + expect(proc.credentialsCalls[1]).toEqual({ token: 'tok-b', url: 'wss://b' }); + }); + + it('token refresh propagates to processor', () => { + const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + attachRemoteParticipant(room, 'alice', [ + { publicationSid: 'PUB_1', trackSid: TRACK_SID }, + ]); + const track = makeTrack(TRACK_SID); + const proc = new RecordingProcessor(); + track.registerAudioStream(makeStream(proc)); + track.setRoom(room); + + expect(proc.credentialsCalls.length).toBe(1); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (room as any)._token = 'tok-a-refreshed'; + room.emit('tokenRefreshed'); + + expect(proc.credentialsCalls.length).toBe(2); + expect(proc.credentialsCalls[1]).toEqual({ token: 'tok-a-refreshed', url: 'wss://a' }); + }); + + it('repeated setRoom with same room does not double-register listener', () => { + const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + const track = makeTrack(TRACK_SID); + + track.setRoom(room); + track.setRoom(room); + track.setRoom(room); + + expect(room.listenerCount('tokenRefreshed')).toBe(1); + }); + + it('setRoom re-registers listener after Room.disconnect strips it', () => { + // Room.disconnect() calls removeAllListeners(), which silently drops the + // tokenRefreshed listener. If the same Room object is reused (e.g. on + // reconnect), setRoom(room) needs to re-register the listener rather than + // short-circuit on the oldRoom === room identity check. + const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + const track = makeTrack(TRACK_SID); + + track.setRoom(room); + expect(room.listenerCount('tokenRefreshed')).toBe(1); + + // Simulate Room.disconnect() side-effect. + room.removeAllListeners(); + expect(room.listenerCount('tokenRefreshed')).toBe(0); + + track.setRoom(room); + expect(room.listenerCount('tokenRefreshed')).toBe(1); + }); + + it('setRoom swaps listener to new room', () => { + const roomA = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + const roomB = makeRoom({ name: 'room-b', token: 'tok-b', serverUrl: 'wss://b' }); + const track = makeTrack(TRACK_SID); + const proc = new RecordingProcessor(); + track.registerAudioStream(makeStream(proc)); + + track.setRoom(roomA); + track.setRoom(roomB); + + expect(roomA.listenerCount('tokenRefreshed')).toBe(0); + expect(roomB.listenerCount('tokenRefreshed')).toBe(1); + + const beforeCredCount = proc.credentialsCalls.length; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (roomA as any)._token = 'tok-a-refreshed'; + roomA.emit('tokenRefreshed'); + expect(proc.credentialsCalls.length).toBe(beforeCredCount); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (roomB as any)._token = 'tok-b-refreshed'; + roomB.emit('tokenRefreshed'); + expect(proc.credentialsCalls.length).toBe(beforeCredCount + 1); + expect(proc.credentialsCalls[proc.credentialsCalls.length - 1]).toEqual({ + token: 'tok-b-refreshed', + url: 'wss://b', + }); + }); + + it('unregisterAudioStream stops metadata pushes', () => { + const roomA = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + const roomB = makeRoom({ name: 'room-b', token: 'tok-b', serverUrl: 'wss://b' }); + attachRemoteParticipant(roomA, 'alice', [ + { publicationSid: 'PUB_1', trackSid: TRACK_SID }, + ]); + attachRemoteParticipant(roomB, 'bob', [ + { publicationSid: 'PUB_2', trackSid: TRACK_SID }, + ]); + const track = makeTrack(TRACK_SID); + const proc = new RecordingProcessor(); + const stream = makeStream(proc); + + track.registerAudioStream(stream); + track.setRoom(roomA); + expect(proc.streamInfoCalls.length).toBe(1); + + track.unregisterAudioStream(stream); + track.setRoom(roomB); + + expect(proc.streamInfoCalls.length).toBe(1); + expect(proc.credentialsCalls.length).toBe(1); + }); + + it('track leaving room clears processor metadata', () => { + const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + attachRemoteParticipant(room, 'alice', [ + { publicationSid: 'PUB_1', trackSid: TRACK_SID }, + ]); + const track = makeTrack(TRACK_SID); + const proc = new RecordingProcessor(); + track.registerAudioStream(makeStream(proc)); + + track.setRoom(room); + expect(proc.streamInfoCalls.length).toBe(1); + expect(proc.credentialsCalls.length).toBe(1); + + track.setRoom(null); + + expect(proc.streamInfoClearedCount).toBe(1); + expect(proc.credentialsClearedCount).toBe(1); + expect(proc.streamInfoCalls.length).toBe(1); + expect(proc.credentialsCalls.length).toBe(1); + }); + + it('fanout to multiple registered streams', () => { + const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + attachRemoteParticipant(room, 'alice', [ + { publicationSid: 'PUB_1', trackSid: TRACK_SID }, + ]); + const track = makeTrack(TRACK_SID); + const proc1 = new RecordingProcessor(); + const proc2 = new RecordingProcessor(); + track.registerAudioStream(makeStream(proc1)); + track.registerAudioStream(makeStream(proc2)); + + track.setRoom(room); + + for (const proc of [proc1, proc2]) { + expect(proc.streamInfoCalls).toEqual([ + { roomName: 'room-a', participantIdentity: 'alice', publicationSid: 'PUB_1' }, + ]); + expect(proc.credentialsCalls).toEqual([{ token: 'tok-a', url: 'wss://a' }]); + } + }); + + it('registerAudioStream before track enters room', () => { + const track = makeTrack(TRACK_SID); + const proc = new RecordingProcessor(); + track.registerAudioStream(makeStream(proc)); + + expect(proc.streamInfoCalls.length).toBe(0); + expect(proc.credentialsCalls.length).toBe(0); + + const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + attachRemoteParticipant(room, 'alice', [ + { publicationSid: 'PUB_1', trackSid: TRACK_SID }, + ]); + track.setRoom(room); + + expect(proc.streamInfoCalls).toEqual([ + { roomName: 'room-a', participantIdentity: 'alice', publicationSid: 'PUB_1' }, + ]); + expect(proc.credentialsCalls).toEqual([{ token: 'tok-a', url: 'wss://a' }]); + }); + + it('track room cycle attach detach reattach', () => { + const roomA = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + attachRemoteParticipant(roomA, 'alice', [ + { publicationSid: 'PUB_1', trackSid: TRACK_SID }, + ]); + const roomB = makeRoom({ name: 'room-b', token: 'tok-b', serverUrl: 'wss://b' }); + attachRemoteParticipant(roomB, 'bob', [ + { publicationSid: 'PUB_2', trackSid: TRACK_SID }, + ]); + const track = makeTrack(TRACK_SID); + const proc = new RecordingProcessor(); + track.registerAudioStream(makeStream(proc)); + + track.setRoom(roomA); + track.setRoom(null); + track.setRoom(roomB); + + expect(proc.streamInfoCalls.length).toBe(2); + expect(proc.streamInfoCalls[0]).toEqual({ + roomName: 'room-a', + participantIdentity: 'alice', + publicationSid: 'PUB_1', + }); + expect(proc.streamInfoCalls[1]).toEqual({ + roomName: 'room-b', + participantIdentity: 'bob', + publicationSid: 'PUB_2', + }); + expect(proc.streamInfoClearedCount).toBe(1); + expect(proc.credentialsClearedCount).toBe(1); + expect(roomA.listenerCount('tokenRefreshed')).toBe(0); + expect(roomB.listenerCount('tokenRefreshed')).toBe(1); + }); + + it('setRoom with no registered streams is safe', () => { + const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + attachRemoteParticipant(room, 'alice', [ + { publicationSid: 'PUB_1', trackSid: TRACK_SID }, + ]); + const track = makeTrack(TRACK_SID); + + expect(() => track.setRoom(room)).not.toThrow(); + + const proc = new RecordingProcessor(); + track.registerAudioStream(makeStream(proc)); + + expect(proc.streamInfoCalls).toEqual([ + { roomName: 'room-a', participantIdentity: 'alice', publicationSid: 'PUB_1' }, + ]); + expect(proc.credentialsCalls).toEqual([{ token: 'tok-a', url: 'wss://a' }]); + }); + + it('unregister one of many streams only fans out to remaining', () => { + const roomA = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + attachRemoteParticipant(roomA, 'alice', [ + { publicationSid: 'PUB_1', trackSid: TRACK_SID }, + ]); + const roomB = makeRoom({ name: 'room-b', token: 'tok-b', serverUrl: 'wss://b' }); + attachRemoteParticipant(roomB, 'bob', [ + { publicationSid: 'PUB_2', trackSid: TRACK_SID }, + ]); + const track = makeTrack(TRACK_SID); + const proc1 = new RecordingProcessor(); + const proc2 = new RecordingProcessor(); + const stream1 = makeStream(proc1); + const stream2 = makeStream(proc2); + track.registerAudioStream(stream1); + track.registerAudioStream(stream2); + + track.setRoom(roomA); + track.unregisterAudioStream(stream1); + track.setRoom(roomB); + + expect(proc1.streamInfoCalls.length).toBe(1); + expect(proc1.credentialsCalls.length).toBe(1); + expect(proc2.streamInfoCalls.length).toBe(2); + expect(proc2.streamInfoCalls[1]).toEqual({ + roomName: 'room-b', + participantIdentity: 'bob', + publicationSid: 'PUB_2', + }); + expect(proc2.credentialsCalls.length).toBe(2); + }); + + it('close path closes processor when autoClose true', () => { + const track = makeTrack(TRACK_SID); + const proc = new RecordingProcessor(); + const stream = makeStream(proc); + track.registerAudioStream(stream); + + simulateStreamClose(track, stream, true); + + expect(proc.closeCount).toBe(1); + + // Stream is unregistered: further room attaches don't reach the processor. + const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + attachRemoteParticipant(room, 'alice', [ + { publicationSid: 'PUB_1', trackSid: TRACK_SID }, + ]); + track.setRoom(room); + expect(proc.streamInfoCalls.length).toBe(0); + }); + + it('close path leaves processor open when autoClose false', () => { + const track = makeTrack(TRACK_SID); + const proc = new RecordingProcessor(); + const stream = makeStream(proc); + track.registerAudioStream(stream); + + simulateStreamClose(track, stream, false); + + expect(proc.closeCount).toBe(0); + + // Still unregistered, even though we kept the processor open for reuse. + const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); + attachRemoteParticipant(room, 'alice', [ + { publicationSid: 'PUB_1', trackSid: TRACK_SID }, + ]); + track.setRoom(room); + expect(proc.streamInfoCalls.length).toBe(0); + }); + + it('setRoom(null) is idempotent for cleared callbacks', () => { + const room = makeRoom({ name: 'room-1', token: 'tok-1', serverUrl: 'wss://r' }); + attachRemoteParticipant(room, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); + const track = makeTrack(TRACK_SID); + track.setRoom(room); + const proc = new RecordingProcessor(); + track.registerAudioStream(makeStream(proc)); + + track.setRoom(null); // first clear (e.g. room event handler) + track.setRoom(null); // second clear (e.g. unpublishTrack on the same track) + + expect(proc.streamInfoClearedCount).toBe(1); + expect(proc.credentialsClearedCount).toBe(1); + }); + + it('tokenRefreshed listener only removed by setRoom(null)', () => { + const room = makeRoom({ name: 'room-1', token: 'tok-1', serverUrl: 'wss://r' }); + const track = makeTrack(TRACK_SID); + + track.setRoom(room); + expect(room.listenerCount('tokenRefreshed')).toBe(1); + + // Only setRoom(null) detaches it. + track.setRoom(null); + expect(room.listenerCount('tokenRefreshed')).toBe(0); + }); + + it('localTrackRepublished updates track sid and repushes metadata', async () => { + // A full-reconnect republish re-issues the publication SID. The handler must + // keep the local-track invariant (track.sid == publication.sid) intact and + // re-push metadata so attached processors learn the new SID — otherwise the + // SID-based local lookup yields empty participant_identity / publication_sid. + const room = makeRoom({ name: 'room-1', token: 'tok-1', serverUrl: 'wss://r' }); + const track = makeTrack('OLD'); + attachLocalTrack(room, 'agent', 'OLD', track); + track.setRoom(room); + + const proc = new RecordingProcessor(); + track.registerAudioStream(makeStream(proc)); + expect(proc.streamInfoCalls.at(-1)).toEqual({ + roomName: 'room-1', + participantIdentity: 'agent', + publicationSid: 'OLD', + }); + + await dispatchRoomEvent(room, { + case: 'localTrackRepublished', + value: { previousSid: 'OLD', info: { sid: 'NEW' } }, + }); + + // Invariant restored + map rekeyed. + expect(track.sid).toBe('NEW'); + expect(room.localParticipant!.trackPublications.has('NEW')).toBe(true); + expect(room.localParticipant!.trackPublications.has('OLD')).toBe(false); + + // Existing attached processor re-pushed with the NEW sid. + expect(proc.streamInfoCalls.at(-1)).toEqual({ + roomName: 'room-1', + participantIdentity: 'agent', + publicationSid: 'NEW', + }); + + // Regression guard: a stream created AFTER republish also resolves NEW + // (stale track.sid would yield ""). + const proc2 = new RecordingProcessor(); + track.registerAudioStream(makeStream(proc2)); + expect(proc2.streamInfoCalls.at(-1)).toEqual({ + roomName: 'room-1', + participantIdentity: 'agent', + publicationSid: 'NEW', + }); + }); + + it('localTrackUnpublished event nulls publication track', async () => { + const room = makeRoom({ name: 'room-1', token: 'tok-1', serverUrl: 'wss://r' }); + const track = makeTrack(TRACK_SID); + attachLocalTrack(room, 'agent', TRACK_SID, track); + const publication = room.localParticipant!.trackPublications.get(TRACK_SID)!; + track.setRoom(room); + + await dispatchRoomEvent(room, { + case: 'localTrackUnpublished', + value: { publicationSid: TRACK_SID }, + }); + + expect(room.localParticipant!.trackPublications.has(TRACK_SID)).toBe(false); + // The publication's track reference was dropped, and the track left the room. + expect(publication.track).toBeUndefined(); + expect(track.resolveRoom()).toBe(null); + }); + + it('localTrackUnpublished callback still sees track', async () => { + // Backwards-compat: publication.track is nulled AFTER the event is emitted, + // so a callback reading publication.track during the event still sees it. + const room = makeRoom({ name: 'room-1', token: 'tok-1', serverUrl: 'wss://r' }); + const track = makeTrack(TRACK_SID); + attachLocalTrack(room, 'agent', TRACK_SID, track); + const publication = room.localParticipant!.trackPublications.get(TRACK_SID)!; + track.setRoom(room); + + const seenTrack: Array = []; + room.on('localTrackUnpublished', (pub) => { + seenTrack.push(pub.track); + }); + + await dispatchRoomEvent(room, { + case: 'localTrackUnpublished', + value: { publicationSid: TRACK_SID }, + }); + + // The callback saw the track (backwards-compatible payload) ... + expect(seenTrack).toEqual([track]); + // ... and the reference is dropped once the handler returns. + expect(publication.track).toBeUndefined(); + }); + + it('unpublishTrack clears processor when it wins the event race', async () => { + // unpublishTrack races the localTrackUnpublished room event. When unpublish + // wins, the room-event handler later finds the publication gone and skips its + // setRoom(null). The unpublish path must therefore clear the processor itself. + const room = makeRoom({ name: 'room-1', token: 'tok-1', serverUrl: 'wss://r' }); + const track = makeTrack(TRACK_SID); + const local = makeLocalParticipant('agent'); + const publication = makeLocalPublication(TRACK_SID, track); + local.trackPublications.set(TRACK_SID, publication); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (room as any).localParticipant = local; + // Fields unpublishTrack reads beyond the FFI round-trip. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (local as any).ffiEventLock = { lock: async () => () => {} }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (local as any).ffi_handle = { handle: BigInt(1) }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (local as any).disconnectSignal = undefined; + track.setRoom(room); + + const proc = new RecordingProcessor(); + track.registerAudioStream(makeStream(proc)); + const clearedInfoBefore = proc.streamInfoClearedCount; + const clearedCredsBefore = proc.credentialsClearedCount; + + // Mock the FFI round-trip so unpublishTrack resolves without a real server. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const requestSpy = vi.spyOn(FfiClient.instance, 'request').mockReturnValue({ + asyncId: BigInt(1), + } as never); + const waitForSpy = vi + .spyOn(FfiClient.instance, 'waitFor') + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .mockResolvedValue({ error: undefined } as never); + + try { + await local.unpublishTrack(TRACK_SID); + } finally { + requestSpy.mockRestore(); + waitForSpy.mockRestore(); + } + + expect(local.trackPublications.has(TRACK_SID)).toBe(false); + expect(publication.track).toBeUndefined(); + // The unpublish path cleared the processor's room context even though the + // room-event handler never ran. + expect(proc.streamInfoClearedCount).toBe(clearedInfoBefore + 1); + expect(proc.credentialsClearedCount).toBe(clearedCredsBefore + 1); + }); +}); From f37fa1e1a964a3aa355a30fb52fe4ba41db6eb0f Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 24 Jun 2026 14:53:50 -0400 Subject: [PATCH 09/19] fix: address bad formatting --- .../src/audio_stream_room_lifecycle.test.ts | 64 +++++-------------- 1 file changed, 16 insertions(+), 48 deletions(-) diff --git a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts index 11b395dd..81f66113 100644 --- a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts +++ b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts @@ -170,9 +170,7 @@ const TRACK_SID = 'TR_1'; describe('AudioStream room lifecycle', () => { it('processor receives lifecycle callbacks on room attach', () => { const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); - attachRemoteParticipant(room, 'alice', [ - { publicationSid: 'PUB_1', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(room, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); const track = makeTrack(TRACK_SID); const proc = new RecordingProcessor(); const stream = makeStream(proc); @@ -190,13 +188,9 @@ describe('AudioStream room lifecycle', () => { it('processor callbacks refire on track room change', () => { const roomA = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); - attachRemoteParticipant(roomA, 'alice', [ - { publicationSid: 'PUB_1', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(roomA, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); const roomB = makeRoom({ name: 'room-b', token: 'tok-b', serverUrl: 'wss://b' }); - attachRemoteParticipant(roomB, 'bob', [ - { publicationSid: 'PUB_2', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(roomB, 'bob', [{ publicationSid: 'PUB_2', trackSid: TRACK_SID }]); const track = makeTrack(TRACK_SID); const proc = new RecordingProcessor(); @@ -217,9 +211,7 @@ describe('AudioStream room lifecycle', () => { it('token refresh propagates to processor', () => { const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); - attachRemoteParticipant(room, 'alice', [ - { publicationSid: 'PUB_1', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(room, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); const track = makeTrack(TRACK_SID); const proc = new RecordingProcessor(); track.registerAudioStream(makeStream(proc)); @@ -297,12 +289,8 @@ describe('AudioStream room lifecycle', () => { it('unregisterAudioStream stops metadata pushes', () => { const roomA = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); const roomB = makeRoom({ name: 'room-b', token: 'tok-b', serverUrl: 'wss://b' }); - attachRemoteParticipant(roomA, 'alice', [ - { publicationSid: 'PUB_1', trackSid: TRACK_SID }, - ]); - attachRemoteParticipant(roomB, 'bob', [ - { publicationSid: 'PUB_2', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(roomA, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); + attachRemoteParticipant(roomB, 'bob', [{ publicationSid: 'PUB_2', trackSid: TRACK_SID }]); const track = makeTrack(TRACK_SID); const proc = new RecordingProcessor(); const stream = makeStream(proc); @@ -320,9 +308,7 @@ describe('AudioStream room lifecycle', () => { it('track leaving room clears processor metadata', () => { const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); - attachRemoteParticipant(room, 'alice', [ - { publicationSid: 'PUB_1', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(room, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); const track = makeTrack(TRACK_SID); const proc = new RecordingProcessor(); track.registerAudioStream(makeStream(proc)); @@ -341,9 +327,7 @@ describe('AudioStream room lifecycle', () => { it('fanout to multiple registered streams', () => { const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); - attachRemoteParticipant(room, 'alice', [ - { publicationSid: 'PUB_1', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(room, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); const track = makeTrack(TRACK_SID); const proc1 = new RecordingProcessor(); const proc2 = new RecordingProcessor(); @@ -369,9 +353,7 @@ describe('AudioStream room lifecycle', () => { expect(proc.credentialsCalls.length).toBe(0); const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); - attachRemoteParticipant(room, 'alice', [ - { publicationSid: 'PUB_1', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(room, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); track.setRoom(room); expect(proc.streamInfoCalls).toEqual([ @@ -382,13 +364,9 @@ describe('AudioStream room lifecycle', () => { it('track room cycle attach detach reattach', () => { const roomA = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); - attachRemoteParticipant(roomA, 'alice', [ - { publicationSid: 'PUB_1', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(roomA, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); const roomB = makeRoom({ name: 'room-b', token: 'tok-b', serverUrl: 'wss://b' }); - attachRemoteParticipant(roomB, 'bob', [ - { publicationSid: 'PUB_2', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(roomB, 'bob', [{ publicationSid: 'PUB_2', trackSid: TRACK_SID }]); const track = makeTrack(TRACK_SID); const proc = new RecordingProcessor(); track.registerAudioStream(makeStream(proc)); @@ -416,9 +394,7 @@ describe('AudioStream room lifecycle', () => { it('setRoom with no registered streams is safe', () => { const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); - attachRemoteParticipant(room, 'alice', [ - { publicationSid: 'PUB_1', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(room, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); const track = makeTrack(TRACK_SID); expect(() => track.setRoom(room)).not.toThrow(); @@ -434,13 +410,9 @@ describe('AudioStream room lifecycle', () => { it('unregister one of many streams only fans out to remaining', () => { const roomA = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); - attachRemoteParticipant(roomA, 'alice', [ - { publicationSid: 'PUB_1', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(roomA, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); const roomB = makeRoom({ name: 'room-b', token: 'tok-b', serverUrl: 'wss://b' }); - attachRemoteParticipant(roomB, 'bob', [ - { publicationSid: 'PUB_2', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(roomB, 'bob', [{ publicationSid: 'PUB_2', trackSid: TRACK_SID }]); const track = makeTrack(TRACK_SID); const proc1 = new RecordingProcessor(); const proc2 = new RecordingProcessor(); @@ -476,9 +448,7 @@ describe('AudioStream room lifecycle', () => { // Stream is unregistered: further room attaches don't reach the processor. const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); - attachRemoteParticipant(room, 'alice', [ - { publicationSid: 'PUB_1', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(room, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); track.setRoom(room); expect(proc.streamInfoCalls.length).toBe(0); }); @@ -495,9 +465,7 @@ describe('AudioStream room lifecycle', () => { // Still unregistered, even though we kept the processor open for reuse. const room = makeRoom({ name: 'room-a', token: 'tok-a', serverUrl: 'wss://a' }); - attachRemoteParticipant(room, 'alice', [ - { publicationSid: 'PUB_1', trackSid: TRACK_SID }, - ]); + attachRemoteParticipant(room, 'alice', [{ publicationSid: 'PUB_1', trackSid: TRACK_SID }]); track.setRoom(room); expect(proc.streamInfoCalls.length).toBe(0); }); From 01fda689d832bdef5f42728e7a6e0c555d734d2d Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 24 Jun 2026 15:10:15 -0400 Subject: [PATCH 10/19] fix: hack around ffi free running for fake handles MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cases like this: ⎯⎯⎯⎯⎯ Uncaught Exception ⎯⎯⎯⎯⎯ Error: trying to drop an invalid handle ⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯⎯ Serialized Error: { code: 'GenericFailure' } This error originated in "src/audio_stream_room_lifecycle.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. --- .../src/audio_stream_room_lifecycle.test.ts | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts index 81f66113..9eabd254 100644 --- a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts +++ b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts @@ -16,6 +16,27 @@ import { Room } from './room.js'; import { RemoteAudioTrack, type Track } from './track.js'; import { LocalTrackPublication } from './track_publication.js'; +// These tests fabricate Tracks with synthetic (invalid) FFI handle ids to avoid +// touching the real FFI server. The native FfiHandle has a Rust-side drop that +// runs when the JS wrapper is garbage-collected; dropping an unallocated handle +// throws "trying to drop an invalid handle" as an uncaught exception at GC time +// (intermittent locally, reliably on CI). Replace FfiHandle with an inert stub +// so no native drop is ever scheduled; everything else in the bindings stays real. +vi.mock('@livekit/rtc-ffi-bindings', async (importActual) => { + const actual = await importActual(); + class FakeFfiHandle { + private _handle: bigint; + constructor(handle: bigint) { + this._handle = handle; + } + dispose(): void {} + get handle(): bigint { + return this._handle; + } + } + return { ...actual, FfiHandle: FakeFfiHandle }; +}); + class RecordingProcessor extends FrameProcessor { enabled = false; streamInfoCalls: Array = []; From 6ba8a8d9698d13e5271abbc6c0bc35e692f04f8c Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 24 Jun 2026 15:32:28 -0400 Subject: [PATCH 11/19] fix: ensure track.info.sid is rewritten during full reconnect path --- .../src/audio_stream_room_lifecycle.test.ts | 94 ++++++++++++++++++- packages/livekit-rtc/src/participant.ts | 8 ++ 2 files changed, 101 insertions(+), 1 deletion(-) diff --git a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts index 9eabd254..fff230b4 100644 --- a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts +++ b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { OwnedTrack } from '@livekit/rtc-ffi-bindings'; +import { TrackPublishOptions } from '@livekit/rtc-ffi-bindings'; import { describe, expect, it, vi } from 'vitest'; import type { AudioFrame } from './audio_frame.js'; import type { AudioStreamSource } from './audio_stream.js'; @@ -13,7 +14,7 @@ import { } from './frame_processor.js'; import { LocalParticipant } from './participant.js'; import { Room } from './room.js'; -import { RemoteAudioTrack, type Track } from './track.js'; +import { LocalAudioTrack, RemoteAudioTrack, type Track } from './track.js'; import { LocalTrackPublication } from './track_publication.js'; // These tests fabricate Tracks with synthetic (invalid) FFI handle ids to avoid @@ -110,6 +111,16 @@ function makeTrack(sid: string): RemoteAudioTrack { return new RemoteAudioTrack(owned); } +function makeLocalAudioTrack(sid: string): LocalAudioTrack { + // Safe under the FfiHandle mock — no native handle is created. + const owned = { + info: { sid }, + handle: { id: BigInt(0) }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } as any as OwnedTrack; + return new LocalAudioTrack(owned); +} + function makeStream(processor: FrameProcessor | null): AudioStreamSource { // Minimal stub exercising only the surface the Track touches: the `processor` // getter and a no-op `cancel()`. Keeping cancel inert isolates the @@ -656,4 +667,85 @@ describe('AudioStream room lifecycle', () => { expect(proc.streamInfoClearedCount).toBe(clearedInfoBefore + 1); expect(proc.credentialsClearedCount).toBe(clearedCredsBefore + 1); }); + + it('disconnect clears processors and detaches listeners', () => { + // On disconnect, cleanupOnDisconnect must walk every publication and detach + // its track so attached processors get cleared callbacks and the + // tokenRefreshed listener is removed — covers both the local and remote + // participant maps. + const room = makeRoom({ name: 'room-1', token: 'tok-1', serverUrl: 'wss://r' }); + + // Local track + processor. + const localTrack = makeTrack('TR_LOCAL'); + attachLocalTrack(room, 'agent', 'TR_LOCAL', localTrack); + const localProc = new RecordingProcessor(); + localTrack.registerAudioStream(makeStream(localProc)); + localTrack.setRoom(room); + + // Remote track + processor on the same room. + const remoteTrack = makeTrack('TR_REMOTE'); + const remoteParticipant = { + identity: 'bob', + trackPublications: new Map([['TR_REMOTE', { sid: 'PUB_REMOTE', track: remoteTrack }]]), + }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + room.remoteParticipants.set('bob', remoteParticipant as any); + const remoteProc = new RecordingProcessor(); + remoteTrack.registerAudioStream(makeStream(remoteProc)); + remoteTrack.setRoom(room); + + expect(room.listenerCount('tokenRefreshed')).toBe(2); + expect(localProc.streamInfoCalls.length).toBe(1); + expect(remoteProc.streamInfoCalls.length).toBe(1); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (room as any).cleanupOnDisconnect(); + + for (const proc of [localProc, remoteProc]) { + expect(proc.streamInfoClearedCount).toBe(1); + expect(proc.credentialsClearedCount).toBe(1); + } + expect(room.listenerCount('tokenRefreshed')).toBe(0); + expect(localTrack.resolveRoom()).toBe(null); + expect(remoteTrack.resolveRoom()).toBe(null); + }); + + it('publishTrack stamps publication sid onto track', async () => { + // Mirrors Python publish_track: after the server assigns the publication SID, + // the track's own info.sid is updated to match so the local-publication + // lookup in pushProcessorMetadataToStream resolves it. + const local = makeLocalParticipant('agent'); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (local as any).ffiEventLock = { lock: async () => () => {} }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (local as any).ffi_handle = { handle: BigInt(1) }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (local as any).disconnectSignal = undefined; + + // Track starts with a provisional SID that differs from the publication SID. + const track = makeLocalAudioTrack('PROVISIONAL'); + + const requestSpy = vi.spyOn(FfiClient.instance, 'request').mockReturnValue({ + asyncId: BigInt(1), + } as never); + const waitForSpy = vi.spyOn(FfiClient.instance, 'waitFor').mockResolvedValue({ + message: { + case: 'publication', + value: { info: { sid: 'PUB_NEW' }, handle: { id: BigInt(0) } }, + }, + } as never); + + let pub: LocalTrackPublication; + try { + pub = await local.publishTrack(track, new TrackPublishOptions()); + } finally { + requestSpy.mockRestore(); + waitForSpy.mockRestore(); + } + + expect(pub.sid).toBe('PUB_NEW'); + // The invariant: the track's own SID now matches the publication SID. + expect(track.sid).toBe('PUB_NEW'); + expect(local.trackPublications.has('PUB_NEW')).toBe(true); + }); }); diff --git a/packages/livekit-rtc/src/participant.ts b/packages/livekit-rtc/src/participant.ts index 4a6a2ce4..87372e8f 100644 --- a/packages/livekit-rtc/src/participant.ts +++ b/packages/livekit-rtc/src/participant.ts @@ -731,6 +731,14 @@ export class LocalParticipant extends Participant { case 'publication': const track_publication = new LocalTrackPublication(cb.message.value!); track_publication.track = track; + // Stamp the server-assigned publication + // SID onto the track so track.sid == publication.sid. Both + // Track.pushProcessorMetadataToStream and the localTrackRepublished + // handler look up the local publication by this SID; without it they + // depend on createAudioTrack's provisional SID happening to match. + if (track.info && track_publication.sid) { + track.info.sid = track_publication.sid; + } this.trackPublications.set(track_publication.sid!, track_publication); return track_publication; From 15677962664e1c2890ba7ce56e3704542a8525ca Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 24 Jun 2026 15:34:09 -0400 Subject: [PATCH 12/19] fix: be sure to clean up frame processors when disconnecting the room --- packages/livekit-rtc/src/room.ts | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 85efa85d..6ee23de0 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -444,6 +444,23 @@ export class Room extends (EventEmitter as new () => TypedEmitter } this.textStreamControllers.clear(); + // Detach every track from this room so attached FrameProcessors receive + // onStreamInfoCleared/onCredentialsCleared and the tokenRefreshed listener + // is removed. Otherwise a server-initiated disconnect leaves processors with + // stale room context and the Room holding a strong ref to each Track's bound + // listener until GC. setRoom(null) is idempotent, so this is safe even if a + // track was already detached (e.g. via unsubscribe/unpublish). + if (this.localParticipant) { + for (const pub of this.localParticipant.trackPublications.values()) { + pub.track?.setRoom(null); + } + } + for (const participant of this.remoteParticipants.values()) { + for (const pub of participant.trackPublications.values()) { + pub.track?.setRoom(null); + } + } + // Clear sidPromise before removing listeners so that a reconnect // doesn't return a stale, permanently-pending promise. this.sidPromise = undefined; From 4f2d86e81150c08f8c2d8f49565334b94862044a Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 24 Jun 2026 15:38:10 -0400 Subject: [PATCH 13/19] fix: swap importActual -> vi.importActual --- .../livekit-rtc/src/audio_stream_room_lifecycle.test.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts index fff230b4..8d846896 100644 --- a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts +++ b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts @@ -23,8 +23,10 @@ import { LocalTrackPublication } from './track_publication.js'; // throws "trying to drop an invalid handle" as an uncaught exception at GC time // (intermittent locally, reliably on CI). Replace FfiHandle with an inert stub // so no native drop is ever scheduled; everything else in the bindings stays real. -vi.mock('@livekit/rtc-ffi-bindings', async (importActual) => { - const actual = await importActual(); +vi.mock('@livekit/rtc-ffi-bindings', async () => { + const actual = await vi.importActual( + '@livekit/rtc-ffi-bindings', + ); class FakeFfiHandle { private _handle: bigint; constructor(handle: bigint) { From 255f712af41de5d02df2196188bf75af9060bfb4 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 24 Jun 2026 15:54:36 -0400 Subject: [PATCH 14/19] fix: attempt LLM driven update to get frame processor tests to pass both in node and bun test runners --- .../src/audio_stream_room_lifecycle.test.ts | 123 ++++++++++-------- packages/livekit-rtc/src/track.ts | 25 +++- 2 files changed, 86 insertions(+), 62 deletions(-) diff --git a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts index 8d846896..d27bf2e1 100644 --- a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts +++ b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts @@ -1,9 +1,8 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import type { OwnedTrack } from '@livekit/rtc-ffi-bindings'; import { TrackPublishOptions } from '@livekit/rtc-ffi-bindings'; -import { describe, expect, it, vi } from 'vitest'; +import { describe, expect, it } from 'vitest'; import type { AudioFrame } from './audio_frame.js'; import type { AudioStreamSource } from './audio_stream.js'; import { FfiClient } from './ffi_client.js'; @@ -17,28 +16,43 @@ import { Room } from './room.js'; import { LocalAudioTrack, RemoteAudioTrack, type Track } from './track.js'; import { LocalTrackPublication } from './track_publication.js'; -// These tests fabricate Tracks with synthetic (invalid) FFI handle ids to avoid -// touching the real FFI server. The native FfiHandle has a Rust-side drop that -// runs when the JS wrapper is garbage-collected; dropping an unallocated handle -// throws "trying to drop an invalid handle" as an uncaught exception at GC time -// (intermittent locally, reliably on CI). Replace FfiHandle with an inert stub -// so no native drop is ever scheduled; everything else in the bindings stays real. -vi.mock('@livekit/rtc-ffi-bindings', async () => { - const actual = await vi.importActual( - '@livekit/rtc-ffi-bindings', - ); - class FakeFfiHandle { - private _handle: bigint; - constructor(handle: bigint) { - this._handle = handle; - } - dispose(): void {} - get handle(): bigint { - return this._handle; - } - } - return { ...actual, FfiHandle: FakeFfiHandle }; -}); +// These tests fabricate Tracks/publications without going through the real FFI. +// The native FfiHandle (a NAPI class) registers a Rust-side drop on the JS +// wrapper that fires at GC and throws "trying to drop an invalid handle" for any +// unallocated id — an uncaught exception that surfaces intermittently on CI. +// There is no JS-level hook to disarm that native finalizer, and module-mocking +// the bindings isn't portable (the suite also runs under `bun test`, whose `vi` +// shim lacks vi.mock/vi.importActual). So we never construct a real handle: +// Tracks are built via Object.create (bypassing the constructor — the Track +// class is written to support this), and the one production path that does build +// a real handle (publishTrack -> LocalTrackPublication) is pinned to keep its +// wrapper alive so the finalizer never runs. + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +const inertFfiHandle = () => ({ handle: BigInt(0), dispose() {} }) as any; + +// Keeps real FfiHandle wrappers reachable for the lifetime of the test process +// so their native finalizers never fire. See the note above. +const ffiHandleKeepAlive: Array = []; + +/** + * Stub the FfiClient singleton's request/waitFor round-trip via plain property + * assignment. Runner-agnostic — avoids vi.spyOn, which Bun's `vi` shim doesn't + * fully implement. Returns a restore function to call in `finally`. + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function stubFfiRoundTrip(waitForResult: any): () => void { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const client = FfiClient.instance as any; + const origRequest = client.request; + const origWaitFor = client.waitFor; + client.request = () => ({ asyncId: BigInt(1) }); + client.waitFor = async () => waitForResult; + return () => { + client.request = origRequest; + client.waitFor = origWaitFor; + }; +} class RecordingProcessor extends FrameProcessor { enabled = false; @@ -104,23 +118,28 @@ function attachRemoteParticipant( room.remoteParticipants.set(identity, participant as any); } +// Build a Track via Object.create so no real FfiHandle is constructed. Sets only +// the instance state the Track methods read; the bound tokenRefreshed listener +// is created lazily by the class getter, so it survives this bypass. +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function initTrack(track: any, sid: string): void { + track.info = { sid }; + track.ffi_handle = inertFfiHandle(); + track.roomRef = null; + track.audioStreams = new Set(); + track.streamFinalizationRegistry = { register() {}, unregister() {} }; +} + function makeTrack(sid: string): RemoteAudioTrack { - const owned = { - info: { sid }, - handle: { id: BigInt(0) }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } as any as OwnedTrack; - return new RemoteAudioTrack(owned); + const track = Object.create(RemoteAudioTrack.prototype) as RemoteAudioTrack; + initTrack(track, sid); + return track; } function makeLocalAudioTrack(sid: string): LocalAudioTrack { - // Safe under the FfiHandle mock — no native handle is created. - const owned = { - info: { sid }, - handle: { id: BigInt(0) }, - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } as any as OwnedTrack; - return new LocalAudioTrack(owned); + const track = Object.create(LocalAudioTrack.prototype) as LocalAudioTrack; + initTrack(track, sid); + return track; } function makeStream(processor: FrameProcessor | null): AudioStreamSource { @@ -645,21 +664,12 @@ describe('AudioStream room lifecycle', () => { const clearedInfoBefore = proc.streamInfoClearedCount; const clearedCredsBefore = proc.credentialsClearedCount; - // Mock the FFI round-trip so unpublishTrack resolves without a real server. - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const requestSpy = vi.spyOn(FfiClient.instance, 'request').mockReturnValue({ - asyncId: BigInt(1), - } as never); - const waitForSpy = vi - .spyOn(FfiClient.instance, 'waitFor') - // eslint-disable-next-line @typescript-eslint/no-explicit-any - .mockResolvedValue({ error: undefined } as never); - + // Stub the FFI round-trip so unpublishTrack resolves without a real server. + const restore = stubFfiRoundTrip({ error: undefined }); try { await local.unpublishTrack(TRACK_SID); } finally { - requestSpy.mockRestore(); - waitForSpy.mockRestore(); + restore(); } expect(local.trackPublications.has(TRACK_SID)).toBe(false); @@ -727,23 +737,22 @@ describe('AudioStream room lifecycle', () => { // Track starts with a provisional SID that differs from the publication SID. const track = makeLocalAudioTrack('PROVISIONAL'); - const requestSpy = vi.spyOn(FfiClient.instance, 'request').mockReturnValue({ - asyncId: BigInt(1), - } as never); - const waitForSpy = vi.spyOn(FfiClient.instance, 'waitFor').mockResolvedValue({ + const restore = stubFfiRoundTrip({ message: { case: 'publication', value: { info: { sid: 'PUB_NEW' }, handle: { id: BigInt(0) } }, }, - } as never); - + }); let pub: LocalTrackPublication; try { pub = await local.publishTrack(track, new TrackPublishOptions()); } finally { - requestSpy.mockRestore(); - waitForSpy.mockRestore(); + restore(); } + // publishTrack builds a real LocalTrackPublication, the one path here that + // constructs a native FfiHandle. Pin it so its finalizer never fires. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ffiHandleKeepAlive.push((pub as any).ffiHandle); expect(pub.sid).toBe('PUB_NEW'); // The invariant: the track's own SID now matches the publication SID. diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index b5ca1193..88f857ba 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -26,7 +26,22 @@ export abstract class Track { private roomRef: WeakRef | null = null; private audioStreams: Set> = new Set(); private streamFinalizationRegistry: FinalizationRegistry>; - private onRoomTokenRefreshed = () => { + + // Lazily-created bound listener for the Room's `tokenRefreshed` event. Stored + // so the same function reference is used for both on() and off(). Defined via + // a getter + plain method (rather than an instance arrow field) so the class + // can be constructed through Object.create in tests — bypassing the FFI handle + // — while keeping a stable listener identity. + private boundOnRoomTokenRefreshed?: () => void; + + private get roomTokenRefreshedListener(): () => void { + if (!this.boundOnRoomTokenRefreshed) { + this.boundOnRoomTokenRefreshed = () => this.onRoomTokenRefreshed(); + } + return this.boundOnRoomTokenRefreshed; + } + + private onRoomTokenRefreshed(): void { const room = this.resolveRoom(); if (!room || !room.token || !room.serverUrl) return; for (const stream of this.iterateStreams()) { @@ -36,7 +51,7 @@ export abstract class Track { } processor.onCredentialsUpdated({ token: room.token, url: room.serverUrl }); } - }; + } constructor(owned: OwnedTrack) { this.info = owned.info; @@ -64,11 +79,11 @@ export abstract class Track { return; } if (oldRoom && oldRoom !== room) { - oldRoom.off('tokenRefreshed', this.onRoomTokenRefreshed); + oldRoom.off('tokenRefreshed', this.roomTokenRefreshedListener); } if (room) { - room.off('tokenRefreshed', this.onRoomTokenRefreshed); - room.on('tokenRefreshed', this.onRoomTokenRefreshed); + room.off('tokenRefreshed', this.roomTokenRefreshedListener); + room.on('tokenRefreshed', this.roomTokenRefreshedListener); } this.roomRef = room ? new WeakRef(room) : null; for (const stream of this.iterateStreams()) { From 83203b17e212ad5ded1a479f39b7ef845fba71fc Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Fri, 26 Jun 2026 09:41:56 -0400 Subject: [PATCH 15/19] fix: add missing internal to new export only used in tests --- packages/livekit-rtc/src/audio_stream.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/livekit-rtc/src/audio_stream.ts b/packages/livekit-rtc/src/audio_stream.ts index f537f577..d7dde0df 100644 --- a/packages/livekit-rtc/src/audio_stream.ts +++ b/packages/livekit-rtc/src/audio_stream.ts @@ -31,6 +31,7 @@ export interface NoiseCancellationOptions { options: Record; } +/** @internal */ export class AudioStreamSource implements UnderlyingSource { private controller?: ReadableStreamDefaultController; private ffiHandle: FfiHandle; From 052ed6df28f3306836979b0865782be54ffa67fe Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Fri, 26 Jun 2026 09:55:35 -0400 Subject: [PATCH 16/19] fix: address formatting --- packages/livekit-rtc/src/track.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index 88f857ba..08373b08 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -43,7 +43,9 @@ export abstract class Track { private onRoomTokenRefreshed(): void { const room = this.resolveRoom(); - if (!room || !room.token || !room.serverUrl) return; + if (!room || !room.token || !room.serverUrl) { + return; + } for (const stream of this.iterateStreams()) { const processor = stream.processor; if (!processor) { From 8e83a43d3f424dd781e10e7ec8be73b2605d1a69 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Fri, 26 Jun 2026 10:23:40 -0400 Subject: [PATCH 17/19] Revert "fix: attempt LLM driven update to get frame processor tests to pass both in node and bun test runners" This reverts commit 255f712af41de5d02df2196188bf75af9060bfb4. The bun test-runner workaround (Object.create-based track fabrication, inert FfiHandle stubs, keep-alive pinning, and replacing vi.mock/vi.spyOn with manual FFI stubbing) is no longer needed. CI now runs vitest under the bun runtime via `bun --bun run test` (see the CI change), which forces bun to execute vitest (overriding its node shebang) so test bodies run on bun while vitest still provides full module mocking. This restores the clean vi.mock-based tests and the original arrow-field Track listener. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/audio_stream_room_lifecycle.test.ts | 123 ++++++++---------- packages/livekit-rtc/src/track.ts | 25 +--- 2 files changed, 62 insertions(+), 86 deletions(-) diff --git a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts index d27bf2e1..8d846896 100644 --- a/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts +++ b/packages/livekit-rtc/src/audio_stream_room_lifecycle.test.ts @@ -1,8 +1,9 @@ // SPDX-FileCopyrightText: 2026 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 +import type { OwnedTrack } from '@livekit/rtc-ffi-bindings'; import { TrackPublishOptions } from '@livekit/rtc-ffi-bindings'; -import { describe, expect, it } from 'vitest'; +import { describe, expect, it, vi } from 'vitest'; import type { AudioFrame } from './audio_frame.js'; import type { AudioStreamSource } from './audio_stream.js'; import { FfiClient } from './ffi_client.js'; @@ -16,43 +17,28 @@ import { Room } from './room.js'; import { LocalAudioTrack, RemoteAudioTrack, type Track } from './track.js'; import { LocalTrackPublication } from './track_publication.js'; -// These tests fabricate Tracks/publications without going through the real FFI. -// The native FfiHandle (a NAPI class) registers a Rust-side drop on the JS -// wrapper that fires at GC and throws "trying to drop an invalid handle" for any -// unallocated id — an uncaught exception that surfaces intermittently on CI. -// There is no JS-level hook to disarm that native finalizer, and module-mocking -// the bindings isn't portable (the suite also runs under `bun test`, whose `vi` -// shim lacks vi.mock/vi.importActual). So we never construct a real handle: -// Tracks are built via Object.create (bypassing the constructor — the Track -// class is written to support this), and the one production path that does build -// a real handle (publishTrack -> LocalTrackPublication) is pinned to keep its -// wrapper alive so the finalizer never runs. - -// eslint-disable-next-line @typescript-eslint/no-explicit-any -const inertFfiHandle = () => ({ handle: BigInt(0), dispose() {} }) as any; - -// Keeps real FfiHandle wrappers reachable for the lifetime of the test process -// so their native finalizers never fire. See the note above. -const ffiHandleKeepAlive: Array = []; - -/** - * Stub the FfiClient singleton's request/waitFor round-trip via plain property - * assignment. Runner-agnostic — avoids vi.spyOn, which Bun's `vi` shim doesn't - * fully implement. Returns a restore function to call in `finally`. - */ -// eslint-disable-next-line @typescript-eslint/no-explicit-any -function stubFfiRoundTrip(waitForResult: any): () => void { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const client = FfiClient.instance as any; - const origRequest = client.request; - const origWaitFor = client.waitFor; - client.request = () => ({ asyncId: BigInt(1) }); - client.waitFor = async () => waitForResult; - return () => { - client.request = origRequest; - client.waitFor = origWaitFor; - }; -} +// These tests fabricate Tracks with synthetic (invalid) FFI handle ids to avoid +// touching the real FFI server. The native FfiHandle has a Rust-side drop that +// runs when the JS wrapper is garbage-collected; dropping an unallocated handle +// throws "trying to drop an invalid handle" as an uncaught exception at GC time +// (intermittent locally, reliably on CI). Replace FfiHandle with an inert stub +// so no native drop is ever scheduled; everything else in the bindings stays real. +vi.mock('@livekit/rtc-ffi-bindings', async () => { + const actual = await vi.importActual( + '@livekit/rtc-ffi-bindings', + ); + class FakeFfiHandle { + private _handle: bigint; + constructor(handle: bigint) { + this._handle = handle; + } + dispose(): void {} + get handle(): bigint { + return this._handle; + } + } + return { ...actual, FfiHandle: FakeFfiHandle }; +}); class RecordingProcessor extends FrameProcessor { enabled = false; @@ -118,28 +104,23 @@ function attachRemoteParticipant( room.remoteParticipants.set(identity, participant as any); } -// Build a Track via Object.create so no real FfiHandle is constructed. Sets only -// the instance state the Track methods read; the bound tokenRefreshed listener -// is created lazily by the class getter, so it survives this bypass. -// eslint-disable-next-line @typescript-eslint/no-explicit-any -function initTrack(track: any, sid: string): void { - track.info = { sid }; - track.ffi_handle = inertFfiHandle(); - track.roomRef = null; - track.audioStreams = new Set(); - track.streamFinalizationRegistry = { register() {}, unregister() {} }; -} - function makeTrack(sid: string): RemoteAudioTrack { - const track = Object.create(RemoteAudioTrack.prototype) as RemoteAudioTrack; - initTrack(track, sid); - return track; + const owned = { + info: { sid }, + handle: { id: BigInt(0) }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } as any as OwnedTrack; + return new RemoteAudioTrack(owned); } function makeLocalAudioTrack(sid: string): LocalAudioTrack { - const track = Object.create(LocalAudioTrack.prototype) as LocalAudioTrack; - initTrack(track, sid); - return track; + // Safe under the FfiHandle mock — no native handle is created. + const owned = { + info: { sid }, + handle: { id: BigInt(0) }, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + } as any as OwnedTrack; + return new LocalAudioTrack(owned); } function makeStream(processor: FrameProcessor | null): AudioStreamSource { @@ -664,12 +645,21 @@ describe('AudioStream room lifecycle', () => { const clearedInfoBefore = proc.streamInfoClearedCount; const clearedCredsBefore = proc.credentialsClearedCount; - // Stub the FFI round-trip so unpublishTrack resolves without a real server. - const restore = stubFfiRoundTrip({ error: undefined }); + // Mock the FFI round-trip so unpublishTrack resolves without a real server. + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const requestSpy = vi.spyOn(FfiClient.instance, 'request').mockReturnValue({ + asyncId: BigInt(1), + } as never); + const waitForSpy = vi + .spyOn(FfiClient.instance, 'waitFor') + // eslint-disable-next-line @typescript-eslint/no-explicit-any + .mockResolvedValue({ error: undefined } as never); + try { await local.unpublishTrack(TRACK_SID); } finally { - restore(); + requestSpy.mockRestore(); + waitForSpy.mockRestore(); } expect(local.trackPublications.has(TRACK_SID)).toBe(false); @@ -737,22 +727,23 @@ describe('AudioStream room lifecycle', () => { // Track starts with a provisional SID that differs from the publication SID. const track = makeLocalAudioTrack('PROVISIONAL'); - const restore = stubFfiRoundTrip({ + const requestSpy = vi.spyOn(FfiClient.instance, 'request').mockReturnValue({ + asyncId: BigInt(1), + } as never); + const waitForSpy = vi.spyOn(FfiClient.instance, 'waitFor').mockResolvedValue({ message: { case: 'publication', value: { info: { sid: 'PUB_NEW' }, handle: { id: BigInt(0) } }, }, - }); + } as never); + let pub: LocalTrackPublication; try { pub = await local.publishTrack(track, new TrackPublishOptions()); } finally { - restore(); + requestSpy.mockRestore(); + waitForSpy.mockRestore(); } - // publishTrack builds a real LocalTrackPublication, the one path here that - // constructs a native FfiHandle. Pin it so its finalizer never fires. - // eslint-disable-next-line @typescript-eslint/no-explicit-any - ffiHandleKeepAlive.push((pub as any).ffiHandle); expect(pub.sid).toBe('PUB_NEW'); // The invariant: the track's own SID now matches the publication SID. diff --git a/packages/livekit-rtc/src/track.ts b/packages/livekit-rtc/src/track.ts index 08373b08..bfe50628 100644 --- a/packages/livekit-rtc/src/track.ts +++ b/packages/livekit-rtc/src/track.ts @@ -26,22 +26,7 @@ export abstract class Track { private roomRef: WeakRef | null = null; private audioStreams: Set> = new Set(); private streamFinalizationRegistry: FinalizationRegistry>; - - // Lazily-created bound listener for the Room's `tokenRefreshed` event. Stored - // so the same function reference is used for both on() and off(). Defined via - // a getter + plain method (rather than an instance arrow field) so the class - // can be constructed through Object.create in tests — bypassing the FFI handle - // — while keeping a stable listener identity. - private boundOnRoomTokenRefreshed?: () => void; - - private get roomTokenRefreshedListener(): () => void { - if (!this.boundOnRoomTokenRefreshed) { - this.boundOnRoomTokenRefreshed = () => this.onRoomTokenRefreshed(); - } - return this.boundOnRoomTokenRefreshed; - } - - private onRoomTokenRefreshed(): void { + private onRoomTokenRefreshed = () => { const room = this.resolveRoom(); if (!room || !room.token || !room.serverUrl) { return; @@ -53,7 +38,7 @@ export abstract class Track { } processor.onCredentialsUpdated({ token: room.token, url: room.serverUrl }); } - } + }; constructor(owned: OwnedTrack) { this.info = owned.info; @@ -81,11 +66,11 @@ export abstract class Track { return; } if (oldRoom && oldRoom !== room) { - oldRoom.off('tokenRefreshed', this.roomTokenRefreshedListener); + oldRoom.off('tokenRefreshed', this.onRoomTokenRefreshed); } if (room) { - room.off('tokenRefreshed', this.roomTokenRefreshedListener); - room.on('tokenRefreshed', this.roomTokenRefreshedListener); + room.off('tokenRefreshed', this.onRoomTokenRefreshed); + room.on('tokenRefreshed', this.onRoomTokenRefreshed); } this.roomRef = room ? new WeakRef(room) : null; for (const stream of this.iterateStreams()) { From 2013b96736af9b097b35c5d429c9f82576ef927f Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Fri, 26 Jun 2026 10:20:13 -0400 Subject: [PATCH 18/19] fix: swap from bun test -> bun --bun run test --- .github/workflows/ci.yml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index eb918eb6..7c3225c0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -78,7 +78,14 @@ jobs: LIVEKIT_API_SECRET: ${{ secrets.LIVEKIT_API_SECRET }} - name: Test server sdk + rtc-node (Bun) - run: bun install && bun test --concurrent + # Run vitest under the bun runtime rather than bun's native test runner. + # `--bun` forces bun to execute vitest (overriding its node shebang), so + # test bodies still run on bun while vitest provides full module mocking + # (vi.mock / vi.spyOn) that bun's native runner lacks. + run: | + bun install + (cd packages/livekit-server-sdk && bun --bun run test) + (cd packages/livekit-rtc && bun --bun run test) build_and_release: if: github.ref == 'refs/heads/main' From e4a769c9717e6657f03ef44528640c2fb74973a8 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Fri, 26 Jun 2026 10:38:14 -0400 Subject: [PATCH 19/19] feat: guard that the bun CI job actually runs on bun Adds src/bun_runtime.test.ts, armed via EXPECT_BUN_RUNTIME=1 in the bun CI step. It runs through the same `bun --bun run test` vitest path as the real suite and asserts process.versions.bun is set, so it fails if the runtime ever silently falls back to node (e.g. the --bun flag stops overriding vitest's node shebang). Skipped everywhere the env var is unset (node CI job, local dev). --- .github/workflows/ci.yml | 8 ++++-- packages/livekit-rtc/src/bun_runtime.test.ts | 27 ++++++++++++++++++++ turbo.json | 2 +- 3 files changed, 34 insertions(+), 3 deletions(-) create mode 100644 packages/livekit-rtc/src/bun_runtime.test.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7c3225c0..662377c7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -82,10 +82,14 @@ jobs: # `--bun` forces bun to execute vitest (overriding its node shebang), so # test bodies still run on bun while vitest provides full module mocking # (vi.mock / vi.spyOn) that bun's native runner lacks. + # EXPECT_BUN_RUNTIME=1 arms a guard test (src/bun_runtime.test.ts) that + # fails if test bodies aren't actually executing on bun — i.e. if the + # `--bun` flag ever stops overriding vitest's node shebang and the job + # silently regresses to running under node. run: | bun install - (cd packages/livekit-server-sdk && bun --bun run test) - (cd packages/livekit-rtc && bun --bun run test) + (cd packages/livekit-server-sdk && EXPECT_BUN_RUNTIME=1 bun --bun run test) + (cd packages/livekit-rtc && EXPECT_BUN_RUNTIME=1 bun --bun run test) build_and_release: if: github.ref == 'refs/heads/main' diff --git a/packages/livekit-rtc/src/bun_runtime.test.ts b/packages/livekit-rtc/src/bun_runtime.test.ts new file mode 100644 index 00000000..e0664187 --- /dev/null +++ b/packages/livekit-rtc/src/bun_runtime.test.ts @@ -0,0 +1,27 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { describe, expect, it } from 'vitest'; + +// Guard: the "Bun" CI job runs vitest via `bun --bun run test` so test bodies +// execute on the bun runtime (with vitest still providing module mocking). The +// `--bun` flag is load-bearing — without it, bun honors vitest's +// `#!/usr/bin/env node` shebang and silently runs everything under node, making +// the bun job a meaningless duplicate of the node job. +// +// When EXPECT_BUN_RUNTIME=1 (set only by the bun CI step), assert that this test +// body is actually executing under bun. It runs through the exact same vitest +// path as the real suite, so it catches a regression where the runtime falls +// back to node. In every other run (the node CI job, local dev) the var is +// unset and this is skipped. +describe('bun runtime guard', () => { + const enforced = process.env.EXPECT_BUN_RUNTIME === '1'; + + it.skipIf(!enforced)('test bodies execute on the bun runtime', () => { + expect( + process.versions.bun, + 'EXPECT_BUN_RUNTIME=1 but tests are not running under bun — `bun --bun run test` ' + + 'fell back to node (check the --bun flag / invocation).', + ).toBeTruthy(); + }); +}); diff --git a/turbo.json b/turbo.json index 90b287f2..74d3222b 100644 --- a/turbo.json +++ b/turbo.json @@ -1,6 +1,6 @@ { "$schema": "https://turborepo.org/schema.json", - "globalEnv": ["LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET", "NODE_ENV", "LIVEKIT_DEBUG_LOG_ROOM_EVENTS"], + "globalEnv": ["LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET", "NODE_ENV", "LIVEKIT_DEBUG_LOG_ROOM_EVENTS", "EXPECT_BUN_RUNTIME"], "tasks": { "build": { "dependsOn": ["^build"],