diff --git a/README.md b/README.md index 00573e5..ba03c43 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,7 @@ Run a Codex turn, then open your Langfuse project to see the trace. - **Authentication fails** — check that the public/secret keys are valid and that `LANGFUSE_BASE_URL` matches the region the keys belong to. - **Traces land in the wrong project** — API keys are project-scoped in Langfuse; use the keys for the project you want. - **Testing hook failures** — set `LANGFUSE_CODEX_FAIL_ON_ERROR=true` together with `LANGFUSE_CODEX_DEBUG=true` to make Codex report upload or flush errors instead of failing open. -- **Checking dedup sidecars** — successful uploads of completed turns are recorded next to the rollout as `.jsonl.langfuse`. If a Stop hook reads the rollout before Codex has written the turn-completed marker, the trace may upload without a sidecar entry; the next Stop hook will finalize and mark it. +- **Checking dedup sidecars** — successful uploads of completed turns are recorded next to the rollout as `.jsonl.langfuse`. If a Stop hook reads the rollout before Codex has written the turn-completed marker, the plugin skips that trailing in-progress turn; the next Stop hook uploads and marks it after completion. - **Verifying in Langfuse** — use `npx langfuse-cli api traces list --from-timestamp --limit 10 --order-by timestamp.desc --fields core,metrics,observations --json` with credentials for the same project. - **Sandboxed/network-restricted runs** — Codex sandbox or network policy can prevent exports from reaching Langfuse. Debug logging and fail-on-error mode are the quickest way to distinguish hook execution from network failure. - **Self-hosting** — the TypeScript SDK requires Langfuse platform version >= 3.95.0. diff --git a/plugins/tracing/dist/index.mjs b/plugins/tracing/dist/index.mjs index 0460e13..3c27623 100644 --- a/plugins/tracing/dist/index.mjs +++ b/plugins/tracing/dist/index.mjs @@ -46823,8 +46823,9 @@ function parseSession(lines) { * The `Stop` hook fires after every Codex turn and re-reads the whole rollout * file, so completed turns would be re-uploaded each time. We record uploaded * turn ids in a sidecar file (`.langfuse`) and skip them on -* subsequent invocations. In-progress (not-yet-completed) turns are uploaded -* but intentionally not recorded, so they finalize on the next hook run. +* subsequent invocations. In-progress (not-yet-completed) trailing turns are +* skipped until Codex writes the completion marker, so a later hook invocation +* uploads the final turn once instead of creating a partial duplicate. */ async function loadUploadedTurnIds(rolloutFile) { try { @@ -47002,7 +47003,11 @@ async function convertRollout(rolloutFile, options) { } const uploaded = await loadUploadedTurnIds(rolloutFile); for (const turn of turns) { - if (turn.completed && turn.turnId && uploaded.has(turn.turnId)) continue; + if (!turn.completed) { + if (turn.turnId) debugLog(`skipping in-progress turn ${turn.turnId}; waiting for completion`); + continue; + } + if (turn.turnId && uploaded.has(turn.turnId)) continue; await propagateAttributes({ sessionId: sessionMeta.sessionId, traceName: "Codex Turn", @@ -47015,10 +47020,10 @@ async function convertRollout(rolloutFile, options) { rolloutFile }); }); - if (turn.completed && turn.turnId) { + if (turn.turnId) { uploaded.add(turn.turnId); await markTurnUploaded(rolloutFile, turn.turnId); - } else if (turn.turnId) debugLog(`uploaded in-progress turn ${turn.turnId}; waiting for completion before sidecar mark`); + } } } diff --git a/plugins/tracing/src/sidecar.ts b/plugins/tracing/src/sidecar.ts index e0efd87..fdb0f3c 100644 --- a/plugins/tracing/src/sidecar.ts +++ b/plugins/tracing/src/sidecar.ts @@ -6,8 +6,9 @@ import * as fs from "node:fs/promises"; * The `Stop` hook fires after every Codex turn and re-reads the whole rollout * file, so completed turns would be re-uploaded each time. We record uploaded * turn ids in a sidecar file (`.langfuse`) and skip them on - * subsequent invocations. In-progress (not-yet-completed) turns are uploaded - * but intentionally not recorded, so they finalize on the next hook run. + * subsequent invocations. In-progress (not-yet-completed) trailing turns are + * skipped until Codex writes the completion marker, so a later hook invocation + * uploads the final turn once instead of creating a partial duplicate. */ export async function loadUploadedTurnIds(rolloutFile: string): Promise> { try { diff --git a/plugins/tracing/src/trace.ts b/plugins/tracing/src/trace.ts index 2c949ab..702ff16 100644 --- a/plugins/tracing/src/trace.ts +++ b/plugins/tracing/src/trace.ts @@ -250,7 +250,14 @@ export async function convertRollout( const uploaded = await loadUploadedTurnIds(rolloutFile); for (const turn of turns) { - if (turn.completed && turn.turnId && uploaded.has(turn.turnId)) { + if (!turn.completed) { + if (turn.turnId) { + debugLog(`skipping in-progress turn ${turn.turnId}; waiting for completion`); + } + continue; + } + + if (turn.turnId && uploaded.has(turn.turnId)) { continue; // already uploaded in a previous hook invocation } @@ -270,15 +277,9 @@ export async function convertRollout( }, ); - // Only mark completed turns as uploaded; an in-progress trailing turn is - // re-uploaded (and finalized) on the next hook invocation. - if (turn.completed && turn.turnId) { + if (turn.turnId) { uploaded.add(turn.turnId); await markTurnUploaded(rolloutFile, turn.turnId); - } else if (turn.turnId) { - debugLog( - `uploaded in-progress turn ${turn.turnId}; waiting for completion before sidecar mark`, - ); } } } diff --git a/plugins/tracing/test/trace.test.ts b/plugins/tracing/test/trace.test.ts index ec429d1..3378c80 100644 --- a/plugins/tracing/test/trace.test.ts +++ b/plugins/tracing/test/trace.test.ts @@ -138,4 +138,51 @@ describe("convertRollout", () => { await convertRollout(file, { config: baseConfig }); expect(exporter.getFinishedSpans()).toHaveLength(0); }); + + it("waits for a trailing in-progress turn instead of uploading a partial duplicate", async () => { + const dir = stageFixtures(); + const file = path.join(dir, "rollout-in-progress.jsonl"); + + fs.writeFileSync( + file, + [ + { + timestamp: "2026-06-03T12:00:00.000Z", + type: "session_meta", + payload: { id: "sess-in-progress" }, + }, + { + timestamp: "2026-06-03T12:00:01.000Z", + type: "event_msg", + payload: { type: "task_started", turn_id: "turn-in-progress" }, + }, + { + timestamp: "2026-06-03T12:00:01.200Z", + type: "turn_context", + payload: { model: "gpt-5.4" }, + }, + { + timestamp: "2026-06-03T12:00:01.300Z", + type: "event_msg", + payload: { type: "user_message", message: "hi" }, + }, + { + timestamp: "2026-06-03T12:00:02.000Z", + type: "response_item", + payload: { + type: "message", + role: "assistant", + content: [{ type: "output_text", text: "working..." }], + }, + }, + ] + .map((line) => JSON.stringify(line)) + .join("\n"), + ); + + await convertRollout(file, { config: baseConfig }); + + expect(exporter.getFinishedSpans()).toHaveLength(0); + expect(fs.existsSync(`${file}.langfuse`)).toBe(false); + }); });