@@ -47,79 +47,88 @@ describe("RunsReplicationService (task_run_v2)", () => {
4747
4848 await runsReplicationService . start ( ) ;
4949
50- const organization = await prisma . organization . create ( {
51- data : { title : "test" , slug : "test" } ,
52- } ) ;
53- const project = await prisma . project . create ( {
54- data : {
55- name : "test" ,
56- slug : "test" ,
57- organizationId : organization . id ,
58- externalRef : "test" ,
59- } ,
60- } ) ;
61- const runtimeEnvironment = await prisma . runtimeEnvironment . create ( {
62- data : {
63- slug : "test" ,
64- type : "DEVELOPMENT" ,
65- projectId : project . id ,
66- organizationId : organization . id ,
67- apiKey : "test" ,
68- pkApiKey : "test" ,
69- shortcode : "test" ,
70- } ,
71- } ) ;
72-
73- // A v2 run lives in task_run_v2, keyed by a KSUID id.
74- const ksuid = RunId . generateKsuid ( ) ;
75- const run = await prisma . taskRunV2 . create ( {
76- data : {
77- id : ksuid . id ,
78- friendlyId : ksuid . friendlyId ,
79- taskIdentifier : "my-task" ,
80- payload : JSON . stringify ( { foo : "bar" } ) ,
81- payloadType : "application/json" ,
82- traceId : "v2trace" ,
83- spanId : "v2span" ,
84- queue : "test" ,
85- workerQueue : "us-east-1-next" ,
86- region : "us-east-1" ,
87- planType : "free" ,
88- runtimeEnvironmentId : runtimeEnvironment . id ,
89- projectId : project . id ,
90- organizationId : organization . id ,
91- environmentType : "DEVELOPMENT" ,
92- engine : "V2" ,
93- } ,
94- } ) ;
95-
96- await setTimeout ( 1000 ) ;
50+ try {
51+ const organization = await prisma . organization . create ( {
52+ data : { title : "test" , slug : "test" } ,
53+ } ) ;
54+ const project = await prisma . project . create ( {
55+ data : {
56+ name : "test" ,
57+ slug : "test" ,
58+ organizationId : organization . id ,
59+ externalRef : "test" ,
60+ } ,
61+ } ) ;
62+ const runtimeEnvironment = await prisma . runtimeEnvironment . create ( {
63+ data : {
64+ slug : "test" ,
65+ type : "DEVELOPMENT" ,
66+ projectId : project . id ,
67+ organizationId : organization . id ,
68+ apiKey : "test" ,
69+ pkApiKey : "test" ,
70+ shortcode : "test" ,
71+ } ,
72+ } ) ;
9773
98- const queryRuns = clickhouse . reader . query ( {
99- name : "runs-replication" ,
100- query : "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {runId: String}" ,
101- schema : z . any ( ) ,
102- params : z . object ( { runId : z . string ( ) } ) ,
103- } ) ;
74+ // A v2 run lives in task_run_v2, keyed by a KSUID id.
75+ const ksuid = RunId . generateKsuid ( ) ;
76+ const run = await prisma . taskRunV2 . create ( {
77+ data : {
78+ id : ksuid . id ,
79+ friendlyId : ksuid . friendlyId ,
80+ taskIdentifier : "my-task" ,
81+ payload : JSON . stringify ( { foo : "bar" } ) ,
82+ payloadType : "application/json" ,
83+ traceId : "v2trace" ,
84+ spanId : "v2span" ,
85+ queue : "test" ,
86+ workerQueue : "us-east-1-next" ,
87+ region : "us-east-1" ,
88+ planType : "free" ,
89+ runtimeEnvironmentId : runtimeEnvironment . id ,
90+ projectId : project . id ,
91+ organizationId : organization . id ,
92+ environmentType : "DEVELOPMENT" ,
93+ engine : "V2" ,
94+ } ,
95+ } ) ;
10496
105- const [ queryError , result ] = await queryRuns ( { runId : run . id } ) ;
97+ const queryRuns = clickhouse . reader . query ( {
98+ name : "runs-replication" ,
99+ query : "SELECT * FROM trigger_dev.task_runs_v2 WHERE run_id = {runId: String}" ,
100+ schema : z . any ( ) ,
101+ params : z . object ( { runId : z . string ( ) } ) ,
102+ } ) ;
106103
107- expect ( queryError ) . toBeNull ( ) ;
108- expect ( result ?. length ) . toBe ( 1 ) ;
109- expect ( result ?. [ 0 ] ) . toEqual (
110- expect . objectContaining ( {
111- run_id : run . id ,
112- friendly_id : run . friendlyId ,
113- task_identifier : "my-task" ,
114- environment_id : runtimeEnvironment . id ,
115- project_id : project . id ,
116- organization_id : organization . id ,
117- environment_type : "DEVELOPMENT" ,
118- engine : "V2" ,
119- } )
120- ) ;
104+ // ClickHouse replication is asynchronous: poll until the row lands
105+ // (bounded) instead of a fixed sleep, which is flaky under lag variance.
106+ let queryError : unknown = null ;
107+ let result : Array < Record < string , unknown > > | undefined ;
108+ const deadline = Date . now ( ) + 10_000 ;
109+ do {
110+ [ queryError , result ] = await queryRuns ( { runId : run . id } ) ;
111+ if ( ! queryError && result ?. length === 1 ) break ;
112+ await setTimeout ( 200 ) ;
113+ } while ( Date . now ( ) < deadline ) ;
121114
122- await runsReplicationService . stop ( ) ;
115+ expect ( queryError ) . toBeNull ( ) ;
116+ expect ( result ?. length ) . toBe ( 1 ) ;
117+ expect ( result ?. [ 0 ] ) . toEqual (
118+ expect . objectContaining ( {
119+ run_id : run . id ,
120+ friendly_id : run . friendlyId ,
121+ task_identifier : "my-task" ,
122+ environment_id : runtimeEnvironment . id ,
123+ project_id : project . id ,
124+ organization_id : organization . id ,
125+ environment_type : "DEVELOPMENT" ,
126+ engine : "V2" ,
127+ } )
128+ ) ;
129+ } finally {
130+ await runsReplicationService . stop ( ) ;
131+ }
123132 }
124133 ) ;
125134} ) ;
0 commit comments