@@ -27,6 +27,10 @@ import { PodCleaner } from "./services/podCleaner.js";
2727import { FailedPodHandler } from "./services/failedPodHandler.js" ;
2828import { getWorkerToken } from "./workerToken.js" ;
2929import { OtlpTraceService } from "./services/otlpTraceService.js" ;
30+ import {
31+ WarmStartVerificationService ,
32+ type WarmStartTimings ,
33+ } from "./services/warmStartVerificationService.js" ;
3034import { extractTraceparent , getRestoreRunnerId } from "./util.js" ;
3135import { Redis } from "ioredis" ;
3236import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js" ;
@@ -63,6 +67,7 @@ class ManagedSupervisor {
6367 private readonly logger = new SimpleStructuredLogger ( "managed-supervisor" ) ;
6468 private readonly resourceMonitor : ResourceMonitor ;
6569 private readonly checkpointClient ?: CheckpointClient ;
70+ private readonly warmStartVerifier ?: WarmStartVerificationService ;
6671
6772 private readonly podCleaner ?: PodCleaner ;
6873 private readonly failedPodHandler ?: FailedPodHandler ;
@@ -311,6 +316,19 @@ class ManagedSupervisor {
311316 } ) ;
312317 }
313318
319+ if ( env . TRIGGER_WARM_START_VERIFY_ENABLED && this . warmStartUrl ) {
320+ this . logger . log ( "Warm-start delivery verification enabled" , {
321+ delayMs : env . TRIGGER_WARM_START_VERIFY_DELAY_MS ,
322+ } ) ;
323+
324+ this . warmStartVerifier = new WarmStartVerificationService ( {
325+ workerClient : this . workerSession . httpClient ,
326+ delayMs : env . TRIGGER_WARM_START_VERIFY_DELAY_MS ,
327+ createWorkload : ( message , timings ) => this . createWorkload ( message , timings ) ,
328+ wideEventOpts : this . wideEventOpts ,
329+ } ) ;
330+ }
331+
314332 this . workerSession . on ( "runNotification" , async ( { time, run } ) => {
315333 this . logger . verbose ( "runNotification" , { time, run } ) ;
316334
@@ -467,66 +485,24 @@ class ManagedSupervisor {
467485 if ( didWarmStart ) {
468486 setExtra ( fromContext ( ) , "path_taken" , "warm_start" ) ;
469487 this . logger . debug ( "Warm start successful" , { runId : message . run . id } ) ;
470- return ;
471- }
472-
473- setExtra ( fromContext ( ) , "path_taken" , "cold_create" ) ;
474-
475- const createStart = performance . now ( ) ;
476- try {
477- if ( ! message . deployment . friendlyId ) {
478- // mostly a type guard, deployments always exists for deployed environments
479- // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments.
480- throw new Error ( "Deployment is missing" ) ;
481- }
482-
483- await this . workloadManager . create ( {
484- dequeuedAt : message . dequeuedAt ,
488+ // A hit only means the response was written to the long-poll
489+ // socket, not that the runner received it. Schedule a delivery
490+ // verification that cold-starts the run if nobody acts on it.
491+ this . warmStartVerifier ?. schedule ( message , {
485492 dequeueResponseMs,
486493 pollingIntervalMs,
487494 warmStartCheckMs,
488- envId : message . environment . id ,
489- envType : message . environment . type ,
490- image : message . image ,
491- machine : message . run . machine ,
492- orgId : message . organization . id ,
493- projectId : message . project . id ,
494- deploymentFriendlyId : message . deployment . friendlyId ,
495- deploymentVersion : message . backgroundWorker . version ,
496- runId : message . run . id ,
497- runFriendlyId : message . run . friendlyId ,
498- version : message . version ,
499- nextAttemptNumber : message . run . attemptNumber ,
500- snapshotId : message . snapshot . id ,
501- snapshotFriendlyId : message . snapshot . friendlyId ,
502- placementTags : message . placementTags ,
503- traceContext : message . run . traceContext ,
504- annotations : message . run . annotations ,
505- hasPrivateLink : message . organization . hasPrivateLink ,
506495 } ) ;
507- recordPhaseSince ( "workload_create" , createStart , undefined ) ;
508- workloadCreateDuration . observe (
509- { backend : this . workloadManagerBackend , outcome : "success" } ,
510- ( performance . now ( ) - createStart ) / 1000
511- ) ;
512-
513- // Disabled for now
514- // this.resourceMonitor.blockResources({
515- // cpu: message.run.machine.cpu,
516- // memory: message.run.machine.memory,
517- // });
518- } catch ( error ) {
519- recordPhaseSince (
520- "workload_create" ,
521- createStart ,
522- error instanceof Error ? error : new Error ( String ( error ) )
523- ) ;
524- workloadCreateDuration . observe (
525- { backend : this . workloadManagerBackend , outcome : "error" } ,
526- ( performance . now ( ) - createStart ) / 1000
527- ) ;
528- this . logger . error ( "Failed to create workload" , { error } ) ;
496+ return ;
529497 }
498+
499+ setExtra ( fromContext ( ) , "path_taken" , "cold_create" ) ;
500+
501+ await this . createWorkload ( message , {
502+ dequeueResponseMs,
503+ pollingIntervalMs,
504+ warmStartCheckMs,
505+ } ) ;
530506 }
531507 ) ;
532508 }
@@ -561,6 +537,8 @@ class ManagedSupervisor {
561537
562538 async onRunConnected ( { run } : { run : { friendlyId : string } } ) {
563539 this . logger . debug ( "Run connected" , { run } ) ;
540+ // The dispatched run reached a runner on this node - no fallback needed.
541+ this . warmStartVerifier ?. cancel ( run . friendlyId ) ;
564542 this . workerSession . subscribeToRunNotifications ( [ run . friendlyId ] ) ;
565543 }
566544
@@ -569,6 +547,69 @@ class ManagedSupervisor {
569547 this . workerSession . unsubscribeFromRunNotifications ( [ run . friendlyId ] ) ;
570548 }
571549
550+ private async createWorkload ( message : DequeuedMessage , timings : WarmStartTimings ) {
551+ const createStart = performance . now ( ) ;
552+ try {
553+ if ( ! message . deployment . friendlyId ) {
554+ // mostly a type guard, deployments always exists for deployed environments
555+ // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments.
556+ throw new Error ( "Deployment is missing" ) ;
557+ }
558+
559+ if ( ! message . image ) {
560+ // same type-guard situation as deployment above
561+ throw new Error ( "Image is missing" ) ;
562+ }
563+
564+ await this . workloadManager . create ( {
565+ dequeuedAt : message . dequeuedAt ,
566+ dequeueResponseMs : timings . dequeueResponseMs ,
567+ pollingIntervalMs : timings . pollingIntervalMs ,
568+ warmStartCheckMs : timings . warmStartCheckMs ,
569+ envId : message . environment . id ,
570+ envType : message . environment . type ,
571+ image : message . image ,
572+ machine : message . run . machine ,
573+ orgId : message . organization . id ,
574+ projectId : message . project . id ,
575+ deploymentFriendlyId : message . deployment . friendlyId ,
576+ deploymentVersion : message . backgroundWorker . version ,
577+ runId : message . run . id ,
578+ runFriendlyId : message . run . friendlyId ,
579+ version : message . version ,
580+ nextAttemptNumber : message . run . attemptNumber ,
581+ snapshotId : message . snapshot . id ,
582+ snapshotFriendlyId : message . snapshot . friendlyId ,
583+ placementTags : message . placementTags ,
584+ traceContext : message . run . traceContext ,
585+ annotations : message . run . annotations ,
586+ hasPrivateLink : message . organization . hasPrivateLink ,
587+ } ) ;
588+ recordPhaseSince ( "workload_create" , createStart , undefined ) ;
589+ workloadCreateDuration . observe (
590+ { backend : this . workloadManagerBackend , outcome : "success" } ,
591+ ( performance . now ( ) - createStart ) / 1000
592+ ) ;
593+
594+ // Disabled for now
595+ // this.resourceMonitor.blockResources({
596+ // cpu: message.run.machine.cpu,
597+ // memory: message.run.machine.memory,
598+ // });
599+ } catch ( error ) {
600+ recordPhaseSince (
601+ "workload_create" ,
602+ createStart ,
603+ error instanceof Error ? error : new Error ( String ( error ) )
604+ ) ;
605+ workloadCreateDuration . observe (
606+ { backend : this . workloadManagerBackend , outcome : "error" } ,
607+ ( performance . now ( ) - createStart ) / 1000
608+ ) ;
609+ this . logger . error ( "Failed to create workload" , { error } ) ;
610+ }
611+ }
612+
572613 private async tryWarmStart (
573614 dequeuedMessage : DequeuedMessage ,
574615 traceparent : string | undefined
@@ -652,6 +693,7 @@ class ManagedSupervisor {
652693 this . logger . log ( "Shutting down" ) ;
653694 await this . workloadServer . stop ( ) ;
654695 await this . workerSession . stop ( ) ;
696+ this . warmStartVerifier ?. stop ( ) ;
655697
656698 // Optional services
657699 this . backpressureMonitor ?. stop ( ) ;
0 commit comments