Skip to content

Commit d521260

Browse files
committed
feat(webapp): pause environments when a billing limit is reached
Converge billable environments to paused via webhook and a reconciliation worker; block manual resume.
1 parent 8b3da21 commit d521260

21 files changed

Lines changed: 886 additions & 6 deletions

apps/webapp/app/entry.server.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { PassThrough } from "stream";
1010
import * as Worker from "~/services/worker.server";
1111
import { initMollifierDrainerWorker } from "~/v3/mollifierDrainerWorker.server";
1212
import { initMollifierStaleSweepWorker } from "~/v3/mollifierStaleSweepWorker.server";
13+
import "~/v3/billingLimitWorker.server";
1314
import { bootstrap } from "./bootstrap";
1415
import { LocaleContextProvider } from "./components/primitives/LocaleProvider";
1516
import {

apps/webapp/app/env.server.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1471,6 +1471,37 @@ const EnvironmentSchema = z
14711471
ALERTS_WORKER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
14721472
ALERTS_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
14731473

1474+
BILLING_LIMIT_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
1475+
BILLING_LIMIT_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
1476+
BILLING_LIMIT_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(10),
1477+
BILLING_LIMIT_WORKER_POLL_INTERVAL: z.coerce.number().int().default(1000),
1478+
BILLING_LIMIT_WORKER_IMMEDIATE_POLL_INTERVAL: z.coerce.number().int().default(50),
1479+
BILLING_LIMIT_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(20),
1480+
BILLING_LIMIT_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
1481+
BILLING_LIMIT_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
1482+
BILLING_LIMIT_RECONCILE_INTERVAL_MS: z.coerce.number().int().default(90_000),
1483+
BILLING_LIMIT_WORKER_REDIS_HOST: z
1484+
.string()
1485+
.optional()
1486+
.transform((v) => v ?? process.env.REDIS_HOST),
1487+
BILLING_LIMIT_WORKER_REDIS_PORT: z.coerce
1488+
.number()
1489+
.optional()
1490+
.transform(
1491+
(v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)
1492+
),
1493+
BILLING_LIMIT_WORKER_REDIS_USERNAME: z
1494+
.string()
1495+
.optional()
1496+
.transform((v) => v ?? process.env.REDIS_USERNAME),
1497+
BILLING_LIMIT_WORKER_REDIS_PASSWORD: z
1498+
.string()
1499+
.optional()
1500+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
1501+
BILLING_LIMIT_WORKER_REDIS_TLS_DISABLED: z
1502+
.string()
1503+
.default(process.env.REDIS_TLS_DISABLED ?? "false"),
1504+
14741505
SCHEDULE_ENGINE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
14751506
SCHEDULE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
14761507
SCHEDULE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),

apps/webapp/app/models/organization.server.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ import { featuresForUrl } from "~/features.server";
1515
import { createApiKeyForEnv, createPkApiKeyForEnv, envSlug } from "./api-key.server";
1616
import { getDefaultEnvironmentConcurrencyLimit } from "~/services/platform.v3.server";
1717
import { enqueueAttioWorkspaceSync } from "~/services/attio.server";
18+
import {
19+
applyBillingLimitPauseAfterEnvCreate,
20+
getInitialEnvPauseStateForBillingLimit,
21+
} from "~/v3/services/billingLimit/getInitialEnvPauseStateForBillingLimit.server";
1822
export type { Organization };
1923

