Skip to content

Commit 88d6d10

Browse files
d-csclaude
andcommitted
feat(run-ops): clickhouse cross-producer version helper + testcontainers run-ops dep
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent ea51491 commit 88d6d10

5 files changed

Lines changed: 804 additions & 0 deletions

File tree

internal-packages/clickhouse/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ export {
7878
PAYLOAD_INDEX,
7979
getTaskRunField,
8080
getPayloadField,
81+
composeTaskRunVersion,
8182
} from "./taskRuns.js";
8283

8384
export { SESSION_COLUMNS, SESSION_INDEX, getSessionField } from "./sessions.js";

internal-packages/clickhouse/src/taskRuns.test.ts

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { z } from "zod";
33
import { ClickhouseClient } from "./client/client.js";
44
import {
55
TASK_RUN_INDEX,
6+
composeTaskRunVersion,
67
getChildRunStatusCounts,
78
getTaskRunsQueryBuilder,
89
insertRawTaskRunPayloadsCompactArrays,
@@ -887,4 +888,313 @@ describe("Task Runs V2", () => {
887888
);
888889
}
889890
);
891+
892+
clickhouseTest(
893+
"should collapse the same run from two producers to one latest-snapshot row",
894+
async ({ clickhouseContainer }) => {
895+
const client = new ClickhouseClient({
896+
name: "test",
897+
url: clickhouseContainer.getConnectionUrl(),
898+
});
899+
const insert = insertTaskRunsCompactArrays(client, { async_insert: 0 });
900+
901+
const createdAt = new Date("2025-04-30 16:34:04.312").getTime();
902+
903+
const base: TaskRunInsertArray = [
904+
"cm9kddfcs01zqdy88ld9mmrli",
905+
"cm8zs78wb0002dy616dg75tv3",
906+
"cm9kddfbz01zpdy88t9dstecu",
907+
"cma45oli70002qrdy47w0j4n7",
908+
createdAt,
909+
createdAt,
910+
"PENDING",
911+
"PRODUCTION",
912+
"run_cma45oli70002qrdy47w0j4n7",
913+
1,
914+
"V2",
915+
"retry-task",
916+
"task/retry-task",
917+
"",
918+
"",
919+
null,
920+
null,
921+
null,
922+
null,
923+
createdAt,
924+
null,
925+
0,
926+
0,
927+
0,
928+
{ data: null },
929+
{ data: null },
930+
"",
931+
[],
932+
"",
933+
"",
934+
"",
935+
"",
936+
"",
937+
"",
938+
0,
939+
"span",
940+
"trace",
941+
"",
942+
"",
943+
"",
944+
"",
945+
true,
946+
"1",
947+
0,
948+
"",
949+
[],
950+
"",
951+
"",
952+
"",
953+
null,
954+
"",
955+
"",
956+
"",
957+
null,
958+
];
959+
960+
const rdsSnapshot: TaskRunInsertArray = [...base];
961+
rdsSnapshot[TASK_RUN_INDEX.status] = "PENDING";
962+
rdsSnapshot[TASK_RUN_INDEX._version] = composeTaskRunVersion({
963+
originGeneration: 0,
964+
lsnVersion: 9_000_000_000n,
965+
}).toString();
966+
967+
const psSnapshot: TaskRunInsertArray = [...base];
968+
psSnapshot[TASK_RUN_INDEX.status] = "COMPLETED_SUCCESSFULLY";
969+
psSnapshot[TASK_RUN_INDEX._version] = composeTaskRunVersion({
970+
originGeneration: 1,
971+
lsnVersion: 10n,
972+
}).toString();
973+
974+
const [insertError] = await insert([rdsSnapshot, psSnapshot]);
975+
expect(insertError).toBeNull();
976+
977+
const query = client.query({
978+
name: "q",
979+
query:
980+
"SELECT run_id, status, count() OVER () AS total FROM trigger_dev.task_runs_v2 FINAL",
981+
schema: z.object({ run_id: z.string(), status: z.string(), total: z.number().int() }),
982+
});
983+
const [queryError, result] = await query({});
984+
expect(queryError).toBeNull();
985+
expect(result).toHaveLength(1);
986+
expect(result?.[0]).toEqual(
987+
expect.objectContaining({
988+
run_id: "cma45oli70002qrdy47w0j4n7",
989+
status: "COMPLETED_SUCCESSFULLY",
990+
})
991+
);
992+
}
993+
);
994+
995+
clickhouseTest(
996+
"should keep the latest intra-producer snapshot (same generation, ascending LSN)",
997+
async ({ clickhouseContainer }) => {
998+
const client = new ClickhouseClient({
999+
name: "test",
1000+
url: clickhouseContainer.getConnectionUrl(),
1001+
});
1002+
const insert = insertTaskRunsCompactArrays(client, { async_insert: 0 });
1003+
1004+
const createdAt = new Date("2025-04-30 16:34:04.312").getTime();
1005+
1006+
const base: TaskRunInsertArray = [
1007+
"cm9kddfcs01zqdy88ld9mmrli",
1008+
"cm8zs78wb0002dy616dg75tv3",
1009+
"cm9kddfbz01zpdy88t9dstecu",
1010+
"cma45oli70002qrdy47w0j4n7",
1011+
createdAt,
1012+
createdAt,
1013+
"PENDING",
1014+
"PRODUCTION",
1015+
"run_cma45oli70002qrdy47w0j4n7",
1016+
1,
1017+
"V2",
1018+
"retry-task",
1019+
"task/retry-task",
1020+
"",
1021+
"",
1022+
null,
1023+
null,
1024+
null,
1025+
null,
1026+
createdAt,
1027+
null,
1028+
0,
1029+
0,
1030+
0,
1031+
{ data: null },
1032+
{ data: null },
1033+
"",
1034+
[],
1035+
"",
1036+
"",
1037+
"",
1038+
"",
1039+
"",
1040+
"",
1041+
0,
1042+
"span",
1043+
"trace",
1044+
"",
1045+
"",
1046+
"",
1047+
"",
1048+
true,
1049+
"1",
1050+
0,
1051+
"",
1052+
[],
1053+
"",
1054+
"",
1055+
"",
1056+
null,
1057+
"",
1058+
"",
1059+
"",
1060+
null,
1061+
];
1062+
1063+
const earlier: TaskRunInsertArray = [...base];
1064+
earlier[TASK_RUN_INDEX.status] = "EXECUTING";
1065+
earlier[TASK_RUN_INDEX._version] = composeTaskRunVersion({
1066+
originGeneration: 1,
1067+
lsnVersion: 10n,
1068+
}).toString();
1069+
1070+
const later: TaskRunInsertArray = [...base];
1071+
later[TASK_RUN_INDEX.status] = "COMPLETED_SUCCESSFULLY";
1072+
later[TASK_RUN_INDEX._version] = composeTaskRunVersion({
1073+
originGeneration: 1,
1074+
lsnVersion: 20n,
1075+
}).toString();
1076+
1077+
const [insertError] = await insert([earlier, later]);
1078+
expect(insertError).toBeNull();
1079+
1080+
const query = client.query({
1081+
name: "q",
1082+
query:
1083+
"SELECT run_id, status, count() OVER () AS total FROM trigger_dev.task_runs_v2 FINAL",
1084+
schema: z.object({ run_id: z.string(), status: z.string(), total: z.number().int() }),
1085+
});
1086+
const [queryError, result] = await query({});
1087+
expect(queryError).toBeNull();
1088+
expect(result).toHaveLength(1);
1089+
expect(result?.[0]).toEqual(
1090+
expect.objectContaining({
1091+
run_id: "cma45oli70002qrdy47w0j4n7",
1092+
status: "COMPLETED_SUCCESSFULLY",
1093+
})
1094+
);
1095+
}
1096+
);
1097+
1098+
clickhouseTest(
1099+
"should collapse to the same winner regardless of insert order",
1100+
async ({ clickhouseContainer }) => {
1101+
const client = new ClickhouseClient({
1102+
name: "test",
1103+
url: clickhouseContainer.getConnectionUrl(),
1104+
});
1105+
const insert = insertTaskRunsCompactArrays(client, { async_insert: 0 });
1106+
1107+
const createdAt = new Date("2025-04-30 16:34:04.312").getTime();
1108+
1109+
const base: TaskRunInsertArray = [
1110+
"cm9kddfcs01zqdy88ld9mmrli",
1111+
"cm8zs78wb0002dy616dg75tv3",
1112+
"cm9kddfbz01zpdy88t9dstecu",
1113+
"cma45oli70002qrdy47w0j4n7",
1114+
createdAt,
1115+
createdAt,
1116+
"PENDING",
1117+
"PRODUCTION",
1118+
"run_cma45oli70002qrdy47w0j4n7",
1119+
1,
1120+
"V2",
1121+
"retry-task",
1122+
"task/retry-task",
1123+
"",
1124+
"",
1125+
null,
1126+
null,
1127+
null,
1128+
null,
1129+
createdAt,
1130+
null,
1131+
0,
1132+
0,
1133+
0,
1134+
{ data: null },
1135+
{ data: null },
1136+
"",
1137+
[],
1138+
"",
1139+
"",
1140+
"",
1141+
"",
1142+
"",
1143+
"",
1144+
0,
1145+
"span",
1146+
"trace",
1147+
"",
1148+
"",
1149+
"",
1150+
"",
1151+
true,
1152+
"1",
1153+
0,
1154+
"",
1155+
[],
1156+
"",
1157+
"",
1158+
"",
1159+
null,
1160+
"",
1161+
"",
1162+
"",
1163+
null,
1164+
];
1165+
1166+
const rdsSnapshot: TaskRunInsertArray = [...base];
1167+
rdsSnapshot[TASK_RUN_INDEX.status] = "PENDING";
1168+
rdsSnapshot[TASK_RUN_INDEX._version] = composeTaskRunVersion({
1169+
originGeneration: 0,
1170+
lsnVersion: 9_000_000_000n,
1171+
}).toString();
1172+
1173+
const psSnapshot: TaskRunInsertArray = [...base];
1174+
psSnapshot[TASK_RUN_INDEX.status] = "COMPLETED_SUCCESSFULLY";
1175+
psSnapshot[TASK_RUN_INDEX._version] = composeTaskRunVersion({
1176+
originGeneration: 1,
1177+
lsnVersion: 10n,
1178+
}).toString();
1179+
1180+
const [insertError] = await insert([psSnapshot, rdsSnapshot]);
1181+
expect(insertError).toBeNull();
1182+
1183+
const query = client.query({
1184+
name: "q",
1185+
query:
1186+
"SELECT run_id, status, count() OVER () AS total FROM trigger_dev.task_runs_v2 FINAL",
1187+
schema: z.object({ run_id: z.string(), status: z.string(), total: z.number().int() }),
1188+
});
1189+
const [queryError, result] = await query({});
1190+
expect(queryError).toBeNull();
1191+
expect(result).toHaveLength(1);
1192+
expect(result?.[0]).toEqual(
1193+
expect.objectContaining({
1194+
run_id: "cma45oli70002qrdy47w0j4n7",
1195+
status: "COMPLETED_SUCCESSFULLY",
1196+
})
1197+
);
1198+
}
1199+
);
8901200
});

