Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions packages/engine/src/services/parallelCoordinator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ export interface WorkerTask {
startFrame: number;
endFrame: number;
outputDir: string;
/**
* Offset subtracted from the absolute frame index when naming the captured
* file (`frame_<i - outputFrameOffset>.{ext}`). Default 0. Distributed
* chunks set this to the chunk's absolute startFrame so file names land
* 0-indexed within the chunk's range — the encoder reads frames
* sequentially without a `-start_number` override. The per-frame TIME
* calculation still uses the absolute frame index.
*/
outputFrameOffset?: number;
}

export interface WorkerResult {
Expand Down Expand Up @@ -148,20 +157,22 @@ export function distributeFrames(
totalFrames: number,
workerCount: number,
workDir: string,
rangeStart: number = 0,
): WorkerTask[] {
const tasks: WorkerTask[] = [];
const framesPerWorker = Math.ceil(totalFrames / workerCount);

for (let i = 0; i < workerCount; i++) {
const startFrame = i * framesPerWorker;
const endFrame = Math.min((i + 1) * framesPerWorker, totalFrames);
if (startFrame >= totalFrames) break;
const startFrame = rangeStart + i * framesPerWorker;
const endFrame = Math.min(rangeStart + (i + 1) * framesPerWorker, rangeStart + totalFrames);
if (startFrame >= rangeStart + totalFrames) break;

tasks.push({
workerId: i,
startFrame,
endFrame,
outputDir: join(workDir, `worker-${i}`),
outputFrameOffset: rangeStart,
});
}

Expand Down Expand Up @@ -196,6 +207,7 @@ async function executeWorkerTask(
);
await initializeSession(session);

const outputOffset = task.outputFrameOffset ?? 0;
for (let i = task.startFrame; i < task.endFrame; i++) {
if (signal?.aborted) {
throw new Error("Parallel worker cancelled");
Expand All @@ -204,14 +216,16 @@ async function executeWorkerTask(
// frame-index → time math. The 1-in-1001 ULP loss for NTSC is invisible
// at our scales (frame count tops out at single-digit thousands).
const time = (i * captureOptions.fps.den) / captureOptions.fps.num;
const fileFrameIdx = i - outputOffset;

if (onFrameBuffer) {
// Streaming mode: capture to buffer and invoke callback
const { buffer } = await captureFrameToBuffer(session, i, time);
// `captureFrameToBuffer` is passed the range-local `fileFrameIdx`, while
// the streaming-encode callback receives the absolute index `i` (not
// `fileFrameIdx`) so the encoder sequences frames against the
// composition's timeline.
const { buffer } = await captureFrameToBuffer(session, fileFrameIdx, time);
await onFrameBuffer(i, buffer);
} else {
// Disk mode: capture to file
await captureFrame(session, i, time);
await captureFrame(session, fileFrameIdx, time);
}
framesCaptured++;

Expand Down
18 changes: 12 additions & 6 deletions packages/producer/src/services/distributed/renderChunk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
assertSwiftShader,
type BeforeCaptureHook,
BROWSER_GPU_NOT_SOFTWARE,
calculateOptimalWorkers,
type CaptureOptions,
type CaptureSession,
closeCaptureSession,
Expand Down Expand Up @@ -531,7 +532,13 @@ export async function renderChunk(
// would deadlock Chrome's compositor by issuing a second beginFrame
// at a `frameTimeTicks` it had just advanced to.

// ── Capture the chunk's range via runCaptureStage ──
// Capture-cost calibration based on shader transitions /
// renderModeHints is not threaded through to chunks yet; the in-process
// renderer's `resolveRenderWorkerCount` wraps this with that reduction,
// but `PlanJson` doesn't carry the compiled hints needed to call it
// directly. The existing adaptive-retry path reduces workers if
// compositor contention surfaces as CDP timeouts.
const chunkWorkerCount = calculateOptimalWorkers(framesInChunk, undefined, cfg);
await runCaptureStage({
fileServer,
workDir,
Expand All @@ -541,11 +548,10 @@ export async function renderChunk(
cfg,
forceScreenshot: encoder.forceScreenshot,
log,
workerCount: 1,
// Pass the pre-warmed session through as `probeSession` so captureStage
// reuses it via `prepareCaptureSessionForReuse` instead of spinning up
// a fresh browser. The stage closes the session in its `finally`,
// so we MUST clear our own reference here to avoid a double-close.
workerCount: chunkWorkerCount,
// The parallel branch closes this session and spins up its own
// worker sessions, wasting the ~3-5s of pre-warmed setup. Worth a
// follow-up to skip pre-warmup when the resolved workerCount > 1.
probeSession: session,
needsAlpha: plan.dimensions.format !== "mp4",
captureAttempts: [],
Expand Down
47 changes: 24 additions & 23 deletions packages/producer/src/services/render/stages/captureStage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,15 @@ export interface CaptureStageInput {
onProgress?: ProgressCallback;
/**
* Capture a sub-range `[startFrame, endFrame)` of the composition's
* timeline. Used by distributed `renderChunk` workers to render only
* their assigned chunk. Captured frames are written with file names
* normalized to start at zero (`frame_000000.{ext}`) so the encoder
* doesn't need an `-start_number` override; per-frame TIMES still
* reflect the absolute frame index via `(absIdx * fps.den) / fps.num`,
* keeping the page's virtual clock identical to what an in-process
* render at that frame would see.
* timeline. Used by distributed `renderChunk` to render only its chunk.
* Captured file names are 0-indexed within the range; per-frame TIMES use
* the absolute frame index so the page's virtual clock matches an
* in-process render at that frame. Supported on both the sequential and
* parallel branches; the parallel branch threads `frameRange.startFrame`
* through as `frameRangeStart`. See `WorkerTask.outputFrameOffset`.
*
* Only honored on the sequential capture branch (workerCount === 1).
* The parallel branch in this stage targets in-process renders where
* adaptive retry across the whole timeline is the contract, and chunk
* workers fan out at the activity layer instead. Passing `frameRange`
* with `workerCount > 1` throws — the caller should reduce
* `workerCount` to 1.
*
* Default `undefined`: the stage captures `[0, totalFrames)` (the
* in-process contract).
* Default `undefined`: capture `[0, totalFrames)` (in-process contract).
* When set, `endFrame - startFrame` MUST equal `totalFrames`.
*/
frameRange?: { startFrame: number; endFrame: number };
}
Expand Down Expand Up @@ -155,12 +147,6 @@ export async function runCaptureStage(input: CaptureStageInput): Promise<Capture
const captureCfg: EngineConfig =
cfg.forceScreenshot === forceScreenshot ? cfg : { ...cfg, forceScreenshot };

if (frameRange !== undefined && workerCount > 1) {
throw new Error(
`[captureStage] frameRange capture requires workerCount === 1 (received workerCount=${workerCount}). ` +
`Distributed chunk workers fan out at the activity layer; reduce workerCount to 1 when passing frameRange.`,
);
}
if (frameRange !== undefined) {
if (
!Number.isFinite(frameRange.startFrame) ||
Expand All @@ -173,10 +159,24 @@ export async function runCaptureStage(input: CaptureStageInput): Promise<Capture
`Expected non-negative startFrame strictly less than endFrame.`,
);
}
// The parallel branch passes `totalFrames` to executeDiskCaptureWithAdaptiveRetry
// (which drives `distributeFrames` partitioning and `findMissingFrameRanges`
// completion checks) AND `frameRangeStart` separately. They must describe the
// same window: without this check, a caller passing `totalFrames: 100` with
// `frameRange: { startFrame: 50, endFrame: 200 }` would get a silently wrong
// distribution.
const rangeFrames = frameRange.endFrame - frameRange.startFrame;
if (rangeFrames !== totalFrames) {
throw new Error(
`[captureStage] frameRange size (${rangeFrames}) must equal totalFrames (${totalFrames}). ` +
`Received frameRange=${JSON.stringify(frameRange)}.`,
);
}
}

if (workerCount > 1) {
// Parallel capture
// Parallel capture. When `frameRange` is set (distributed chunk), pass
// `frameRangeStart` so workers land on absolute composition frame indices
// for time math while file names stay 0-indexed within the chunk range.
const attempts = await executeDiskCaptureWithAdaptiveRetry({
serverUrl: fileServer.url,
workDir,
Expand All @@ -188,6 +188,7 @@ export async function runCaptureStage(input: CaptureStageInput): Promise<Capture
captureOptions: buildCaptureOptions(),
createBeforeCaptureHook: createRenderVideoFrameInjector,
abortSignal,
frameRangeStart: frameRange?.startFrame,
onProgress: (progress) => {
job.framesRendered = progress.capturedFrames;
const frameProgress = progress.capturedFrames / progress.totalFrames;
Expand Down
28 changes: 24 additions & 4 deletions packages/producer/src/services/renderOrchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -536,17 +536,24 @@ export function buildMissingFrameRetryBatches(
maxWorkers: number,
workDir: string,
attempt: number,
rangeStart: number = 0,
): WorkerTask[][] {
const workersPerBatch = Math.max(1, Math.floor(maxWorkers));
const batches: WorkerTask[][] = [];

// `ranges` are 0-indexed within the chunk's frame range (or the full timeline
// when `rangeStart === 0`). Translate them to absolute composition indices so
// each `WorkerTask`'s per-frame time math lands on the page's actual virtual
// clock, and propagate `outputFrameOffset` so the retry writes each frame
// back to the same local file name `findMissingFrameRanges` was looking for.
for (let i = 0; i < ranges.length; i += workersPerBatch) {
const batchIndex = batches.length;
const batch = ranges.slice(i, i + workersPerBatch).map((range, workerId) => ({
workerId,
startFrame: range.startFrame,
endFrame: range.endFrame,
startFrame: rangeStart + range.startFrame,
endFrame: rangeStart + range.endFrame,
outputDir: join(workDir, `retry-${attempt}-batch-${batchIndex}-worker-${workerId}`),
outputFrameOffset: rangeStart,
}));
batches.push(batch);
}
Expand Down Expand Up @@ -605,11 +612,18 @@ export async function executeDiskCaptureWithAdaptiveRetry(options: {
onProgress?: (progress: ParallelProgress) => void;
cfg: EngineConfig;
log: ProducerLogger;
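/**
* Absolute composition frame index at which the captured range starts.
* Passed as `rangeStart` to `distributeFrames` and
* `buildMissingFrameRetryBatches`, which add it to each task's
* `startFrame`/`endFrame` and set it as the task's `outputFrameOffset`.
* Default 0 (in-process contract: `[0, totalFrames)`).
* See `WorkerTask.outputFrameOffset`.
*/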
frameRangeStart?: number;
}): Promise<CaptureAttemptSummary[]> {
const attempts: CaptureAttemptSummary[] = [];
let currentWorkers = options.initialWorkerCount;
let missingRanges: FrameRange[] | null = null;
let attempt = 0;
const rangeStart = options.frameRangeStart ?? 0;

while (true) {
const frameCount = missingRanges ? countFrameRanges(missingRanges) : options.totalFrames;
Expand All @@ -622,8 +636,14 @@ export async function executeDiskCaptureWithAdaptiveRetry(options: {

const attemptWorkDir = join(options.workDir, `capture-attempt-${attempt}`);
const batches = missingRanges
? buildMissingFrameRetryBatches(missingRanges, currentWorkers, attemptWorkDir, attempt)
: [distributeFrames(options.totalFrames, currentWorkers, attemptWorkDir)];
? buildMissingFrameRetryBatches(
missingRanges,
currentWorkers,
attemptWorkDir,
attempt,
rangeStart,
)
: [distributeFrames(options.totalFrames, currentWorkers, attemptWorkDir, rangeStart)];

try {
for (const tasks of batches) {
Expand Down
Loading