2024
const nanoid = customAlphabet("1234567890abcdef", 4);
@@ -139,15 +143,18 @@ export async function createEnvironment({
139143
const shortcode = createShortcode().join("-");
140144

141145
const limit = await getDefaultEnvironmentConcurrencyLimit(organization.id, type);
146+
const billingPause = await getInitialEnvPauseStateForBillingLimit(organization.id, type);
142147

143-
return await prismaClient.runtimeEnvironment.create({
148+
const environment = await prismaClient.runtimeEnvironment.create({
144149
data: {
145150
slug,
146151
apiKey,
147152
pkApiKey,
148153
shortcode,
149154
autoEnableInternalSources: type !== "DEVELOPMENT",
150155
maximumConcurrencyLimit: limit,
156+
paused: billingPause.paused,
157+
pauseSource: billingPause.pauseSource,
151158
organization: {
152159
connect: {
153160
id: organization.id,
@@ -162,7 +169,15 @@ export async function createEnvironment({
162169
type,
163170
isBranchableEnvironment,
164171
},
172+
include: {
173+
organization: true,
174+
project: true,
175+
},
165176
});
177+
178+
await applyBillingLimitPauseAfterEnvCreate(environment);
179+
180+
return environment;
166181
}
167182

168183
function createShortcode() {

apps/webapp/app/presenters/OrganizationsPresenter.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ export class OrganizationsPresenter {
7979
type: true,
8080
slug: true,
8181
paused: true,
82+
pauseSource: true,
8283
isBranchableEnvironment: true,
8384
branchName: true,
8485
parentEnvironmentId: true,
@@ -207,6 +208,7 @@ export class OrganizationsPresenter {
207208
| "type"
208209
| "branchName"
209210
| "paused"
211+
| "pauseSource"
210212
| "parentEnvironmentId"
211213
| "isBranchableEnvironment"
212214
| "archivedAt"

apps/webapp/app/presenters/v3/EnvironmentVariablesPresenter.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { $replica, PrismaClient, PrismaReplicaClient, prisma } from "~/db.server";
2-
import { Project } from "~/models/project.server";
3-
import { User } from "~/models/user.server";
2+
import type { Project } from "~/models/project.server";
3+
import type { User } from "~/models/user.server";
44
import { EnvironmentVariablesRepository } from "~/v3/environmentVariables/environmentVariablesRepository.server";
55
import type { EnvironmentVariableUpdater } from "~/v3/environmentVariables/repository";
66
import {

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
import { DialogClose } from "@radix-ui/react-dialog";
1111
import { Form, useNavigation, useSearchParams, type MetaFunction } from "@remix-run/react";
1212
import { type ActionFunctionArgs, type LoaderFunctionArgs } from "@remix-run/server-runtime";
13+
import { EnvironmentPauseSource } from "@trigger.dev/database";
1314
import type { RuntimeEnvironmentType } from "@trigger.dev/database";
1415
import type { QueueItem } from "@trigger.dev/core/v3/schemas";
1516
import { useEffect, useState } from "react";
@@ -184,11 +185,21 @@ export const action = async ({ request, params }: ActionFunctionArgs) => {
184185
switch (action) {
185186
case "environment-pause":
186187
const pauseService = new PauseEnvironmentService();
187-
await pauseService.call(environment, "paused");
188+
{
189+
const result = await pauseService.call(environment, "paused");
190+
if (!result.success) {
191+
return redirectWithErrorMessage(redirectPath, request, result.error);
192+
}
193+
}
188194
return redirectWithSuccessMessage(redirectPath, request, "Environment paused");
189195
case "environment-resume":
190196
const resumeService = new PauseEnvironmentService();
191-
await resumeService.call(environment, "resumed");
197+
{
198+
const result = await resumeService.call(environment, "resumed");
199+
if (!result.success) {
200+
return redirectWithErrorMessage(redirectPath, request, result.error);
201+
}
202+
}
192203
return redirectWithSuccessMessage(redirectPath, request, "Environment resumed");
193204
case "queue-pause":
194205
case "queue-resume": {
@@ -346,7 +357,9 @@ export default function Page() {
346357
animate
347358
accessory={
348359
<div className="flex items-start gap-1">
349-
{environment.runsEnabled ? <EnvironmentPauseResumeButton env={env} /> : null}
360+
{environment.runsEnabled && env.pauseSource !== EnvironmentPauseSource.BILLING_LIMIT ? (
361+
<EnvironmentPauseResumeButton env={env} />
362+
) : null}
350363
<LinkButton
351364
variant="secondary/small"
352365
LeadingIcon={RunsIcon}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { prisma } from "~/db.server";
4+
import { BillingLimitHitWebhookBodySchema } from "~/services/billingLimit.schemas";
5+
import { requireAdminApiRequest } from "~/services/personalAccessToken.server";
6+
import { bustBillingLimitCaches } from "~/services/platform.v3.server";
7+
import {
8+
enqueueBillingLimitCancelInProgressRuns,
9+
enqueueBillingLimitConverge,
10+
} from "~/v3/billingLimitWorker.server";
11+
import { BillingLimitConvergeEnvironmentsService } from "~/v3/services/billingLimit/billingLimitConvergeEnvironmentsService.server";
12+
import { processBillingLimitHit } from "~/v3/services/billingLimit/billingLimitHit.server";
13+
14+
const ParamsSchema = z.object({
15+
organizationId: z.string(),
16+
});
17+
18+
/** Billing platform webhook: org entered billing limit grace. Idempotent — returns 202. */
19+
export async function action({ request, params }: ActionFunctionArgs) {
20+
await requireAdminApiRequest(request);
21+
22+
if (request.method.toLowerCase() !== "post") {
23+
return json({ error: "Method not allowed" }, { status: 405 });
24+
}
25+
26+
const { organizationId } = ParamsSchema.parse(params);
27+
const body = BillingLimitHitWebhookBodySchema.parse(await request.json());
28+
29+
const organization = await prisma.organization.findFirst({
30+
where: { id: organizationId },
31+
select: { id: true },
32+
});
33+
34+
if (!organization) {
35+
return json({ error: "Organization not found" }, { status: 404 });
36+
}
37+
38+
await processBillingLimitHit(
39+
{
40+
organizationId,
41+
hitAt: body.hitAt,
42+
cancelInProgressRuns: body.cancelInProgressRuns,
43+
},
44+
{
45+
bustCaches: bustBillingLimitCaches,
46+
seedReconcileQueue: BillingLimitConvergeEnvironmentsService.seedReconcileQueue,
47+
enqueueConverge: enqueueBillingLimitConverge,
48+
enqueueCancelInProgressRuns: enqueueBillingLimitCancelInProgressRuns,
49+
}
50+
);
51+
52+
return json({ success: true, accepted: true }, { status: 202 });
53+
}

apps/webapp/app/services/upsertBranch.server.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ import { type CreateBranchOptions } from "~/routes/_app.orgs.$organizationSlug.p
66
import { isValidGitBranchName, sanitizeBranchName } from "@trigger.dev/core/v3/utils/gitBranch";
77
import { logger } from "./logger.server";
88
import { getCurrentPlan, getLimit } from "./platform.v3.server";
9+
import {
10+
applyBillingLimitPauseAfterEnvCreate,
11+
getInitialEnvPauseStateForBillingLimit,
12+
} from "~/v3/services/billingLimit/getInitialEnvPauseStateForBillingLimit.server";
913

1014
export class UpsertBranchService {
1115
#prismaClient: PrismaClient;
@@ -103,6 +107,10 @@ export class UpsertBranchService {
103107
const apiKey = createApiKeyForEnv(parentEnvironment.type);
104108
const pkApiKey = createPkApiKeyForEnv(parentEnvironment.type);
105109
const shortcode = branchSlug;
110+
const billingPause = await getInitialEnvPauseStateForBillingLimit(
111+
parentEnvironment.organization.id,
112+
parentEnvironment.type
113+
);
106114

107115
const now = new Date();
108116

@@ -119,6 +127,8 @@ export class UpsertBranchService {
119127
pkApiKey,
120128
shortcode,
121129
maximumConcurrencyLimit: parentEnvironment.maximumConcurrencyLimit,
130+
paused: billingPause.paused,
131+
pauseSource: billingPause.pauseSource,
122132
organization: {
123133
connect: {
124134
id: parentEnvironment.organization.id,
@@ -137,10 +147,18 @@ export class UpsertBranchService {
137147
update: {
138148
git: git ?? undefined,
139149
},
150+
include: {
151+
organization: true,
152+
project: true,
153+
},
140154
});
141155

142156
const alreadyExisted = branch.createdAt < now;
143157

158+
if (!alreadyExisted) {
159+
await applyBillingLimitPauseAfterEnvCreate(branch);
160+
}
161+
144162
return {
145163
success: true as const,
146164
alreadyExisted: alreadyExisted,
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import { Logger } from "@trigger.dev/core/logger";
2+
import { Worker as RedisWorker } from "@trigger.dev/redis-worker";
3+
import { z } from "zod";
4+
import { env } from "~/env.server";
5+
import { logger } from "~/services/logger.server";
6+
import { singleton } from "~/utils/singleton";
7+
import { BillingLimitConvergeEnvironmentsService } from "./services/billingLimit/billingLimitConvergeEnvironmentsService.server";
8+
import type { BillingLimitConvergeTargetState } from "./services/billingLimit/billingLimitConstants";
9+
import { buildBillingLimitInProgressCancelJobId } from "./services/billingLimit/billingLimitConstants";
10+
import { runBillingLimitCancelInProgressRuns } from "./services/billingLimit/billingLimitCancelInProgressRuns.server";
11+
12+
function initializeWorker() {
13+
const redisOptions = {
14+
keyPrefix: "billing-limit:worker:",
15+
host: env.BILLING_LIMIT_WORKER_REDIS_HOST,
16+
port: env.BILLING_LIMIT_WORKER_REDIS_PORT,
17+
username: env.BILLING_LIMIT_WORKER_REDIS_USERNAME,
18+
password: env.BILLING_LIMIT_WORKER_REDIS_PASSWORD,
19+
enableAutoPipelining: true,
20+
...(env.BILLING_LIMIT_WORKER_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
21+
};
22+
23+
logger.debug(`👨‍🏭 Initializing billing limit worker at host ${env.BILLING_LIMIT_WORKER_REDIS_HOST}`);
24+
25+
const worker = new RedisWorker({
26+
name: "billing-limit-worker",
27+
redisOptions,
28+
catalog: {
29+
"billingLimit.convergeEnvironments": {
30+
schema: z.object({
31+
organizationId: z.string(),
32+
targetState: z.enum(["grace", "rejected", "ok"]),
33+
}),
34+
visibilityTimeoutMs: 60_000 * 10,
35+
retry: {
36+
maxAttempts: 5,
37+
},
38+
},
39+
"billingLimit.reconcileTick": {
40+
schema: z.object({}),
41+
visibilityTimeoutMs: 60_000 * 5,
42+
retry: {
43+
maxAttempts: 3,
44+
},
45+
},
46+
"billingLimit.cancelInProgressRuns": {
47+
schema: z.object({
48+
organizationId: z.string(),
49+
hitAt: z.string(),
50+
}),
51+
visibilityTimeoutMs: 60_000 * 10,
52+
retry: {
53+
maxAttempts: 5,
54+
},
55+
},
56+
},
57+
concurrency: {
58+
workers: env.BILLING_LIMIT_WORKER_CONCURRENCY_WORKERS,
59+
tasksPerWorker: env.BILLING_LIMIT_WORKER_CONCURRENCY_TASKS_PER_WORKER,
60+
limit: env.BILLING_LIMIT_WORKER_CONCURRENCY_LIMIT,
61+
},
62+
pollIntervalMs: env.BILLING_LIMIT_WORKER_POLL_INTERVAL,
63+
immediatePollIntervalMs: env.BILLING_LIMIT_WORKER_IMMEDIATE_POLL_INTERVAL,
64+
shutdownTimeoutMs: env.BILLING_LIMIT_WORKER_SHUTDOWN_TIMEOUT_MS,
65+
logger: new Logger("BillingLimitWorker", env.BILLING_LIMIT_WORKER_LOG_LEVEL),
66+
jobs: {
67+
"billingLimit.convergeEnvironments": async ({ payload }) => {
68+
await BillingLimitConvergeEnvironmentsService.runConverge(payload);
69+
},
70+
"billingLimit.reconcileTick": async () => {
71+
await BillingLimitConvergeEnvironmentsService.runReconcileTick();
72+
await scheduleBillingLimitReconcileTick(worker);
73+
},
74+
"billingLimit.cancelInProgressRuns": async ({ payload }) => {
75+
await runBillingLimitCancelInProgressRuns(payload.organizationId, payload.hitAt);
76+
},
77+
},
78+
});
79+
80+
if (env.BILLING_LIMIT_WORKER_ENABLED === "true") {
81+
logger.debug(
82+
`👨‍🏭 Starting billing limit worker at host ${env.BILLING_LIMIT_WORKER_REDIS_HOST}, reconcileIntervalMs = ${env.BILLING_LIMIT_RECONCILE_INTERVAL_MS}`
83+
);
84+
worker.start();
85+
void scheduleBillingLimitReconcileTick(worker);
86+
}
87+
88+
return worker;
89+
}
90+
91+
async function scheduleBillingLimitReconcileTick(worker: ReturnType<typeof initializeWorker>) {
92+
await worker.enqueue({
93+
id: "billingLimit.reconcileTick",
94+
job: "billingLimit.reconcileTick",
95+
payload: {},
96+
availableAt: new Date(Date.now() + env.BILLING_LIMIT_RECONCILE_INTERVAL_MS),
97+
});
98+
}
99+
100+
export const billingLimitWorker = singleton("billingLimitWorker", initializeWorker);
101+
102+
export async function enqueueBillingLimitConverge(
103+
organizationId: string,
104+
targetState: BillingLimitConvergeTargetState
105+
) {
106+
return billingLimitWorker.enqueue({
107+
id: `billingLimit.converge:${organizationId}:${targetState}`,
108+
job: "billingLimit.convergeEnvironments",
109+
payload: { organizationId, targetState },
110+
});
111+
}
112+
113+
export async function enqueueBillingLimitCancelInProgressRuns(
114+
organizationId: string,
115+
hitAt: string
116+
) {
117+
return billingLimitWorker.enqueue({
118+
id: buildBillingLimitInProgressCancelJobId(organizationId, hitAt),
119+
job: "billingLimit.cancelInProgressRuns",
120+
payload: { organizationId, hitAt },
121+
});
122+
}

0 commit comments

Comments
 (0)