Skip to content
301 changes: 245 additions & 56 deletions apps/api/src/controllers/track.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,32 @@ import type {
IIdentifyPayload,
IIncrementPayload,
IReplayPayload,
ITrackBatchHandlerPayload,
ITrackHandlerPayload,
ITrackPayload,
} from '@openpanel/validation';
import { zTrackHandlerPayload } from '@openpanel/validation';
import type { FastifyReply, FastifyRequest } from 'fastify';
import { assocPath, pathOr, pick } from 'ramda';
import { HttpError } from '@/utils/errors';
import { getDeviceId } from '@/utils/ids';

type Salts = Awaited<ReturnType<typeof getSalts>>;

/**
* Per-request data that is identical for every event in a batch.
* Computed once in the batch handler so we don't re-fetch salts/geo
* or re-parse headers N times.
*/
interface SharedRequestContext {
projectId: string;
requestIp: string;
requestUa: string;
requestHeaders: Record<string, string | undefined>;
requestGeo: GeoLocation;
salts: Salts;
}

export function getStringHeaders(headers: FastifyRequest['headers']) {
return Object.entries(
pick(
Expand Down Expand Up @@ -122,28 +140,65 @@ interface TrackContext {
geo: GeoLocation;
}

async function buildContext(
request: FastifyRequest<{
Body: ITrackHandlerPayload;
}>,
validatedBody: ITrackHandlerPayload
): Promise<TrackContext> {
/**
* Build the per-request shared context. Done once per HTTP request — for
* single-event /track this is just an extra struct; for /track/batch it
* lets N events share one salts + one geo lookup.
*/
async function buildSharedRequestContext(
request: FastifyRequest,
): Promise<SharedRequestContext> {
const projectId = request.client?.projectId;
if (!projectId) {
throw new HttpError('Missing projectId', { status: 400 });
}

const timestamp = getTimestamp(request.timestamp, validatedBody.payload);
const ip =
const requestIp = request.clientIp;
const requestUa = request.headers['user-agent'] ?? 'unknown/1.0';
const requestHeaders = getStringHeaders(request.headers);

const [requestGeo, salts] = await Promise.all([
getGeoLocation(requestIp),
getSalts(),
]);

return {
projectId,
requestIp,
requestUa,
requestHeaders,
requestGeo,
salts,
};
}

/**
* Build a per-event TrackContext from already-fetched shared data.
* Per-event work: timestamp, identity, ip override, deviceId, and a
* second geo lookup *only* if the event overrides __ip.
*/
async function buildEventContext(
shared: SharedRequestContext,
requestTimestamp: FastifyRequest['timestamp'],
validatedBody: ITrackHandlerPayload,
): Promise<TrackContext> {
const timestamp = getTimestamp(requestTimestamp, validatedBody.payload);

const overrideIp =
validatedBody.type === 'track' && validatedBody.payload.properties?.__ip
? (validatedBody.payload.properties.__ip as string)
: request.clientIp;
const ua = request.headers['user-agent'] ?? 'unknown/1.0';
: undefined;
const ip = overrideIp ?? shared.requestIp;

// Only re-fetch geo when the event overrode the IP — the common case
// (browser SDK, no __ip) reuses the request-level geo computed once.
const geo =
overrideIp && overrideIp !== shared.requestIp
? await getGeoLocation(overrideIp)
: shared.requestGeo;

const headers = getStringHeaders(request.headers);
const identity = getIdentity(validatedBody);
const profileId = identity?.profileId;

if (profileId && validatedBody.type === 'track') {
validatedBody.payload.profileId = profileId;
}
Expand All @@ -154,22 +209,19 @@ async function buildContext(
? validatedBody.payload?.properties.__deviceId
: undefined;

// Get geo location (needed for track and identify)
const [geo, salts] = await Promise.all([getGeoLocation(ip), getSalts()]);

const deviceIdResult = await getDeviceId({
projectId,
projectId: shared.projectId,
ip,
ua,
salts,
ua: shared.requestUa,
salts: shared.salts,
overrideDeviceId,
});

return {
projectId,
projectId: shared.projectId,
ip,
ua,
headers,
ua: shared.requestUa,
headers: shared.requestHeaders,
timestamp: {
value: timestamp.timestamp,
isFromPast: timestamp.isTimestampFromThePast,
Expand All @@ -181,6 +233,14 @@ async function buildContext(
};
}

async function buildContext(
request: FastifyRequest,
validatedBody: ITrackHandlerPayload,
): Promise<TrackContext> {
const shared = await buildSharedRequestContext(request);
return buildEventContext(shared, request.timestamp, validatedBody);
}

async function handleTrack(
payload: ITrackPayload,
context: TrackContext
Expand Down Expand Up @@ -361,15 +421,67 @@ async function handleAssignGroup(
});
}

/**
* Dispatch a validated event to the matching per-type handler. Shared by
* /track and /track/batch. Throws HttpError(400) for the unsupported `alias`
* type so single-event and batch can both surface it consistently.
*/
async function dispatchEvent(
body: ITrackHandlerPayload,
context: TrackContext,
): Promise<void> {
switch (body.type) {
case 'alias':
throw new HttpError('Alias is not supported', { status: 400 });
case 'track':
await handleTrack(body.payload, context);
return;
case 'identify':
await handleIdentify(body.payload, context);
return;
case 'increment':
await handleIncrement(body.payload, context);
return;
case 'decrement':
await handleDecrement(body.payload, context);
return;
case 'replay':
await handleReplay(body.payload, context);
return;
case 'group':
await handleGroup(body.payload, context);
return;
case 'assign_group':
await handleAssignGroup(body.payload, context);
return;
default: {
// Exhaustiveness check: `body` narrows to `never` when every variant
// of ITrackHandlerPayload['type'] is handled. Adding a new variant
// makes this assignment fail to compile.
const exhaustive: never = body;
throw new HttpError(`Unhandled event type: ${exhaustive}`, {
status: 400,
});
}
}
}

export async function handler(
request: FastifyRequest<{
Body: ITrackHandlerPayload;
Body: ITrackHandlerPayload | ITrackBatchHandlerPayload;
}>,
reply: FastifyReply
reply: FastifyReply,
) {
const validatedBody = request.body;

// Handle alias (not supported)
// Batch envelope: `{ type: 'batch', payload: [event, ...] }` — fan each
// item through the same per-event pipeline as a single-event request.
if (validatedBody.type === 'batch') {
return handleBatch(request, reply, validatedBody.payload);
}

// Reject `alias` before building context — saves the salts/geo/deviceId work
// for a request that's going to fail anyway.
if (validatedBody.type === 'alias') {
return reply.status(400).send({
status: 400,
Expand All @@ -378,46 +490,123 @@ export async function handler(
});
}

// Build request context
const context = await buildContext(request, validatedBody);

// Dispatch to appropriate handler
switch (validatedBody.type) {
case 'track':
await handleTrack(validatedBody.payload, context);
break;
case 'identify':
await handleIdentify(validatedBody.payload, context);
break;
case 'increment':
await handleIncrement(validatedBody.payload, context);
break;
case 'decrement':
await handleDecrement(validatedBody.payload, context);
break;
case 'replay':
await handleReplay(validatedBody.payload, context);
break;
case 'group':
await handleGroup(validatedBody.payload, context);
break;
case 'assign_group':
await handleAssignGroup(validatedBody.payload, context);
break;
default:
return reply.status(400).send({
status: 400,
error: 'Bad Request',
message: 'Invalid type',
});
}
await dispatchEvent(validatedBody, context);

reply.status(200).send({
deviceId: context.deviceId,
sessionId: context.sessionId,
});
}

type BatchItemResult =
| { index: number; status: 'accepted' }
| {
index: number;
status: 'rejected';
reason: 'validation' | 'internal';
error: string;
};

/**
* `POST /track` with `{ type: 'batch', payload: [...] }` — accepts up to
* TRACK_BATCH_MAX_EVENTS payloads in one request and dispatches each through
* the same per-event pipeline as a single-event request.
*
* Per-event validation failures do NOT fail the whole batch: the response is
* always 202 (assuming envelope + auth pass) with `{ accepted, rejected[] }`
* so callers can fix and retry just the bad indices.
*
* Optimization: salts + request-IP geo are fetched once and shared across
* all events. Events that override `__ip` still get their own geo lookup.
*/
// Bounded concurrency for per-event processing inside a batch. Each event
// hits Redis (session lookup + groupmq queue add) and may trigger a geo
// lookup if it overrides `__ip`, so an unbounded `Promise.all` over 2000
// events can spike Redis pool usage and the geo provider's rate budget on
// smaller self-hosted instances. 50 keeps the pipeline saturated without
// turning a single big batch into a thundering herd.
const BATCH_CONCURRENCY = 50;

async function handleBatch(
request: FastifyRequest<{
Body: ITrackHandlerPayload | ITrackBatchHandlerPayload;
}>,
reply: FastifyReply,
events: unknown[],
) {
const shared = await buildSharedRequestContext(request);

const processOne = async (
raw: unknown,
index: number,
): Promise<BatchItemResult> => {
const parsed = zTrackHandlerPayload.safeParse(raw);
if (!parsed.success) {
const issue = parsed.error.issues[0];
const path = issue?.path?.join('.') ?? '';
const error = path
? `${path}: ${issue?.message}`
: issue?.message ?? 'invalid payload';
return { index, status: 'rejected', reason: 'validation', error };
}

try {
const context = await buildEventContext(
shared,
request.timestamp,
parsed.data,
);
await dispatchEvent(parsed.data, context);
return { index, status: 'accepted' };
} catch (err) {
// HttpError with 4xx → caller's fault (validation-style: alias,
// unknown type, replay without session). Anything else → ours.
const isClientError =
err instanceof HttpError && err.status >= 400 && err.status < 500;
const reason: 'validation' | 'internal' = isClientError
? 'validation'
: 'internal';
const message = err instanceof Error ? err.message : 'unknown error';
if (!isClientError) {
request.log.error(
{ err, index },
'Batch event dispatch failed',
);
}
return { index, status: 'rejected', reason, error: message };
}
};

// Process in chunks of BATCH_CONCURRENCY. We keep results aligned with
// input indices via the `index` field on each BatchItemResult.
const results: BatchItemResult[] = new Array(events.length);
for (let start = 0; start < events.length; start += BATCH_CONCURRENCY) {
const end = Math.min(start + BATCH_CONCURRENCY, events.length);
const chunk = await Promise.all(
events.slice(start, end).map((raw, i) => processOne(raw, start + i)),
);
for (const r of chunk) {
results[r.index] = r;
}
}

let accepted = 0;
const rejected: Extract<BatchItemResult, { status: 'rejected' }>[] = [];
for (const result of results) {
if (result.status === 'accepted') {
accepted += 1;
} else {
rejected.push(result);
}
}

reply.status(202).send({
accepted,
rejected,
});
}

export async function fetchDeviceId(
request: FastifyRequest,
reply: FastifyReply
Expand Down
Loading