Skip to content

Commit fee349a

Browse files
committed
feat(webapp): add billing limit recovery and resolve flow
Recovery UI and durable resolve: cancel queued runs before unpausing, with reconciliation as a safety net.
1 parent 03fea35 commit fee349a

11 files changed

Lines changed: 720 additions & 0 deletions
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import { AnimatedCallout } from "~/components/primitives/AnimatedCallout";
2+
3+
export function BillingLimitResolveProgress({
4+
show,
5+
cancellingQueuedRuns,
6+
}: {
7+
show: boolean;
8+
cancellingQueuedRuns: boolean;
9+
}) {
10+
if (!show) {
11+
return null;
12+
}
13+
14+
return (
15+
<div className="space-y-3">
16+
<AnimatedCallout show variant="success">
17+
Billing limit resolved. Environments are being unpaused — this usually takes a few
18+
seconds.
19+
</AnimatedCallout>
20+
{cancellingQueuedRuns && (
21+
<AnimatedCallout show variant="info">
22+
Cancelling queued runs across billable environments…
23+
</AnimatedCallout>
24+
)}
25+
</div>
26+
);
27+
}
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
import { BulkActionId } from "@trigger.dev/core/v3/isomorphic";
2+
import {
3+
BulkActionNotificationType,
4+
BulkActionType,
5+
Prisma,
6+
type PrismaClient,
7+
type TaskRunStatus,
8+
} from "@trigger.dev/database";
9+
import { QUEUED_STATUSES, RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
10+
import { prisma } from "~/db.server";
11+
import type { RunsRepository } from "~/services/runsRepository/runsRepository.server";
12+
import { commonWorker } from "~/v3/commonWorker.server";
13+
import {
14+
countInProgressRunsForBillableEnvironment,
15+
countQueuedRunsForBillableEnvironment,
16+
createBillingLimitRunsRepository,
17+
getBillableEnvironmentsForBillingLimit,
18+
} from "./billingLimitQueuedRuns.server";
19+
20+
export const BILLING_LIMIT_RESOLVE_CANCEL_SOURCE = "billing_limit_resolve_new_only";
21+
export const BILLING_LIMIT_IN_PROGRESS_CANCEL_SOURCE = "billing_limit_in_progress";
22+
23+
type BulkCancelSource =
24+
| typeof BILLING_LIMIT_RESOLVE_CANCEL_SOURCE
25+
| typeof BILLING_LIMIT_IN_PROGRESS_CANCEL_SOURCE;
26+
27+
export type BillingLimitBulkCancelDeps = {
28+
prismaClient?: PrismaClient;
29+
createRunsRepository?: (organizationId: string) => Promise<RunsRepository>;
30+
enqueueProcessBulkAction?: (bulkActionId: string) => Promise<unknown>;
31+
};
32+
33+
function resolveBulkCancelDeps(deps?: BillingLimitBulkCancelDeps) {
34+
return {
35+
prismaClient: deps?.prismaClient ?? prisma,
36+
createRunsRepository: deps?.createRunsRepository ?? createBillingLimitRunsRepository,
37+
enqueueProcessBulkAction:
38+
deps?.enqueueProcessBulkAction ??
39+
(async (bulkActionId: string) => {
40+
await commonWorker.enqueue({
41+
id: `processBulkAction-${bulkActionId}`,
42+
job: "processBulkAction",
43+
payload: { bulkActionId },
44+
});
45+
}),
46+
};
47+
}
48+
49+
export class BillingLimitBulkCancelService {
50+
static async cancelQueuedRuns(
51+
organizationId: string,
52+
options?: { dedupeKey?: string },
53+
deps?: BillingLimitBulkCancelDeps
54+
): Promise<{ bulkActionIds: string[] }> {
55+
return this.cancelRunsForBillableEnvironments(
56+
organizationId,
57+
{
58+
source: BILLING_LIMIT_RESOLVE_CANCEL_SOURCE,
59+
statuses: [...QUEUED_STATUSES],
60+
name: "Billing limit resolve — cancel queued runs",
61+
countRuns: countQueuedRunsForBillableEnvironment,
62+
dedupeKey: options?.dedupeKey,
63+
},
64+
deps
65+
);
66+
}
67+
68+
static async cancelInProgressRuns(
69+
organizationId: string,
70+
options: { hitAt: string },
71+
deps?: BillingLimitBulkCancelDeps
72+
): Promise<{ bulkActionIds: string[] }> {
73+
return this.cancelRunsForBillableEnvironments(
74+
organizationId,
75+
{
76+
source: BILLING_LIMIT_IN_PROGRESS_CANCEL_SOURCE,
77+
statuses: [...RUNNING_STATUSES],
78+
name: "Billing limit hit — cancel in-progress runs",
79+
countRuns: countInProgressRunsForBillableEnvironment,
80+
dedupeKey: options.hitAt,
81+
},
82+
deps
83+
);
84+
}
85+
86+
private static async cancelRunsForBillableEnvironments(
87+
organizationId: string,
88+
options: {
89+
source: BulkCancelSource;
90+
statuses: TaskRunStatus[];
91+
name: string;
92+
countRuns: typeof countQueuedRunsForBillableEnvironment;
93+
dedupeKey?: string;
94+
},
95+
deps?: BillingLimitBulkCancelDeps
96+
): Promise<{ bulkActionIds: string[] }> {
97+
const { prismaClient, createRunsRepository, enqueueProcessBulkAction } =
98+
resolveBulkCancelDeps(deps);
99+
100+
const environments = await getBillableEnvironmentsForBillingLimit(
101+
organizationId,
102+
prismaClient
103+
);
104+
105+
if (environments.length === 0) {
106+
return { bulkActionIds: [] };
107+
}
108+
109+
const runsRepository = await createRunsRepository(organizationId);
110+
const bulkActionIds: string[] = [];
111+
112+
for (const environment of environments) {
113+
if (options.dedupeKey) {
114+
const existing = await prismaClient.bulkActionGroup.findFirst({
115+
where: {
116+
environmentId: environment.id,
117+
type: BulkActionType.CANCEL,
118+
AND: [
119+
{
120+
params: {
121+
path: ["source"],
122+
equals: options.source,
123+
},
124+
},
125+
{
126+
params: {
127+
path: ["dedupeKey"],
128+
equals: options.dedupeKey,
129+
},
130+
},
131+
],
132+
},
133+
select: { friendlyId: true },
134+
});
135+
136+
if (existing) {
137+
bulkActionIds.push(existing.friendlyId);
138+
continue;
139+
}
140+
}
141+
142+
const count = await options.countRuns(runsRepository, organizationId, environment);
143+
144+
if (count === 0) {
145+
continue;
146+
}
147+
148+
const { id, friendlyId } = BulkActionId.generate();
149+
150+
await prismaClient.bulkActionGroup.create({
151+
data: {
152+
id,
153+
friendlyId,
154+
projectId: environment.projectId,
155+
environmentId: environment.id,
156+
name: options.name,
157+
type: BulkActionType.CANCEL,
158+
params: {
159+
statuses: options.statuses,
160+
finalizeRun: true,
161+
source: options.source,
162+
...(options.dedupeKey ? { dedupeKey: options.dedupeKey } : {}),
163+
} as Prisma.InputJsonValue,
164+
queryName: "bulk_action_v1",
165+
totalCount: count,
166+
completionNotification: BulkActionNotificationType.NONE,
167+
},
168+
});
169+
170+
await enqueueProcessBulkAction(id);
171+
172+
bulkActionIds.push(friendlyId);
173+
}
174+
175+
return { bulkActionIds };
176+
}
177+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import {
2+
bustBillingLimitCaches,
3+
} from "~/services/platform.v3.server";
4+
import { logger } from "~/services/logger.server";
5+
import { BillingLimitBulkCancelService } from "./BillingLimitBulkCancelService.server";
6+
import { buildBillingLimitResolveDedupeKey } from "./billingLimitConstants";
7+
import { convergeBillingLimitEnvironmentsForOrg } from "./billingLimitConvergeEnvironments.server";
8+
import type { PendingBillingLimitResolve } from "./billingLimitPendingResolve.types";
9+
10+
export type { PendingBillingLimitResolve } from "./billingLimitPendingResolve.types";
11+
12+
export async function convergeBillingLimitResolve(
13+
pending: PendingBillingLimitResolve
14+
): Promise<void> {
15+
const { organizationId, resumeMode, resolvedAt } = pending;
16+
17+
bustBillingLimitCaches(organizationId);
18+
19+
if (resumeMode === "new_only") {
20+
await BillingLimitBulkCancelService.cancelQueuedRuns(organizationId, {
21+
dedupeKey: buildBillingLimitResolveDedupeKey(organizationId, resolvedAt),
22+
});
23+
}
24+
25+
await convergeBillingLimitEnvironmentsForOrg(organizationId, "ok");
26+
27+
logger.info("Converged billing limit resolve", {
28+
organizationId,
29+
resumeMode,
30+
resolvedAt,
31+
});
32+
}
33+
34+
export { runPendingBillingLimitResolves } from "./billingLimitPendingResolveCoordinator.server";
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export type PendingBillingLimitResolve = {
2+
organizationId: string;
3+
resumeMode: "queue" | "new_only";
4+
resolvedAt: string;
5+
};
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import { logger } from "~/services/logger.server";
2+
import { classifyPendingBillingLimitResolveConvergeFailure } from "./billingLimitPendingResolveFailure.server";
3+
import type { PendingBillingLimitResolve } from "./billingLimitPendingResolve.types";
4+
5+
export type RunPendingBillingLimitResolveDeps = {
6+
converge?: (pending: PendingBillingLimitResolve) => Promise<void>;
7+
complete?: (organizationId: string) => Promise<{ completed: boolean } | undefined>;
8+
};
9+
10+
export async function runPendingBillingLimitResolves(
11+
pendingResolves: PendingBillingLimitResolve[],
12+
deps: RunPendingBillingLimitResolveDeps = {}
13+
): Promise<Set<string>> {
14+
const converge =
15+
deps.converge ??
16+
(await import("./billingLimitConvergeResolve.server")).convergeBillingLimitResolve;
17+
const complete =
18+
deps.complete ??
19+
(await import("~/services/platform.v3.server")).completeBillingLimitResolve;
20+
21+
const stillPendingOrgIds = new Set<string>();
22+
23+
for (const pending of pendingResolves) {
24+
try {
25+
await converge(pending);
26+
} catch (error) {
27+
logger.error("Failed to converge pending billing limit resolve", {
28+
failureClass: classifyPendingBillingLimitResolveConvergeFailure(pending.resumeMode),
29+
error,
30+
organizationId: pending.organizationId,
31+
resumeMode: pending.resumeMode,
32+
resolvedAt: pending.resolvedAt,
33+
});
34+
stillPendingOrgIds.add(pending.organizationId);
35+
continue;
36+
}
37+
38+
try {
39+
const completion = await complete(pending.organizationId);
40+
if (!completion) {
41+
throw new Error("Billing platform client unavailable");
42+
}
43+
} catch (error) {
44+
logger.error("Failed to ack pending billing limit resolve", {
45+
failureClass: "ack-only",
46+
error,
47+
organizationId: pending.organizationId,
48+
resumeMode: pending.resumeMode,
49+
resolvedAt: pending.resolvedAt,
50+
});
51+
stillPendingOrgIds.add(pending.organizationId);
52+
}
53+
}
54+
55+
return stillPendingOrgIds;
56+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
export type PendingBillingLimitResolveFailureClass =
2+
| "cancel-failing"
3+
| "converge-failing"
4+
| "ack-only";
5+
6+
/** Used in converge logs to classify stuck pending resolves. */
7+
export function classifyPendingBillingLimitResolveConvergeFailure(
8+
resumeMode: "queue" | "new_only"
9+
): Exclude<PendingBillingLimitResolveFailureClass, "ack-only"> {
10+
return resumeMode === "new_only" ? "cancel-failing" : "converge-failing";
11+
}

0 commit comments

Comments
 (0)