diff --git a/packages/engine/src/services/parallelCoordinator.ts b/packages/engine/src/services/parallelCoordinator.ts index 4b21019bd..cf405476c 100644 --- a/packages/engine/src/services/parallelCoordinator.ts +++ b/packages/engine/src/services/parallelCoordinator.ts @@ -29,6 +29,15 @@ export interface WorkerTask { startFrame: number; endFrame: number; outputDir: string; + /** + * Offset subtracted from the absolute frame index when naming the captured + * file (`frame_NNNNNN.{ext}`). Default 0. Distributed + * chunks set this to the chunk's absolute startFrame so file names land + * 0-indexed within the chunk's range — the encoder reads frames + * sequentially without an `-start_number` override. The per-frame TIME + * calculation still uses the absolute frame index. + */ + outputFrameOffset?: number; } export interface WorkerResult { @@ -148,20 +157,22 @@ export function distributeFrames( totalFrames: number, workerCount: number, workDir: string, + rangeStart: number = 0, ): WorkerTask[] { const tasks: WorkerTask[] = []; const framesPerWorker = Math.ceil(totalFrames / workerCount); for (let i = 0; i < workerCount; i++) { - const startFrame = i * framesPerWorker; - const endFrame = Math.min((i + 1) * framesPerWorker, totalFrames); - if (startFrame >= totalFrames) break; + const startFrame = rangeStart + i * framesPerWorker; + const endFrame = Math.min(rangeStart + (i + 1) * framesPerWorker, rangeStart + totalFrames); + if (startFrame >= rangeStart + totalFrames) break; tasks.push({ workerId: i, startFrame, endFrame, outputDir: join(workDir, `worker-${i}`), + outputFrameOffset: rangeStart, }); } @@ -196,6 +207,7 @@ async function executeWorkerTask( ); await initializeSession(session); + const outputOffset = task.outputFrameOffset ?? 0; for (let i = task.startFrame; i < task.endFrame; i++) { if (signal?.aborted) { throw new Error("Parallel worker cancelled"); @@ -204,14 +216,16 @@ async function executeWorkerTask( // frame-index → time math. 
The 1-in-1001 ULP loss for NTSC is invisible // at our scales (frame count tops out at single-digit thousands). const time = (i * captureOptions.fps.den) / captureOptions.fps.num; + const fileFrameIdx = i - outputOffset; if (onFrameBuffer) { - // Streaming mode: capture to buffer and invoke callback - const { buffer } = await captureFrameToBuffer(session, i, time); + // The streaming-encode callback receives the absolute index `i` + // (not `fileFrameIdx`) so the encoder sequences frames against the + // composition's timeline. + const { buffer } = await captureFrameToBuffer(session, fileFrameIdx, time); await onFrameBuffer(i, buffer); } else { - // Disk mode: capture to file - await captureFrame(session, i, time); + await captureFrame(session, fileFrameIdx, time); } framesCaptured++; diff --git a/packages/producer/src/services/distributed/renderChunk.ts b/packages/producer/src/services/distributed/renderChunk.ts index 0cc494d8b..d0f9013e6 100644 --- a/packages/producer/src/services/distributed/renderChunk.ts +++ b/packages/producer/src/services/distributed/renderChunk.ts @@ -42,6 +42,7 @@ import { assertSwiftShader, type BeforeCaptureHook, BROWSER_GPU_NOT_SOFTWARE, + calculateOptimalWorkers, type CaptureOptions, type CaptureSession, closeCaptureSession, @@ -531,7 +532,13 @@ export async function renderChunk( // would deadlock Chrome's compositor by issuing a second beginFrame // at a `frameTimeTicks` it had just advanced to. - // ── Capture the chunk's range via runCaptureStage ── + // Capture-cost calibration based on shader transitions / + // renderModeHints is not threaded through to chunks yet; the in-process + // renderer's `resolveRenderWorkerCount` wraps this with that reduction, + // but `PlanJson` doesn't carry the compiled hints needed to call it + // directly. The existing adaptive-retry path reduces workers if + // compositor contention surfaces as CDP timeouts. 
+ const chunkWorkerCount = calculateOptimalWorkers(framesInChunk, undefined, cfg); await runCaptureStage({ fileServer, workDir, @@ -541,11 +548,10 @@ export async function renderChunk( cfg, forceScreenshot: encoder.forceScreenshot, log, - workerCount: 1, - // Pass the pre-warmed session through as `probeSession` so captureStage - // reuses it via `prepareCaptureSessionForReuse` instead of spinning up - // a fresh browser. The stage closes the session in its `finally`, - // so we MUST clear our own reference here to avoid a double-close. + workerCount: chunkWorkerCount, + // The parallel branch closes this session and spins up its own + // worker sessions, wasting the ~3-5s of pre-warmed setup. Worth a + // follow-up to skip pre-warmup when the resolved workerCount > 1. probeSession: session, needsAlpha: plan.dimensions.format !== "mp4", captureAttempts: [], diff --git a/packages/producer/src/services/render/stages/captureStage.ts b/packages/producer/src/services/render/stages/captureStage.ts index c52ad2d7a..176498b70 100644 --- a/packages/producer/src/services/render/stages/captureStage.ts +++ b/packages/producer/src/services/render/stages/captureStage.ts @@ -96,23 +96,15 @@ export interface CaptureStageInput { onProgress?: ProgressCallback; /** * Capture a sub-range `[startFrame, endFrame)` of the composition's - * timeline. Used by distributed `renderChunk` workers to render only - * their assigned chunk. Captured frames are written with file names - * normalized to start at zero (`frame_000000.{ext}`) so the encoder - * doesn't need an `-start_number` override; per-frame TIMES still - * reflect the absolute frame index via `(absIdx * fps.den) / fps.num`, - * keeping the page's virtual clock identical to what an in-process - * render at that frame would see. + * timeline. Used by distributed `renderChunk` to render only its chunk. 
+ * Captured file names are 0-indexed within the range; per-frame TIMES use + * the absolute frame index so the page's virtual clock matches an + * in-process render at that frame. Supported on both the sequential and + * parallel branches; the parallel branch threads `frameRange.startFrame` + * through as `frameRangeStart`. See `WorkerTask.outputFrameOffset`. * - * Only honored on the sequential capture branch (workerCount === 1). - * The parallel branch in this stage targets in-process renders where - * adaptive retry across the whole timeline is the contract, and chunk - * workers fan out at the activity layer instead. Passing `frameRange` - * with `workerCount > 1` throws — the caller should reduce - * `workerCount` to 1. - * - * Default `undefined`: the stage captures `[0, totalFrames)` (the - * in-process contract). + * Default `undefined`: capture `[0, totalFrames)` (in-process contract). + * When set, `endFrame - startFrame` MUST equal `totalFrames`. */ frameRange?: { startFrame: number; endFrame: number }; } @@ -155,12 +147,6 @@ export async function runCaptureStage(input: CaptureStageInput): Promise 1) { - throw new Error( - `[captureStage] frameRange capture requires workerCount === 1 (received workerCount=${workerCount}). ` + - `Distributed chunk workers fan out at the activity layer; reduce workerCount to 1 when passing frameRange.`, - ); - } if (frameRange !== undefined) { if ( !Number.isFinite(frameRange.startFrame) || @@ -173,10 +159,24 @@ export async function runCaptureStage(input: CaptureStageInput): Promise 1) { - // Parallel capture + // Parallel capture. When `frameRange` is set (distributed chunk), pass + // `frameRangeStart` so workers land on absolute composition frame indices + // for time math while file names stay 0-indexed within the chunk range. 
const attempts = await executeDiskCaptureWithAdaptiveRetry({ serverUrl: fileServer.url, workDir, @@ -188,6 +188,7 @@ export async function runCaptureStage(input: CaptureStageInput): Promise { job.framesRendered = progress.capturedFrames; const frameProgress = progress.capturedFrames / progress.totalFrames; diff --git a/packages/producer/src/services/renderOrchestrator.ts b/packages/producer/src/services/renderOrchestrator.ts index 3a8ee69f1..5c5ce9ebf 100644 --- a/packages/producer/src/services/renderOrchestrator.ts +++ b/packages/producer/src/services/renderOrchestrator.ts @@ -536,17 +536,24 @@ export function buildMissingFrameRetryBatches( maxWorkers: number, workDir: string, attempt: number, + rangeStart: number = 0, ): WorkerTask[][] { const workersPerBatch = Math.max(1, Math.floor(maxWorkers)); const batches: WorkerTask[][] = []; + // `ranges` are 0-indexed within the chunk's frame range (or full timeline + // when `rangeStart === 0`); translate to absolute composition indices so + // `WorkerTask`'s per-frame time math lands on the page's actual virtual + // clock, and propagate `outputFrameOffset` so the retry captures back at + // the same local file name `findMissingFrameRanges` was looking for. 
for (let i = 0; i < ranges.length; i += workersPerBatch) { const batchIndex = batches.length; const batch = ranges.slice(i, i + workersPerBatch).map((range, workerId) => ({ workerId, - startFrame: range.startFrame, - endFrame: range.endFrame, + startFrame: rangeStart + range.startFrame, + endFrame: rangeStart + range.endFrame, outputDir: join(workDir, `retry-${attempt}-batch-${batchIndex}-worker-${workerId}`), + outputFrameOffset: rangeStart, })); batches.push(batch); } @@ -605,11 +612,18 @@ export async function executeDiskCaptureWithAdaptiveRetry(options: { onProgress?: (progress: ParallelProgress) => void; cfg: EngineConfig; log: ProducerLogger; + /** + * Forwarded to each `WorkerTask`'s `outputFrameOffset` and to the + * `buildMissingFrameRetryBatches` translation. Default 0 (in-process + * contract: `[0, totalFrames)`). See `WorkerTask.outputFrameOffset`. + */ + frameRangeStart?: number; }): Promise { const attempts: CaptureAttemptSummary[] = []; let currentWorkers = options.initialWorkerCount; let missingRanges: FrameRange[] | null = null; let attempt = 0; + const rangeStart = options.frameRangeStart ?? 0; while (true) { const frameCount = missingRanges ? countFrameRanges(missingRanges) : options.totalFrames; @@ -622,8 +636,14 @@ export async function executeDiskCaptureWithAdaptiveRetry(options: { const attemptWorkDir = join(options.workDir, `capture-attempt-${attempt}`); const batches = missingRanges - ? buildMissingFrameRetryBatches(missingRanges, currentWorkers, attemptWorkDir, attempt) - : [distributeFrames(options.totalFrames, currentWorkers, attemptWorkDir)]; + ? buildMissingFrameRetryBatches( + missingRanges, + currentWorkers, + attemptWorkDir, + attempt, + rangeStart, + ) + : [distributeFrames(options.totalFrames, currentWorkers, attemptWorkDir, rangeStart)]; try { for (const tasks of batches) {