internal-packages/clickhouse/src/taskRuns.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,32 @@ export function getTaskRunField<K extends TaskRunColumnName>(
203203
return run[TASK_RUN_INDEX[field]] as TaskRunFieldTypes[K];
204204
}
205205

206+
/**
207+
* Compose a globally-comparable ReplacingMergeTree version for task_runs_v2
208+
* when the same run can be replicated from more than one Postgres producer.
209+
*
210+
* Each producer has its own, mutually-incomparable LSN space, so the raw
211+
* LSN-derived version cannot be compared across producers. We reserve the top
212+
* 8 bits for an `originGeneration` epoch (monotonic across producers: the more
213+
* authoritative / later-cutover producer gets the higher generation) and keep
214+
* the producer's own LSN in the low 56 bits to preserve in-producer ordering.
215+
*
216+
* Self-host single-DB never calls this (one producer => generation is constant
217+
* and the existing raw LSN path is sufficient); the split gate skips it.
218+
*/
219+
export function composeTaskRunVersion(opts: {
220+
originGeneration: number;
221+
lsnVersion: bigint;
222+
}): bigint {
223+
const gen = BigInt(opts.originGeneration);
224+
if (gen < BigInt(0) || gen > BigInt(0xff)) {
225+
throw new Error(`originGeneration out of range (0-255): ${opts.originGeneration}`);
226+
}
227+
const LSN_BITS = BigInt(56);
228+
const LSN_MASK = (BigInt(1) << LSN_BITS) - BigInt(1); // low 56 bits
229+
return (gen << LSN_BITS) | (opts.lsnVersion & LSN_MASK);
230+
}
231+
206232
export function insertTaskRunsCompactArrays(ch: ClickhouseWriter, settings?: ClickHouseSettings) {
207233
return ch.insertCompactRaw({
208234
name: "insertTaskRunsCompactArrays",

0 commit comments

Comments
 (0)