diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index bf6b2d4e3..7ce852ced 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -22,14 +22,32 @@ import type { IIdentifyPayload, IIncrementPayload, IReplayPayload, + ITrackBatchHandlerPayload, ITrackHandlerPayload, ITrackPayload, } from '@openpanel/validation'; +import { zTrackHandlerPayload } from '@openpanel/validation'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { assocPath, pathOr, pick } from 'ramda'; import { HttpError } from '@/utils/errors'; import { getDeviceId } from '@/utils/ids'; +type Salts = Awaited>; + +/** + * Per-request data that is identical for every event in a batch. + * Computed once in the batch handler so we don't re-fetch salts/geo + * or re-parse headers N times. + */ +interface SharedRequestContext { + projectId: string; + requestIp: string; + requestUa: string; + requestHeaders: Record; + requestGeo: GeoLocation; + salts: Salts; +} + export function getStringHeaders(headers: FastifyRequest['headers']) { return Object.entries( pick( @@ -122,28 +140,65 @@ interface TrackContext { geo: GeoLocation; } -async function buildContext( - request: FastifyRequest<{ - Body: ITrackHandlerPayload; - }>, - validatedBody: ITrackHandlerPayload -): Promise { +/** + * Build the per-request shared context. Done once per HTTP request — for + * single-event /track this is just an extra struct; for /track/batch it + * lets N events share one salts + one geo lookup. + */ +async function buildSharedRequestContext( + request: FastifyRequest, +): Promise { const projectId = request.client?.projectId; if (!projectId) { throw new HttpError('Missing projectId', { status: 400 }); } - const timestamp = getTimestamp(request.timestamp, validatedBody.payload); - const ip = + const requestIp = request.clientIp; + const requestUa = request.headers['user-agent'] ?? 'unknown/1.0'; + const requestHeaders = getStringHeaders(request.headers); + + const [requestGeo, salts] = await Promise.all([ + getGeoLocation(requestIp), + getSalts(), + ]); + + return { + projectId, + requestIp, + requestUa, + requestHeaders, + requestGeo, + salts, + }; +} + +/** + * Build a per-event TrackContext from already-fetched shared data. + * Per-event work: timestamp, identity, ip override, deviceId, and a + * second geo lookup *only* if the event overrides __ip. + */ +async function buildEventContext( + shared: SharedRequestContext, + requestTimestamp: FastifyRequest['timestamp'], + validatedBody: ITrackHandlerPayload, +): Promise { + const timestamp = getTimestamp(requestTimestamp, validatedBody.payload); + + const overrideIp = validatedBody.type === 'track' && validatedBody.payload.properties?.__ip ? (validatedBody.payload.properties.__ip as string) - : request.clientIp; - const ua = request.headers['user-agent'] ?? 'unknown/1.0'; + : undefined; + const ip = overrideIp ?? shared.requestIp; + + // Only re-fetch geo when the event overrode the IP — the common case + // (browser SDK, no __ip) reuses the request-level geo computed once. + const geo = + overrideIp && overrideIp !== shared.requestIp + ? await getGeoLocation(overrideIp) + : shared.requestGeo; - const headers = getStringHeaders(request.headers); const identity = getIdentity(validatedBody); const profileId = identity?.profileId; - if (profileId && validatedBody.type === 'track') { validatedBody.payload.profileId = profileId; } @@ -154,22 +209,19 @@ async function buildContext( ? validatedBody.payload?.properties.__deviceId : undefined; - // Get geo location (needed for track and identify) - const [geo, salts] = await Promise.all([getGeoLocation(ip), getSalts()]); - const deviceIdResult = await getDeviceId({ - projectId, + projectId: shared.projectId, ip, - ua, - salts, + ua: shared.requestUa, + salts: shared.salts, overrideDeviceId, }); return { - projectId, + projectId: shared.projectId, ip, - ua, - headers, + ua: shared.requestUa, + headers: shared.requestHeaders, timestamp: { value: timestamp.timestamp, isFromPast: timestamp.isTimestampFromThePast, @@ -181,6 +233,14 @@ async function buildContext( }; } +async function buildContext( + request: FastifyRequest, + validatedBody: ITrackHandlerPayload, +): Promise { + const shared = await buildSharedRequestContext(request); + return buildEventContext(shared, request.timestamp, validatedBody); +} + async function handleTrack( payload: ITrackPayload, context: TrackContext @@ -361,15 +421,67 @@ async function handleAssignGroup( }); } +/** + * Dispatch a validated event to the matching per-type handler. Shared by + * /track and /track/batch. Throws HttpError(400) for the unsupported `alias` + * type so single-event and batch can both surface it consistently. + */ +async function dispatchEvent( + body: ITrackHandlerPayload, + context: TrackContext, +): Promise { + switch (body.type) { + case 'alias': + throw new HttpError('Alias is not supported', { status: 400 }); + case 'track': + await handleTrack(body.payload, context); + return; + case 'identify': + await handleIdentify(body.payload, context); + return; + case 'increment': + await handleIncrement(body.payload, context); + return; + case 'decrement': + await handleDecrement(body.payload, context); + return; + case 'replay': + await handleReplay(body.payload, context); + return; + case 'group': + await handleGroup(body.payload, context); + return; + case 'assign_group': + await handleAssignGroup(body.payload, context); + return; + default: { + // Exhaustiveness check: `body` narrows to `never` when every variant + // of ITrackHandlerPayload['type'] is handled. Adding a new variant + // makes this assignment fail to compile. + const exhaustive: never = body; + throw new HttpError(`Unhandled event type: ${exhaustive}`, { + status: 400, + }); + } + } +} + export async function handler( request: FastifyRequest<{ - Body: ITrackHandlerPayload; + Body: ITrackHandlerPayload | ITrackBatchHandlerPayload; }>, - reply: FastifyReply + reply: FastifyReply, ) { const validatedBody = request.body; - // Handle alias (not supported) + // Batch envelope: `{ type: 'batch', payload: [event, ...] }` — fan each + // item through the same per-event pipeline as a single-event request. + if (validatedBody.type === 'batch') { + return handleBatch(request, reply, validatedBody.payload); + } + + // Reject `alias` before building context — saves the salts/geo/deviceId work + // for a request that's going to fail anyway. if (validatedBody.type === 'alias') { return reply.status(400).send({ status: 400, @@ -378,39 +490,8 @@ export async function handler( }); } - // Build request context const context = await buildContext(request, validatedBody); - - // Dispatch to appropriate handler - switch (validatedBody.type) { - case 'track': - await handleTrack(validatedBody.payload, context); - break; - case 'identify': - await handleIdentify(validatedBody.payload, context); - break; - case 'increment': - await handleIncrement(validatedBody.payload, context); - break; - case 'decrement': - await handleDecrement(validatedBody.payload, context); - break; - case 'replay': - await handleReplay(validatedBody.payload, context); - break; - case 'group': - await handleGroup(validatedBody.payload, context); - break; - case 'assign_group': - await handleAssignGroup(validatedBody.payload, context); - break; - default: - return reply.status(400).send({ - status: 400, - error: 'Bad Request', - message: 'Invalid type', - }); - } + await dispatchEvent(validatedBody, context); reply.status(200).send({ deviceId: context.deviceId, @@ -418,6 +499,114 @@ export async function handler( }); } +type BatchItemResult = + | { index: number; status: 'accepted' } + | { + index: number; + status: 'rejected'; + reason: 'validation' | 'internal'; + error: string; + }; + +/** + * `POST /track` with `{ type: 'batch', payload: [...] }` — accepts up to + * TRACK_BATCH_MAX_EVENTS payloads in one request and dispatches each through + * the same per-event pipeline as a single-event request. + * + * Per-event validation failures do NOT fail the whole batch: the response is + * always 202 (assuming envelope + auth pass) with `{ accepted, rejected[] }` + * so callers can fix and retry just the bad indices. + * + * Optimization: salts + request-IP geo are fetched once and shared across + * all events. Events that override `__ip` still get their own geo lookup. + */ +// Bounded concurrency for per-event processing inside a batch. Each event +// hits Redis (session lookup + groupmq queue add) and may trigger a geo +// lookup if it overrides `__ip`, so an unbounded `Promise.all` over 2000 +// events can spike Redis pool usage and the geo provider's rate budget on +// smaller self-hosted instances. 50 keeps the pipeline saturated without +// turning a single big batch into a thundering herd. +const BATCH_CONCURRENCY = 50; + +async function handleBatch( + request: FastifyRequest<{ + Body: ITrackHandlerPayload | ITrackBatchHandlerPayload; + }>, + reply: FastifyReply, + events: unknown[], +) { + const shared = await buildSharedRequestContext(request); + + const processOne = async ( + raw: unknown, + index: number, + ): Promise => { + const parsed = zTrackHandlerPayload.safeParse(raw); + if (!parsed.success) { + const issue = parsed.error.issues[0]; + const path = issue?.path?.join('.') ?? ''; + const error = path + ? `${path}: ${issue?.message}` + : issue?.message ?? 'invalid payload'; + return { index, status: 'rejected', reason: 'validation', error }; + } + + try { + const context = await buildEventContext( + shared, + request.timestamp, + parsed.data, + ); + await dispatchEvent(parsed.data, context); + return { index, status: 'accepted' }; + } catch (err) { + // HttpError with 4xx → caller's fault (validation-style: alias, + // unknown type, replay without session). Anything else → ours. + const isClientError = + err instanceof HttpError && err.status >= 400 && err.status < 500; + const reason: 'validation' | 'internal' = isClientError + ? 'validation' + : 'internal'; + const message = err instanceof Error ? err.message : 'unknown error'; + if (!isClientError) { + request.log.error( + { err, index }, + 'Batch event dispatch failed', + ); + } + return { index, status: 'rejected', reason, error: message }; + } + }; + + // Process in chunks of BATCH_CONCURRENCY. We keep results aligned with + // input indices via the `index` field on each BatchItemResult. + const results: BatchItemResult[] = new Array(events.length); + for (let start = 0; start < events.length; start += BATCH_CONCURRENCY) { + const end = Math.min(start + BATCH_CONCURRENCY, events.length); + const chunk = await Promise.all( + events.slice(start, end).map((raw, i) => processOne(raw, start + i)), + ); + for (const r of chunk) { + results[r.index] = r; + } + } + + let accepted = 0; + const rejected: Extract[] = []; + for (const result of results) { + if (result.status === 'accepted') { + accepted += 1; + } else { + rejected.push(result); + } + } + + reply.status(202).send({ + accepted, + rejected, + }); +} + export async function fetchDeviceId( request: FastifyRequest, reply: FastifyReply diff --git a/apps/api/src/routes/track-batch.router.test.ts b/apps/api/src/routes/track-batch.router.test.ts new file mode 100644 index 000000000..4df7eaca5 --- /dev/null +++ b/apps/api/src/routes/track-batch.router.test.ts @@ -0,0 +1,449 @@ +/** + * Integration tests for batch ingestion via POST /track with + * `{ type: 'batch', payload: [event, ...] }`. + * + * Side effects (queue, db, geo, redis) are mocked so the test runs without + * Docker. Auth uses the same getClientByIdCached mock as the insights + * router tests, except here we don't need real fixtures — we never read + * from PG/CH, we only verify the controller dispatches each item correctly. + */ + +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; + +// ─── Module mocks (hoisted before imports) ──────────────────────────────────── +// +// `vi.mock` is hoisted above all top-level statements, so any spies the factory +// references must be created via `vi.hoisted(...)` (also hoisted) — otherwise +// the factory runs first and hits a temporal-dead-zone ReferenceError. + +const { queueAdd, upsertProfileMock } = vi.hoisted(() => ({ + queueAdd: vi.fn().mockResolvedValue(undefined), + upsertProfileMock: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock('@openpanel/db', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getClientByIdCached: vi.fn(), + getSalts: vi.fn().mockResolvedValue({ current: 'salt-a', previous: 'salt-b' }), + getProfileById: vi.fn().mockResolvedValue(null), + upsertProfile: upsertProfileMock, + groupBuffer: { add: vi.fn().mockResolvedValue(undefined) }, + replayBuffer: { add: vi.fn().mockResolvedValue(undefined) }, + }; +}); + +vi.mock('@openpanel/queue', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getEventsGroupQueueShard: vi.fn(() => ({ add: queueAdd })), + }; +}); + +vi.mock('@openpanel/geo', () => ({ + getGeoLocation: vi.fn().mockResolvedValue({ + country: 'US', + city: 'San Francisco', + region: 'CA', + longitude: -122.4, + latitude: 37.77, + }), +})); + +vi.mock('@openpanel/common/server', async (importOriginal) => { + const actual = + await importOriginal(); + return { + ...actual, + verifyPassword: vi.fn().mockResolvedValue(true), + generateDeviceId: vi.fn().mockReturnValue('device-test'), + }; +}); + +vi.mock('@openpanel/redis', async (importOriginal) => { + const actual = await importOriginal(); + const fakeRedisClient = new Proxy( + {}, + { + get: (_t, p) => { + if (p === 'status') return 'ready'; + if (p === 'multi') { + return () => ({ + hget: vi.fn().mockReturnThis(), + exec: vi.fn().mockResolvedValue([]), + }); + } + return vi.fn().mockResolvedValue(null); + }, + }, + ); + return { + ...actual, + getCache: async (_key: string, _ttl: number, fn: () => Promise) => + fn(), + getLock: vi.fn().mockResolvedValue(true), + getRedisCache: vi.fn().mockReturnValue(fakeRedisClient), + }; +}); + +// ─── Imports (after mocks) ──────────────────────────────────────────────────── + +import { ClientType, getClientByIdCached } from '@openpanel/db'; +import type { FastifyInstance } from 'fastify'; +import { buildApp } from '../app'; + +// ─── Test client constants ──────────────────────────────────────────────────── + +const CLIENT_ID = '00000000-0000-0000-0000-0000000000aa'; +const CLIENT_SECRET = 'test-secret'; +const PROJECT_ID = 'test-project'; +const ORG_ID = 'test-org'; + +const AUTH = { + 'openpanel-client-id': CLIENT_ID, + 'openpanel-client-secret': CLIENT_SECRET, + 'user-agent': 'Mozilla/5.0 (Macintosh) Chrome/120.0.0.0', + 'content-type': 'application/json', +}; + +const WRITE_CLIENT = { + id: CLIENT_ID, + type: ClientType.write, + projectId: PROJECT_ID, + organizationId: ORG_ID, + secret: 'hashed-secret', + name: 'Batch Test Client', + cors: ['*'], + description: '', + ignoreCorsAndSecret: true, + createdAt: new Date(), + updatedAt: new Date(), + project: { + id: PROJECT_ID, + organizationId: ORG_ID, + cors: ['*'], + filters: [], + allowUnsafeRevenueTracking: true, + }, +}; + +// ─── Lifecycle ──────────────────────────────────────────────────────────────── + +let app: FastifyInstance; + +beforeAll(async () => { + vi.mocked(getClientByIdCached).mockResolvedValue(WRITE_CLIENT as any); + app = await buildApp({ testing: true }); + await app.ready(); +}, 30_000); + +afterAll(async () => { + await app.close(); +}, 10_000); + +beforeEach(() => { + queueAdd.mockClear(); + upsertProfileMock.mockClear(); +}); + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +function postTrack(body: unknown, headers: Record = AUTH) { + return app.inject({ + method: 'POST', + url: '/track', + headers, + payload: body as any, + }); +} + +function postBatch(events: unknown, headers: Record = AUTH) { + return postTrack({ type: 'batch', payload: events }, headers); +} + +const validTrack = (name = 'page_view') => ({ + type: 'track' as const, + payload: { name, properties: { __path: '/home' } }, +}); + +const validIdentify = (profileId = 'user-1') => ({ + type: 'identify' as const, + payload: { profileId, email: 'a@b.com' }, +}); + +// ─── Tests ──────────────────────────────────────────────────────────────────── + +describe('POST /track type=batch — auth & envelope', () => { + it('returns 401 without client-id', async () => { + const res = await postBatch([validTrack()], { + 'content-type': 'application/json', + }); + expect(res.statusCode).toBe(401); + }); + + it('returns 400 on empty payload array', async () => { + const res = await postBatch([]); + expect(res.statusCode).toBe(400); + }); + + it('returns 400 on missing payload field', async () => { + const res = await postTrack({ type: 'batch' }); + expect(res.statusCode).toBe(400); + }); + + it('returns 400 when array exceeds the per-request cap', async () => { + const events = Array.from({ length: 2001 }, () => validTrack()); + const res = await postBatch(events); + expect(res.statusCode).toBe(400); + }); +}); + +describe('POST /track type=batch — happy path', () => { + it('accepts a single track event and queues it', async () => { + const res = await postBatch([validTrack()]); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body).toEqual({ accepted: 1, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(1); + }); + + it('accepts a mixed batch (track + identify) and dispatches each', async () => { + const res = await postBatch([ + validTrack('signup'), + validIdentify('alice'), + validTrack('purchase'), + ]); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 3, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(2); // two `track` events + expect(upsertProfileMock).toHaveBeenCalledTimes(1); // one `identify` + }); + + it('treats each event as if sent one by one (per-event queue add)', async () => { + const events = Array.from({ length: 5 }, (_, i) => validTrack(`event_${i}`)); + const res = await postBatch(events); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 5, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(5); + }); + + it('still handles a single-event body (non-batch) with a 200', async () => { + // Regression guard: adding the batch variant to the /track body schema + // must not change the single-event contract. + const res = await postTrack(validTrack('single_event_probe')); + expect(res.statusCode).toBe(200); + const body = res.json(); + expect(body).toHaveProperty('deviceId'); + expect(body).toHaveProperty('sessionId'); + expect(queueAdd).toHaveBeenCalledTimes(1); + }); + + it('still rejects a single-event alias body with a 400', async () => { + const res = await postTrack({ + type: 'alias', + payload: { profileId: 'user-1', alias: 'u1' }, + }); + expect(res.statusCode).toBe(400); + }); + + it('marks dispatch failures as internal rejections', async () => { + queueAdd.mockRejectedValueOnce(new Error('queue unavailable')); + const res = await postBatch([validTrack('internal_error_probe')]); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(0); + expect(body.rejected).toHaveLength(1); + expect(body.rejected[0]).toMatchObject({ index: 0, reason: 'internal' }); + }); + + it('dispatches every supported event type through the shared pipeline', async () => { + const now = new Date().toISOString(); + const res = await postBatch([ + { type: 'group', payload: { id: 'g-1', type: 'company', name: 'Acme' } }, + { + type: 'assign_group', + payload: { groupIds: ['g-1'], profileId: 'user-1' }, + }, + // getProfileById is mocked to null → 404 → per-row validation reject + { + type: 'increment', + payload: { profileId: 'missing', property: 'visits' }, + }, + { + type: 'decrement', + payload: { profileId: 'missing', property: 'visits' }, + }, + // sessionId falls back to the deterministic bucket → replay accepted + { + type: 'replay', + payload: { + chunk_index: 0, + events_count: 1, + is_full_snapshot: true, + started_at: now, + ended_at: now, + payload: 'chunk', + }, + }, + ]); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(3); // group + assign_group + replay + expect(body.rejected).toHaveLength(2); // increment + decrement + expect( + body.rejected.every((r: { reason: string }) => r.reason === 'validation'), + ).toBe(true); + }); + + it('fetches geo per event when __ip overrides the request ip', async () => { + const res = await postBatch([ + { + type: 'track' as const, + payload: { name: 'ip_probe', properties: { __ip: '203.0.113.9' } }, + }, + ]); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 1, rejected: [] }); + }); + + it('rejects non-object items with a top-level validation message', async () => { + const res = await postBatch([42]); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(0); + expect(body.rejected).toHaveLength(1); + expect(body.rejected[0].reason).toBe('validation'); + expect(body.rejected[0].error).toBeTruthy(); + }); + + it('flags historical events with isTimestampFromThePast and keeps __timestamp', async () => { + // Batch items go through the exact same timestamp rules as single + // events: a __timestamp older than 15 minutes is accepted and flagged + // so the worker skips live-session handling for it. + const twoHoursAgo = Date.now() - 2 * 60 * 60 * 1000; + const res = await postBatch([ + { + type: 'track' as const, + payload: { + name: 'probe_historical', + properties: { __timestamp: new Date(twoHoursAgo).toISOString() }, + }, + }, + ]); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 1, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(1); + const queuedJob = queueAdd.mock.calls[0]?.[0]; + expect(queuedJob.data.event.timestamp).toBe(twoHoursAgo); + expect(queuedJob.data.event.isTimestampFromThePast).toBe(true); + }); + + it('passes a client-supplied __deviceId through to the queue', async () => { + const res = await postBatch([ + { + type: 'track' as const, + payload: { + name: 'probe_device_override', + properties: { __deviceId: 'mobile-device-abc', __path: '/home' }, + }, + }, + ]); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 1, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(1); + const queuedJob = queueAdd.mock.calls[0]?.[0]; + expect(queuedJob.data.deviceId).toBe('mobile-device-abc'); + }); +}); + +describe('POST /track type=batch — per-item validation', () => { + it('rejects bad rows by index without failing the batch', async () => { + const res = await postBatch([ + validTrack('good_1'), + { type: 'track', payload: { name: '' } }, // empty name → invalid + validTrack('good_2'), + { type: 'wrong-type', payload: {} }, // unknown discriminator + ]); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(2); + expect(body.rejected).toHaveLength(2); + expect(body.rejected.map((r: { index: number }) => r.index).sort()).toEqual([1, 3]); + expect(body.rejected.every((r: { reason: string }) => r.reason === 'validation')).toBe(true); + expect(queueAdd).toHaveBeenCalledTimes(2); + }); + + it('rejects alias as per-item validation (does not 400 the whole batch)', async () => { + const res = await postBatch([ + validTrack(), + { type: 'alias', payload: { profileId: 'user-1', alias: 'u1' } }, + ]); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(1); + expect(body.rejected).toHaveLength(1); + expect(body.rejected[0]).toMatchObject({ + index: 1, + reason: 'validation', + }); + expect(body.rejected[0].error).toMatch(/alias/i); + }); + + it('rejects a nested batch envelope as a per-item validation error', async () => { + // `batch` is only valid at the top level — items are validated against + // the single-event union, so recursion is impossible by construction. + const res = await postBatch([ + validTrack(), + { type: 'batch', payload: [validTrack()] }, + ]); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(1); + expect(body.rejected).toHaveLength(1); + expect(body.rejected[0]).toMatchObject({ + index: 1, + reason: 'validation', + }); + expect(queueAdd).toHaveBeenCalledTimes(1); + }); + + it('returns 202 with accepted=0 when every event fails validation', async () => { + const res = await postBatch([ + { type: 'track', payload: { name: '' } }, + { type: 'track', payload: {} }, + { type: 'identify', payload: {} }, + ]); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(0); + expect(body.rejected).toHaveLength(3); + expect(queueAdd).not.toHaveBeenCalled(); + }); + + // Regression: per-event processing is chunked (BATCH_CONCURRENCY = 50). + // A 200-event batch spans 4 chunks. Verifies that rejected indices land in + // the right positions across chunk boundaries — including the very first + // event in chunk 1, the last event in chunk 2, and one in chunk 4 — which + // would catch off-by-one slicing or out-of-order result accumulation. + it('preserves per-index results across chunk boundaries', async () => { + const SIZE = 200; + const badIndices = new Set([0, 50, 99, 100, 149, 199]); + const events = Array.from({ length: SIZE }, (_, i) => + badIndices.has(i) + ? { type: 'track', payload: { name: '' } } // invalid + : validTrack(`chunked_${i}`), + ); + const res = await postBatch(events); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(SIZE - badIndices.size); + expect(body.rejected).toHaveLength(badIndices.size); + const rejectedIndices = new Set( + body.rejected.map((r: { index: number }) => r.index), + ); + expect(rejectedIndices).toEqual(badIndices); + expect(queueAdd).toHaveBeenCalledTimes(SIZE - badIndices.size); + }); +}); diff --git a/apps/api/src/routes/track.router.ts b/apps/api/src/routes/track.router.ts index 07afbcdc2..4b51fefa9 100644 --- a/apps/api/src/routes/track.router.ts +++ b/apps/api/src/routes/track.router.ts @@ -1,4 +1,8 @@ -import { zTrackHandlerPayload } from '@openpanel/validation'; +import { + TRACK_BATCH_MAX_EVENTS, + zTrackBatchHandlerPayload, + zTrackHandlerPayload, +} from '@openpanel/validation'; import type { FastifyPluginAsyncZodOpenApi } from 'fastify-zod-openapi'; import { z } from 'zod'; import { fetchDeviceId, handler } from '@/controllers/track.controller'; @@ -6,6 +10,11 @@ import { clientHook } from '@/hooks/client.hook'; import { duplicateHook } from '@/hooks/duplicate.hook'; import { isBotHook } from '@/hooks/is-bot.hook'; +// Body limit for POST /track: 10 MB uncompressed, sized for batch requests +// ("up to 2000 events and 10 MB per request"). Single events are unaffected +// in practice — they stay far below the previous default limit. +const TRACK_BODY_LIMIT_BYTES = 10 * 1024 * 1024; + const trackRouter: FastifyPluginAsyncZodOpenApi = async (fastify) => { fastify.addHook('preValidation', duplicateHook); fastify.addHook('preHandler', clientHook); @@ -14,21 +23,33 @@ const trackRouter: FastifyPluginAsyncZodOpenApi = async (fastify) => { await fastify.route({ method: 'POST', url: '/', + bodyLimit: TRACK_BODY_LIMIT_BYTES, schema: { - body: zTrackHandlerPayload.and( - z.object({ - clientId: z.string().optional(), - clientSecret: z.string().optional(), - }) - ), + body: z + .union([zTrackHandlerPayload, zTrackBatchHandlerPayload]) + .and( + z.object({ + clientId: z.string().optional(), + clientSecret: z.string().optional(), + }) + ), tags: ['Track'], - description: - 'Ingest a tracking event (track, identify, group, increment, decrement, replay).', + description: `Ingest a tracking event (track, identify, group, increment, decrement, replay) or a batch of events ({ "type": "batch", "payload": [event, ...] }). Batch requests accept up to ${TRACK_BATCH_MAX_EVENTS} events and 10MB uncompressed per request; each event is dispatched through the same pipeline as a single-event request. Per-event validation failures are returned in the rejected[] array — the whole batch does not fail on a single bad row.`, response: { 200: z.object({ deviceId: z.string(), sessionId: z.string(), }), + 202: z.object({ + accepted: z.number().int().min(0), + rejected: z.array( + z.object({ + index: z.number().int().min(0), + reason: z.enum(['validation', 'internal']), + error: z.string(), + }) + ), + }), }, }, handler, diff --git a/packages/validation/src/track.validation.ts b/packages/validation/src/track.validation.ts index eae1f055e..bd9b7adb8 100644 --- a/packages/validation/src/track.validation.ts +++ b/packages/validation/src/track.validation.ts @@ -220,6 +220,25 @@ export const zTrackHandlerPayload = z.discriminatedUnion('type', [ .meta({ title: 'Assign Group' }), ]) satisfies z.ZodType; +// Batch ingestion: `POST /track` with `{ type: 'batch', payload: [...] }`. +// The envelope is validated strictly (array length only); per-event validation +// runs inside the controller via `safeParse(zTrackHandlerPayload)` so invalid +// items can be rejected per-index without failing the whole batch. +// +// Per-request caps: up to 2000 events and 10 MB uncompressed body. +export const TRACK_BATCH_MAX_EVENTS = 2000; + +export const zTrackBatchHandlerPayload = z + .object({ + type: z.literal('batch'), + payload: z.array(z.unknown()).min(1).max(TRACK_BATCH_MAX_EVENTS), + }) + .meta({ title: 'Batch' }); + +export type ITrackBatchHandlerPayload = z.infer< + typeof zTrackBatchHandlerPayload +>; + // Deprecated types for beta version of the SDKs export interface DeprecatedOpenpanelEventOptions {