Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ Run a Codex turn, then open your Langfuse project to see the trace.
- **Authentication fails** — check that the public/secret keys are valid and that `LANGFUSE_BASE_URL` matches the region the keys belong to.
- **Traces land in the wrong project** — API keys are project-scoped in Langfuse; use the keys for the project you want.
- **Testing hook failures** — set `LANGFUSE_CODEX_FAIL_ON_ERROR=true` together with `LANGFUSE_CODEX_DEBUG=true` to make Codex report upload or flush errors instead of failing open.
- **Checking dedup sidecars** — successful uploads of completed turns are recorded next to the rollout as `<rollout>.jsonl.langfuse`. If a Stop hook reads the rollout before Codex has written the turn-completed marker, the trace may upload without a sidecar entry; the next Stop hook will finalize and mark it.
- **Checking dedup sidecars** — successful uploads of completed turns are recorded next to the rollout as `<rollout>.jsonl.langfuse`. If a Stop hook reads the rollout before Codex has written the turn-completed marker, the plugin skips that trailing in-progress turn; the next Stop hook uploads and marks it after completion.
- **Verifying in Langfuse** — use `npx langfuse-cli api traces list --from-timestamp <recent ISO> --limit 10 --order-by timestamp.desc --fields core,metrics,observations --json` with credentials for the same project.
- **Sandboxed/network-restricted runs** — Codex sandbox or network policy can prevent exports from reaching Langfuse. Debug logging and fail-on-error mode are the quickest way to distinguish hook execution from network failure.
- **Self-hosting** — the TypeScript SDK requires Langfuse platform version >= 3.95.0.
Expand Down
15 changes: 10 additions & 5 deletions plugins/tracing/dist/index.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -46823,8 +46823,9 @@ function parseSession(lines) {
* The `Stop` hook fires after every Codex turn and re-reads the whole rollout
* file, so completed turns would be re-uploaded each time. We record uploaded
* turn ids in a sidecar file (`<rolloutFile>.langfuse`) and skip them on
* subsequent invocations. In-progress (not-yet-completed) turns are uploaded
* but intentionally not recorded, so they finalize on the next hook run.
* subsequent invocations. In-progress (not-yet-completed) trailing turns are
* skipped until Codex writes the completion marker, so a later hook invocation
* uploads the final turn once instead of creating a partial duplicate.
*/
async function loadUploadedTurnIds(rolloutFile) {
try {
Expand Down Expand Up @@ -47002,7 +47003,11 @@ async function convertRollout(rolloutFile, options) {
}
const uploaded = await loadUploadedTurnIds(rolloutFile);
for (const turn of turns) {
if (turn.completed && turn.turnId && uploaded.has(turn.turnId)) continue;
if (!turn.completed) {
if (turn.turnId) debugLog(`skipping in-progress turn ${turn.turnId}; waiting for completion`);
continue;
}
if (turn.turnId && uploaded.has(turn.turnId)) continue;
await propagateAttributes({
sessionId: sessionMeta.sessionId,
traceName: "Codex Turn",
Expand All @@ -47015,10 +47020,10 @@ async function convertRollout(rolloutFile, options) {
rolloutFile
});
});
if (turn.completed && turn.turnId) {
if (turn.turnId) {
uploaded.add(turn.turnId);
await markTurnUploaded(rolloutFile, turn.turnId);
} else if (turn.turnId) debugLog(`uploaded in-progress turn ${turn.turnId}; waiting for completion before sidecar mark`);
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions plugins/tracing/src/sidecar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import * as fs from "node:fs/promises";
* The `Stop` hook fires after every Codex turn and re-reads the whole rollout
* file, so completed turns would be re-uploaded each time. We record uploaded
* turn ids in a sidecar file (`<rolloutFile>.langfuse`) and skip them on
* subsequent invocations. In-progress (not-yet-completed) turns are uploaded
* but intentionally not recorded, so they finalize on the next hook run.
* subsequent invocations. In-progress (not-yet-completed) trailing turns are
* skipped until Codex writes the completion marker, so a later hook invocation
* uploads the final turn once instead of creating a partial duplicate.
*/
export async function loadUploadedTurnIds(rolloutFile: string): Promise<Set<string>> {
try {
Expand Down
17 changes: 9 additions & 8 deletions plugins/tracing/src/trace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,14 @@ export async function convertRollout(
const uploaded = await loadUploadedTurnIds(rolloutFile);

for (const turn of turns) {
if (turn.completed && turn.turnId && uploaded.has(turn.turnId)) {
if (!turn.completed) {
if (turn.turnId) {
debugLog(`skipping in-progress turn ${turn.turnId}; waiting for completion`);
}
continue;
}

if (turn.turnId && uploaded.has(turn.turnId)) {
continue; // already uploaded in a previous hook invocation
}

Expand All @@ -270,15 +277,9 @@ export async function convertRollout(
},
);

// Only mark completed turns as uploaded; an in-progress trailing turn is
// re-uploaded (and finalized) on the next hook invocation.
if (turn.completed && turn.turnId) {
if (turn.turnId) {
uploaded.add(turn.turnId);
await markTurnUploaded(rolloutFile, turn.turnId);
} else if (turn.turnId) {
debugLog(
`uploaded in-progress turn ${turn.turnId}; waiting for completion before sidecar mark`,
);
}
}
}
47 changes: 47 additions & 0 deletions plugins/tracing/test/trace.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,51 @@ describe("convertRollout", () => {
await convertRollout(file, { config: baseConfig });
expect(exporter.getFinishedSpans()).toHaveLength(0);
});

it("waits for a trailing in-progress turn instead of uploading a partial duplicate", async () => {
  const fixtureDir = stageFixtures();
  const rolloutPath = path.join(fixtureDir, "rollout-in-progress.jsonl");

  // Rollout whose only turn is started and produces an assistant message,
  // with no further event recorded after that message.
  const rolloutEvents = [
    {
      timestamp: "2026-06-03T12:00:00.000Z",
      type: "session_meta",
      payload: { id: "sess-in-progress" },
    },
    {
      timestamp: "2026-06-03T12:00:01.000Z",
      type: "event_msg",
      payload: { type: "task_started", turn_id: "turn-in-progress" },
    },
    {
      timestamp: "2026-06-03T12:00:01.200Z",
      type: "turn_context",
      payload: { model: "gpt-5.4" },
    },
    {
      timestamp: "2026-06-03T12:00:01.300Z",
      type: "event_msg",
      payload: { type: "user_message", message: "hi" },
    },
    {
      timestamp: "2026-06-03T12:00:02.000Z",
      type: "response_item",
      payload: {
        type: "message",
        role: "assistant",
        content: [{ type: "output_text", text: "working..." }],
      },
    },
  ];

  // One JSON document per line, without a trailing newline.
  const jsonlBody = rolloutEvents.map((event) => JSON.stringify(event)).join("\n");
  fs.writeFileSync(rolloutPath, jsonlBody);

  await convertRollout(rolloutPath, { config: baseConfig });

  // No spans may be exported and no sidecar file may exist for the rollout.
  const finishedSpans = exporter.getFinishedSpans();
  const sidecarPath = `${rolloutPath}.langfuse`;
  expect(finishedSpans).toHaveLength(0);
  expect(fs.existsSync(sidecarPath)).toBe(false);
});
});