diff --git a/.github/workflows/unit-tests-internal.yml b/.github/workflows/unit-tests-internal.yml index e2aae11b84..5a0e562215 100644 --- a/.github/workflows/unit-tests-internal.yml +++ b/.github/workflows/unit-tests-internal.yml @@ -95,7 +95,7 @@ jobs: } echo "Pre-pulling Docker images with authenticated session..." pull postgres:14 - pull clickhouse/clickhouse-server:25.4-alpine + pull clickhouse/clickhouse-server:26.2.19.43-alpine@sha256:c6ad6a7eb2fb5999df3adfb8b69a0c7222c68fa9b8f6b04a088564ebbc959251 pull redis:7.2 pull testcontainers/ryuk:0.14.0 pull electricsql/electric:1.2.4 diff --git a/.github/workflows/unit-tests-packages.yml b/.github/workflows/unit-tests-packages.yml index 6642f2443c..1102864c45 100644 --- a/.github/workflows/unit-tests-packages.yml +++ b/.github/workflows/unit-tests-packages.yml @@ -95,7 +95,7 @@ jobs: } echo "Pre-pulling Docker images with authenticated session..." pull postgres:14 - pull clickhouse/clickhouse-server:25.4-alpine + pull clickhouse/clickhouse-server:26.2.19.43-alpine@sha256:c6ad6a7eb2fb5999df3adfb8b69a0c7222c68fa9b8f6b04a088564ebbc959251 pull redis:7.2 pull testcontainers/ryuk:0.14.0 pull electricsql/electric:1.2.4 diff --git a/.github/workflows/unit-tests-webapp.yml b/.github/workflows/unit-tests-webapp.yml index dc1cc978f3..8566014c49 100644 --- a/.github/workflows/unit-tests-webapp.yml +++ b/.github/workflows/unit-tests-webapp.yml @@ -95,7 +95,7 @@ jobs: } echo "Pre-pulling Docker images with authenticated session..." 
pull postgres:14 - pull clickhouse/clickhouse-server:25.4-alpine + pull clickhouse/clickhouse-server:26.2.19.43-alpine@sha256:c6ad6a7eb2fb5999df3adfb8b69a0c7222c68fa9b8f6b04a088564ebbc959251 pull redis:7.2 pull testcontainers/ryuk:0.14.0 pull electricsql/electric:1.2.4 diff --git a/.server-changes/logs-search-memory-and-pagination.md b/.server-changes/logs-search-memory-and-pagination.md new file mode 100644 index 0000000000..70d32b32bd --- /dev/null +++ b/.server-changes/logs-search-memory-and-pagination.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Keep logs search within bounded ClickHouse memory when browsing long time ranges, and fix pagination that could skip or duplicate entries sharing a timestamp. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index dcb4eb11db..f793654dd3 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -1642,6 +1642,34 @@ const EnvironmentSchema = z CLICKHOUSE_LOGS_LIST_MAX_THREADS: z.coerce.number().int().default(2), CLICKHOUSE_LOGS_LIST_MAX_ROWS_TO_READ: z.coerce.number().int().default(10_000_000), CLICKHOUSE_LOGS_LIST_MAX_EXECUTION_TIME: z.coerce.number().int().default(120), + // Bound read-in-order memory on object-storage reads: each part opens a per-column read + // stream, and the default ~1 MiB+ S3 buffers dominate peak memory. These two byte sizes + // cap the per-stream buffers and exist on every supported ClickHouse, so they are always on. + CLICKHOUSE_LOGS_LIST_PREFETCH_BUFFER_SIZE: z.coerce.number().int().nonnegative().default(262_144), + CLICKHOUSE_LOGS_LIST_MAX_READ_BUFFER_SIZE: z.coerce.number().int().nonnegative().default(262_144), + // The decisive lever on Cloud SharedMergeTree, but it only exists on newer ClickHouse and + // is a no-op on local-disk MergeTree, so it is opt-in: unset means it is never sent (safe on + // any self-hosted version). Set to 0 on object-storage deployments to get the memory win. 
+ CLICKHOUSE_LOGS_LIST_FILESYSTEM_CACHE_PREFER_BIGGER_BUFFER_SIZE: z.coerce + .number() + .int() + .nonnegative() + .optional(), + + // Logs list pagination tuning (page sizing + recent-first probe windows). + LOGS_LIST_DEFAULT_PAGE_SIZE: z.coerce.number().int().positive().default(50), + LOGS_LIST_MAX_PAGE_SIZE: z.coerce.number().int().positive().default(100), + // Days back from the page ceiling to probe before widening to the full requested window, + // comma-separated. Empty disables narrowing (a single full-window query). + LOGS_LIST_RECENT_FIRST_PROBE_DAYS: z + .string() + .default("1,7") + .transform((s) => + s + .split(",") + .map((v) => Number(v.trim())) + .filter((n) => Number.isFinite(n) && n > 0) + ), // Query feature flag QUERY_FEATURE_ENABLED: z.string().default("1"), diff --git a/apps/webapp/app/presenters/v3/LogsListPresenter.server.ts b/apps/webapp/app/presenters/v3/LogsListPresenter.server.ts index 517c586e4e..a2d20a1ccd 100644 --- a/apps/webapp/app/presenters/v3/LogsListPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/LogsListPresenter.server.ts @@ -1,5 +1,9 @@ import { z } from "zod"; -import { type ClickHouse, type WhereCondition } from "@internal/clickhouse"; +import { + type ClickHouse, + type WhereCondition, + type LogsSearchListResult, +} from "@internal/clickhouse"; import { type PrismaClientOrTransaction } from "@trigger.dev/database"; import { EVENT_STORE_TYPES, getConfiguredEventRepository } from "~/v3/eventRepository/index.server"; @@ -11,6 +15,7 @@ import { getTaskIdentifiers } from "~/models/task.server"; import { ServiceValidationError } from "~/v3/services/baseService.server"; import { kindToLevel, type LogLevel, LogLevelSchema } from "~/utils/logUtils"; import { BasePresenter } from "~/presenters/v3/basePresenter.server"; +import { env } from "~/env.server"; import { convertDateToClickhouseDateTime, convertClickhouseDateTime64ToJsDate, @@ -66,25 +71,33 @@ export const LogsListOptionsSchema = z.object({ pageSize: 
z.number().int().positive().max(1000).optional(), }); -const DEFAULT_PAGE_SIZE = 50; +const DAY_MS = 24 * 60 * 60 * 1000; export type LogsList = Awaited<ReturnType<LogsListPresenter["call"]>>; export type LogEntry = LogsList["logs"][0]; export type LogsListAppliedFilters = LogsList["filters"]; +// Bump when the cursor shape changes so stale cursors are ignored (reset to the first page) +// rather than misparsed. +const LOG_CURSOR_VERSION = 2; + // Cursor is a base64 encoded JSON of the pagination keys type LogCursor = { + v: number; organizationId: string; environmentId: string; triggeredTimestamp: string; // DateTime64(9) string traceId: string; + spanId: string; }; const LogCursorSchema = z.object({ + v: z.literal(LOG_CURSOR_VERSION), organizationId: z.string(), environmentId: z.string(), triggeredTimestamp: z.string(), traceId: z.string(), + spanId: z.string(), }); function encodeCursor(cursor: LogCursor): string { @@ -105,6 +118,34 @@ function decodeCursor(cursor: string): LogCursor | null { } } +// Ordered list of lower bounds to try, narrowest (most recent) first, ending at the user's +// requested floor (or undefined for an unbounded-below window). Because rows are returned +// newest-first, a narrow window that already fills a page returns the exact same top rows the +// full window would, so widening only happens when a page comes back short. +function buildProbeFloors( + ceil: Date, + hardFloor: Date | undefined, + stepDays: number[] +): (Date | undefined)[] { + const floors: (Date | undefined)[] = []; + + for (const days of stepDays) { + let candidate = new Date(ceil.getTime() - days * DAY_MS); + if (hardFloor && candidate <= hardFloor) { + candidate = hardFloor; + } + floors.push(candidate); + if (hardFloor && candidate.getTime() === hardFloor.getTime()) { + // Reached the requested floor; nothing wider left to probe. + return floors; + } + } + + // Final probe always covers the full requested window (or unbounded if no floor was given). 
+ floors.push(hardFloor); + return floors; +} + // Convert display level to ClickHouse kinds and statuses function levelToKindsAndStatuses(level: LogLevel): { kinds?: string[]; statuses?: string[] } { switch (level) { @@ -143,7 +184,7 @@ export class LogsListPresenter extends BasePresenter { from, to, cursor, - pageSize = DEFAULT_PAGE_SIZE, + pageSize = env.LOGS_LIST_DEFAULT_PAGE_SIZE, defaultPeriod, retentionLimitDays, }: LogsListOptions @@ -221,125 +262,162 @@ export class LogsListPresenter extends BasePresenter { ); } - const queryBuilder = this.clickhouse.taskEventsSearch.logsListQueryBuilder(); - - // This should be removed once we clear the old inserts, 30 DAYS, the materialized view excludes events without trace_id) - queryBuilder.where("trace_id != ''", { - environmentId, - }); + const effectivePageSize = Math.min(pageSize, env.LOGS_LIST_MAX_PAGE_SIZE); + + // Only honor a cursor scoped to this org+env; one copied from another scope would shift the + // pagination anchor instead of resetting to the first page. + const parsedCursor = cursor ? decodeCursor(cursor) : null; + const decodedCursor = + parsedCursor && + parsedCursor.organizationId === organizationId && + parsedCursor.environmentId === environmentId + ? parsedCursor + : null; + + // Effective upper bound, always clamped to now so a probe never runs [floor, +inf). + const now = new Date(); + const clampedTo = effectiveTo !== undefined ? (effectiveTo > now ? now : effectiveTo) : now; + + const searchTerm = + search && search.trim() !== "" + ? escapeClickHouseString(search.trim()).toLowerCase() + : undefined; + + // Runs the full list query restricted to a single [floor, ceil] window. The recent-first + // probe loop below calls this with progressively wider floors. + const runProbe = (floor: Date | undefined) => { + const queryBuilder = this.clickhouse.taskEventsSearch.logsListQueryBuilder(); + + // The materialized view excludes events without a trace_id; this guards the legacy tail. 
+ queryBuilder.where("trace_id != ''"); + queryBuilder.where("environment_id = {environmentId: String}", { environmentId }); + queryBuilder.where("organization_id = {organizationId: String}", { organizationId }); + queryBuilder.where("project_id = {projectId: String}", { projectId }); + + if (clampedTo) { + queryBuilder.where("triggered_timestamp <= {triggeredAtEnd: DateTime64(3)}", { + triggeredAtEnd: convertDateToClickhouseDateTime(clampedTo), + }); + } - queryBuilder.where("environment_id = {environmentId: String}", { - environmentId, - }); + if (floor) { + queryBuilder.where("triggered_timestamp >= {triggeredAtStart: DateTime64(3)}", { + triggeredAtStart: convertDateToClickhouseDateTime(floor), + }); + } - queryBuilder.where("organization_id = {organizationId: String}", { - organizationId, - }); - queryBuilder.where("project_id = {projectId: String}", { projectId }); + // Task filter (applies directly to ClickHouse) + if (tasks && tasks.length > 0) { + queryBuilder.where("task_identifier IN {tasks: Array(String)}", { tasks }); + } - if (effectiveFrom) { - queryBuilder.where("triggered_timestamp >= {triggeredAtStart: DateTime64(3)}", { - triggeredAtStart: convertDateToClickhouseDateTime(effectiveFrom), - }); - } + // Run ID filter + if (runId && runId !== "") { + queryBuilder.where("run_id = {runId: String}", { runId }); + } - if (effectiveTo) { - const clampedTo = effectiveTo > new Date() ? 
new Date() : effectiveTo; + // Case-insensitive search in message and attributes + if (searchTerm !== undefined) { + queryBuilder.where( + "(lower(message) like {searchPattern: String} OR lower(attributes_text) like {searchPattern: String})", + { searchPattern: `%${searchTerm}%` } + ); + } - queryBuilder.where("triggered_timestamp <= {triggeredAtEnd: DateTime64(3)}", { - triggeredAtEnd: convertDateToClickhouseDateTime(clampedTo), - }); - } + if (levels && levels.length > 0) { + const conditions: WhereCondition[] = []; - // Task filter (applies directly to ClickHouse) - if (tasks && tasks.length > 0) { - queryBuilder.where("task_identifier IN {tasks: Array(String)}", { - tasks, - }); - } + for (let i = 0; i < levels.length; i++) { + const filter = levelToKindsAndStatuses(levels[i]); - // Run ID filter - if (runId && runId !== "") { - queryBuilder.where("run_id = {runId: String}", { runId }); - } + if (filter.kinds && filter.kinds.length > 0) { + conditions.push({ + clause: `kind IN {kinds_${i}: Array(String)} AND status NOT IN {excluded_statuses: Array(String)}`, + params: { + [`kinds_${i}`]: filter.kinds, + excluded_statuses: ["ERROR", "CANCELLED"], + }, + }); + } - // Case-insensitive search in message, attributes, and status fields - if (search && search.trim() !== "") { - const searchTerm = escapeClickHouseString(search.trim()).toLowerCase(); - queryBuilder.where( - "(lower(message) like {searchPattern: String} OR lower(attributes_text) like {searchPattern: String})", - { - searchPattern: `%${searchTerm}%`, + if (filter.statuses && filter.statuses.length > 0) { + conditions.push({ + clause: `status IN {statuses_${i}: Array(String)}`, + params: { [`statuses_${i}`]: filter.statuses }, + }); + } } - ); - } - if (levels && levels.length > 0) { - const conditions: WhereCondition[] = []; + queryBuilder.whereOr(conditions); + } - for (let i = 0; i < levels.length; i++) { - const filter = levelToKindsAndStatuses(levels[i]); + // Keyset pagination over the full sort key. 
ORDER BY is DESC, so the next page is the rows + // that sort after the cursor (strictly less-than). (triggered_timestamp, trace_id) is not + // unique because spans of a trace share both, so span_id is the final tiebreaker; without + // it rows at a tie boundary could be skipped or duplicated across pages. + if (decodedCursor) { + queryBuilder.where( + `(triggered_timestamp < {cursorTriggeredTimestamp: String} + OR (triggered_timestamp = {cursorTriggeredTimestamp: String} AND trace_id < {cursorTraceId: String}) + OR (triggered_timestamp = {cursorTriggeredTimestamp: String} AND trace_id = {cursorTraceId: String} AND span_id < {cursorSpanId: String}))`, + { + cursorTriggeredTimestamp: decodedCursor.triggeredTimestamp, + cursorTraceId: decodedCursor.traceId, + cursorSpanId: decodedCursor.spanId, + } + ); + } - if (filter.kinds && filter.kinds.length > 0) { - conditions.push({ - clause: `kind IN {kinds_${i}: Array(String)} AND status NOT IN {excluded_statuses: Array(String)}`, - params: { - [`kinds_${i}`]: filter.kinds, - excluded_statuses: ["ERROR", "CANCELLED"], - }, - }); - } + queryBuilder.orderBy("triggered_timestamp DESC, trace_id DESC, span_id DESC"); + // Limit + 1 to check if there are more results + queryBuilder.limit(effectivePageSize + 1); - if (filter.statuses && filter.statuses.length > 0) { - conditions.push({ - clause: `status IN {statuses_${i}: Array(String)}`, - params: { [`statuses_${i}`]: filter.statuses }, - }); - } - } + return queryBuilder.execute(); + }; - queryBuilder.whereOr(conditions); - } + // Page ceiling: the cursor (deeper pages) or the requested upper bound. Widen the lower + // bound only when a recent window doesn't fill the page. + const ceil = decodedCursor + ? convertClickhouseDateTime64ToJsDate(decodedCursor.triggeredTimestamp) + : clampedTo ?? new Date(); - // Cursor-based pagination using lexicographic comparison on (triggered_timestamp, trace_id). 
- // Since ORDER BY is DESC, "next page" means rows that sort *after* the cursor, i.e. less-than. - // The OR handles the tiebreaker: rows with an earlier timestamp always qualify, and rows - // with the *same* timestamp only qualify if their trace_id is also smaller. - // Equivalent to: WHERE (triggered_timestamp, trace_id) < (cursor.triggered_timestamp, cursor.trace_id) - const decodedCursor = cursor ? decodeCursor(cursor) : null; - if (decodedCursor) { - queryBuilder.where( - `(triggered_timestamp < {cursorTriggeredTimestamp: String} OR (triggered_timestamp = {cursorTriggeredTimestamp: String} AND trace_id < {cursorTraceId: String}))`, - { - cursorTriggeredTimestamp: decodedCursor.triggeredTimestamp, - cursorTraceId: decodedCursor.traceId, - } - ); - } + const probeFloors = buildProbeFloors( + ceil, + effectiveFrom ?? undefined, + env.LOGS_LIST_RECENT_FIRST_PROBE_DAYS + ); - queryBuilder.orderBy("triggered_timestamp DESC, trace_id DESC"); - // Limit + 1 to check if there are more results - queryBuilder.limit(pageSize + 1); + let records: LogsSearchListResult[] = []; + for (const floor of probeFloors) { + const [queryError, probeRecords] = await runProbe(floor); - const [queryError, records] = await queryBuilder.execute(); + if (queryError) { + throw queryError; + } - if (queryError) { - throw queryError; + records = probeRecords ?? []; + + if (records.length > effectivePageSize) { + // Page is full from this window; older rows can't be in the top page, stop widening. 
+ break; + } } - const results = records || []; - const hasMore = results.length > pageSize; - const logs = results.slice(0, pageSize); + const results = records; + const hasMore = results.length > effectivePageSize; + const logs = results.slice(0, effectivePageSize); // Build next cursor from the last item let nextCursor: string | undefined; if (hasMore && logs.length > 0) { const lastLog = logs[logs.length - 1]; nextCursor = encodeCursor({ + v: LOG_CURSOR_VERSION, organizationId, environmentId, triggeredTimestamp: lastLog.triggered_timestamp, traceId: lastLog.trace_id, + spanId: lastLog.span_id, }); } diff --git a/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts b/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts index 7c20dd3a2a..62580c1fc9 100644 --- a/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts +++ b/apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts @@ -43,6 +43,15 @@ function getLogsListClickhouseSettings() { max_bytes_before_external_sort: env.CLICKHOUSE_LOGS_LIST_MAX_BYTES_BEFORE_EXTERNAL_SORT.toString(), max_threads: env.CLICKHOUSE_LOGS_LIST_MAX_THREADS, + // Cap per-part read buffers so read-in-order memory stays bounded. These exist everywhere. + prefetch_buffer_size: env.CLICKHOUSE_LOGS_LIST_PREFETCH_BUFFER_SIZE.toString(), + max_read_buffer_size: env.CLICKHOUSE_LOGS_LIST_MAX_READ_BUFFER_SIZE.toString(), + // Object-storage only and newer than the buffers above, so only send it when configured to + // avoid UNKNOWN_SETTING failures against older self-hosted ClickHouse that lack it. 
+ ...(env.CLICKHOUSE_LOGS_LIST_FILESYSTEM_CACHE_PREFER_BIGGER_BUFFER_SIZE !== undefined && { + filesystem_cache_prefer_bigger_buffer_size: + env.CLICKHOUSE_LOGS_LIST_FILESYSTEM_CACHE_PREFER_BIGGER_BUFFER_SIZE, + }), ...(env.CLICKHOUSE_LOGS_LIST_MAX_ROWS_TO_READ && { max_rows_to_read: env.CLICKHOUSE_LOGS_LIST_MAX_ROWS_TO_READ.toString(), }), diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index f5d0996fa8..52b0639bb3 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -164,7 +164,7 @@ services: - database clickhouse: - image: clickhouse/clickhouse-server:25.6.2@sha256:97f0fe0f8729569e8c9d11069acee23abadeade4889f56ca3dc3df069f28cb85 + image: clickhouse/clickhouse-server:26.2.19.43@sha256:c2f2605585899d5103a0447daadbc0005f362200d5f0fcca7f40db3ca0dd36dd restart: always container_name: ${CONTAINER_PREFIX:-}clickhouse ulimits: diff --git a/internal-packages/testcontainers/src/clickhouse.ts b/internal-packages/testcontainers/src/clickhouse.ts index 1bd7f758e0..29453b68ac 100644 --- a/internal-packages/testcontainers/src/clickhouse.ts +++ b/internal-packages/testcontainers/src/clickhouse.ts @@ -16,7 +16,9 @@ export class ClickHouseContainer extends GenericContainer { private password = "test"; private database = "test"; - constructor(image = "clickhouse/clickhouse-server:25.4-alpine") { + constructor( + image = "clickhouse/clickhouse-server:26.2.19.43-alpine@sha256:c6ad6a7eb2fb5999df3adfb8b69a0c7222c68fa9b8f6b04a088564ebbc959251" + ) { super(image); this.withExposedPorts(CLICKHOUSE_PORT, CLICKHOUSE_HTTP_PORT); this.withWaitStrategy(