Skip to content

Commit adbc3f7

Browse files
committed
add max concurrency to bulk replays
1 parent 29c6e8a commit adbc3f7

4 files changed

Lines changed: 33 additions & 0 deletions

File tree

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,6 +1939,8 @@ const EnvironmentSchema = z
19391939
BULK_ACTION_BATCH_SIZE: z.coerce.number().int().default(100),
19401940
BULK_ACTION_BATCH_DELAY_MS: z.coerce.number().int().default(200),
19411941
BULK_ACTION_SUBBATCH_CONCURRENCY: z.coerce.number().int().default(5),
1942+
/// Max number of concurrent in-flight (PENDING) bulk replays per environment.
1943+
BULK_ACTION_MAX_CONCURRENT_REPLAYS: z.coerce.number().int().default(3),
19421944

19431945
// AI Run Filter
19441946
AI_RUN_FILTER_MODEL: z.string().optional(),

apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,27 @@ export class BulkActionService extends BaseService {
5353
const { organizationId, projectId, environmentId, userId } = input;
5454
const filters = freezeRunListFilters(input.filters);
5555

56+
// Concurrency guard for replays
57+
// The count is backed by the (environmentId, status, type) index, so it only
58+
// touches this env's PENDING replays and stays cheap.
59+
if (input.action === "replay") {
60+
const maxConcurrentReplays = env.BULK_ACTION_MAX_CONCURRENT_REPLAYS;
61+
const inFlightReplays = await this._replica.bulkActionGroup.count({
62+
where: {
63+
environmentId,
64+
type: BulkActionType.REPLAY,
65+
status: BulkActionStatus.PENDING,
66+
},
67+
});
68+
69+
if (inFlightReplays >= maxConcurrentReplays) {
70+
throw new ServiceValidationError(
71+
`You can only run ${maxConcurrentReplays} bulk replays at a time in this environment. Wait for an in-progress replay to finish before starting another.`,
72+
429
73+
);
74+
}
75+
}
76+
5677
// Region is a replay-only override that re-routes the replayed runs. It's
5778
// stored alongside the run-list filters under a dedicated key so it isn't
5879
// mistaken for a `regions` selection filter when the params are parsed.
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- Backs the per-environment concurrent-replay limit: count of PENDING REPLAY groups.
2+
-- Not partial (e.g. WHERE status = 'PENDING' AND type = 'REPLAY') as wouldn't be used
3+
-- with the bind params from prisma.
4+
CREATE INDEX CONCURRENTLY IF NOT EXISTS "BulkActionGroup_environmentId_status_type_idx"
5+
ON "BulkActionGroup" ("environmentId", "status", "type");

internal-packages/database/prisma/schema.prisma

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2530,6 +2530,11 @@ model BulkActionGroup {
25302530
// the INCLUDE/fillfactor indexes elsewhere in this schema). `migrate dev` will report
25312531
// drift here; do NOT accept its drop/recreate — keep the hand-written migration.
25322532
@@index([environmentId, type, dedupeKey])
2533+
// Backs the per-environment concurrent-replay limit (count of PENDING REPLAY groups).
2534+
// Plain composite (not partial) so Prisma's parameterized count reliably uses it; the
2535+
// migration creates it CONCURRENTLY to avoid locking writes. See migration
2536+
// 20260702120000_bulk_action_group_pending_replay_index.
2537+
@@index([environmentId, status, type])
25332538
}
25342539

25352540
enum BulkActionType {

0 commit comments

Comments
 (0)