diff --git a/apps/local/package.json b/apps/local/package.json index 2d89e733b..fffc92226 100644 --- a/apps/local/package.json +++ b/apps/local/package.json @@ -19,6 +19,7 @@ }, "dependencies": { "@effect-atom/atom-react": "^0.5.0", + "@effect/opentelemetry": "^0.63.0", "@effect/platform": "catalog:", "@effect/platform-node": "catalog:", "@executor/api": "workspace:*", diff --git a/apps/local/src/server/main.ts b/apps/local/src/server/main.ts index d7abe464b..3a4002ada 100644 --- a/apps/local/src/server/main.ts +++ b/apps/local/src/server/main.ts @@ -35,6 +35,7 @@ import { import { getExecutor } from "./executor"; import { createMcpRequestHandler, type McpRequestHandler } from "./mcp"; import { ErrorCaptureLive } from "./observability"; +import { startMetricsExport } from "./telemetry"; // --------------------------------------------------------------------------- // Local server API — core + all plugin groups @@ -91,6 +92,11 @@ export const createServerHandlers = async (): Promise => { const executor = await getExecutor(); const engine = createExecutionEngine({ executor, codeExecutor: makeQuickJsExecutor() }); + // Boot the OTLP metrics exporter if `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` + // is set. No-op otherwise — the in-process metric registry still + // accumulates counters/histograms for `GET /api/metrics`. + await Effect.runPromise(startMetricsExport()); + // Handlers wrap their own bodies with `capture(...)` — the edge // translation lives per-handler, not at service construction. const pluginExtensions = Layer.mergeAll( diff --git a/apps/local/src/server/telemetry.ts b/apps/local/src/server/telemetry.ts new file mode 100644 index 000000000..4687f3063 --- /dev/null +++ b/apps/local/src/server/telemetry.ts @@ -0,0 +1,70 @@ +// --------------------------------------------------------------------------- +// Metrics export for the local daemon. +// +// Gated on `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` (OTel standard env var). +// When set, wires `@effect/opentelemetry/OtlpMetrics` into a module-scope +// `ManagedRuntime` so the exporter's timer fiber keeps ticking across +// requests — per-request scoping would shut down the exporter before the +// first batch leaves. +// +// When unset, `MetricsRuntime` is `null` and nothing pushes; the +// in-process registry keeps accumulating counters + histograms so +// `GET /api/metrics` still serves a complete snapshot on demand. +// +// Auth: `OTEL_EXPORTER_OTLP_METRICS_HEADERS` follows the OTel spec +// format — comma-separated `key=value` pairs. For Axiom specifically, +// that's `Authorization=Bearer xxx,X-Axiom-Dataset=executor-local`. +// --------------------------------------------------------------------------- + +import * as FetchHttpClient from "@effect/platform/FetchHttpClient"; +import * as OtlpMetrics from "@effect/opentelemetry/OtlpMetrics"; +import * as OtlpSerialization from "@effect/opentelemetry/OtlpSerialization"; +import { Effect, Layer, ManagedRuntime } from "effect"; + +const SERVICE_NAME = "executor-local"; +const SERVICE_VERSION = "1.0.0"; + +const parseHeaders = (raw: string | undefined): Record => { + if (!raw) return {}; + const out: Record = {}; + for (const entry of raw.split(",")) { + const idx = entry.indexOf("="); + if (idx === -1) continue; + const key = entry.slice(0, idx).trim(); + const value = entry.slice(idx + 1).trim(); + if (key) out[key] = value; + } + return out; +}; + +const endpoint = process.env.OTEL_EXPORTER_OTLP_METRICS_ENDPOINT; +const headers = parseHeaders(process.env.OTEL_EXPORTER_OTLP_METRICS_HEADERS); + +/** + * Module-scope runtime holding the OTLP exporter. Never disposed for the + * daemon's lifetime — the exporter's fiber ticks every `exportInterval` + * (10s default) regardless of per-request activity. Set to `null` when + * no push endpoint is configured, making every `useMetrics` call a no-op. + */ +export const MetricsRuntime = endpoint + ? ManagedRuntime.make( + OtlpMetrics.layer({ + url: endpoint, + resource: { serviceName: SERVICE_NAME, serviceVersion: SERVICE_VERSION }, + headers, + }).pipe( + Layer.provide(OtlpSerialization.layerJson), + Layer.provide(FetchHttpClient.layer), + ), + ) + : null; + +/** + * Idempotent startup hook. Call once from the daemon's main() to force + * the module-scope runtime to boot (which starts the exporter's push + * timer). No-op when OTLP is not configured. + */ +export const startMetricsExport = (): Effect.Effect => + MetricsRuntime + ? Effect.promise(() => MetricsRuntime!.runPromise(Effect.void)) + : Effect.void; diff --git a/bun.lock b/bun.lock index 89db25bd4..2679340bb 100644 --- a/bun.lock +++ b/bun.lock @@ -127,6 +127,7 @@ "version": "1.4.4", "dependencies": { "@effect-atom/atom-react": "^0.5.0", + "@effect/opentelemetry": "^0.63.0", "@effect/platform": "catalog:", "@effect/platform-node": "catalog:", "@executor/api": "workspace:*", diff --git a/packages/core/api/src/api.ts b/packages/core/api/src/api.ts index 973967c01..8f21856c5 100644 --- a/packages/core/api/src/api.ts +++ b/packages/core/api/src/api.ts @@ -6,6 +6,7 @@ import { SourcesApi } from "./sources/api"; import { SecretsApi } from "./secrets/api"; import { ConnectionsApi } from "./connections/api"; import { ExecutionsApi } from "./executions/api"; +import { MetricsApi } from "./metrics/api"; import { ScopeApi } from "./scope/api"; export const CoreExecutorApi = HttpApi.make("executor") @@ -14,6 +15,7 @@ export const CoreExecutorApi = HttpApi.make("executor") .add(SecretsApi) .add(ConnectionsApi) .add(ExecutionsApi) + .add(MetricsApi) .add(ScopeApi) .annotateContext( OpenApi.annotations({ diff --git a/packages/core/api/src/handlers/index.ts b/packages/core/api/src/handlers/index.ts index 2223b226b..03607742d 100644 --- a/packages/core/api/src/handlers/index.ts +++ b/packages/core/api/src/handlers/index.ts @@ -6,6 +6,7 @@ import { SecretsHandlers } from "./secrets"; import { ConnectionsHandlers } from "./connections"; import { ScopeHandlers } from "./scope"; import { ExecutionsHandlers } from "./executions"; +import { MetricsHandlers } from "./metrics"; export { ToolsHandlers } from "./tools"; export { SourcesHandlers } from "./sources"; @@ -13,6 +14,7 @@ export { SecretsHandlers } from "./secrets"; export { ConnectionsHandlers } from "./connections"; export { ScopeHandlers } from "./scope"; export { ExecutionsHandlers } from "./executions"; +export { MetricsHandlers } from "./metrics"; export const CoreHandlers = Layer.mergeAll( ToolsHandlers, @@ -21,4 +23,5 @@ export const CoreHandlers = Layer.mergeAll( ConnectionsHandlers, ScopeHandlers, ExecutionsHandlers, + MetricsHandlers, ); diff --git a/packages/core/api/src/handlers/metrics.ts b/packages/core/api/src/handlers/metrics.ts new file mode 100644 index 000000000..8617d7119 --- /dev/null +++ b/packages/core/api/src/handlers/metrics.ts @@ -0,0 +1,12 @@ +import { HttpApiBuilder } from "@effect/platform"; +import { Effect } from "effect"; + +import { ExecutorApi } from "../api"; +import { renderPrometheus } from "../metrics/prometheus"; + +// `renderPrometheus` reads the process-wide Effect metric registry via +// `Metric.unsafeSnapshot()`. Synchronous, side-effect-free, no executor +// services required — handler is minimal. +export const MetricsHandlers = HttpApiBuilder.group(ExecutorApi, "metrics", (handlers) => + handlers.handle("scrape", () => Effect.sync(() => renderPrometheus())), +); diff --git a/packages/core/api/src/index.ts b/packages/core/api/src/index.ts index 06f2c820e..9de19bb7a 100644 --- a/packages/core/api/src/index.ts +++ b/packages/core/api/src/index.ts @@ -4,6 +4,7 @@ export { SourcesApi } from "./sources/api"; export { SecretsApi } from "./secrets/api"; export { ConnectionsApi } from "./connections/api"; export { ExecutionsApi } from "./executions/api"; +export { MetricsApi } from "./metrics/api"; export { ScopeApi } from "./scope/api"; export { InternalError, diff --git a/packages/core/api/src/metrics/api.ts b/packages/core/api/src/metrics/api.ts new file mode 100644 index 000000000..e0dd7e33d --- /dev/null +++ b/packages/core/api/src/metrics/api.ts @@ -0,0 +1,22 @@ +import { HttpApiEndpoint, HttpApiGroup, HttpApiSchema } from "@effect/platform"; + +import { InternalError } from "../observability"; + +// --------------------------------------------------------------------------- +// Prometheus text exposition on `GET /api/metrics`. +// +// Returns the current in-process `Effect.Metric` snapshot in the spec +// format: https://prometheus.io/docs/instrumenting/exposition_formats/. +// +// Mount notes for hosts: +// - Self-hosted local daemon: mount unconditionally (operator can scrape). +// - Cloud: mount under the protected API so each org only sees their own +// metrics. The core API group doesn't carry auth middleware itself; +// the host app composes that above. +// --------------------------------------------------------------------------- + +const PrometheusResponse = HttpApiSchema.Text({ contentType: "text/plain" }); + +export class MetricsApi extends HttpApiGroup.make("metrics") + .add(HttpApiEndpoint.get("scrape")`/metrics`.addSuccess(PrometheusResponse)) + .addError(InternalError) {} diff --git a/packages/core/api/src/metrics/prometheus.test.ts b/packages/core/api/src/metrics/prometheus.test.ts new file mode 100644 index 000000000..1cfbc5008 --- /dev/null +++ b/packages/core/api/src/metrics/prometheus.test.ts @@ -0,0 +1,74 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect, Metric, MetricBoundaries } from "effect"; + +import { renderPrometheus } from "./prometheus"; + +// --------------------------------------------------------------------------- +// Integration-style tests against Effect's live `Metric.unsafeSnapshot()` +// registry. Each test stamps a metric with a unique name so assertions +// don't collide with values accumulated in prior tests — the registry +// is process-wide and not reset between tests. +// --------------------------------------------------------------------------- + +const uniqueName = (prefix: string) => + `${prefix}_${Math.random().toString(36).slice(2, 10)}`; + +describe("renderPrometheus", () => { + it.effect("emits counter HELP + TYPE + sanitized name", () => + Effect.gen(function* () { + const name = uniqueName("pr_counter.ticks"); + const counter = Metric.counter(name, { description: "example counter" }); + yield* Metric.update(counter, 7); + + const output = renderPrometheus(); + const sanitized = name.replace(/\./g, "_"); + expect(output).toMatch(new RegExp(`# HELP ${sanitized} example counter`)); + expect(output).toMatch(new RegExp(`# TYPE ${sanitized} counter`)); + expect(output).toMatch(new RegExp(`^${sanitized} 7$`, "m")); + }), + ); + + it.effect("emits histogram bucket lines + +Inf + count + sum", () => + Effect.gen(function* () { + const name = uniqueName("pr_hist.ms"); + const histogram = Metric.histogram( + name, + MetricBoundaries.linear({ start: 0, width: 10, count: 3 }), + ); + yield* Metric.update(histogram, 5); + yield* Metric.update(histogram, 15); + + const output = renderPrometheus(); + const sanitized = name.replace(/\./g, "_"); + expect(output).toMatch(new RegExp(`# TYPE ${sanitized} histogram`)); + expect(output).toMatch( + new RegExp(`^${sanitized}_bucket{le="\\+Inf"} 2$`, "m"), + ); + expect(output).toMatch(new RegExp(`^${sanitized}_count 2$`, "m")); + expect(output).toMatch(new RegExp(`^${sanitized}_sum 20$`, "m")); + }), + ); + + it.effect("escapes label values with quotes + backslashes", () => + Effect.gen(function* () { + const name = uniqueName("pr_labeled"); + const counter = Metric.counter(name).pipe( + Metric.tagged("path", `github.io/search"broken\\"`), + ); + yield* Metric.update(counter, 1); + + const output = renderPrometheus(); + const sanitized = name.replace(/\./g, "_"); + // Label values preserve all characters except `\` → `\\`, `"` → `\"`, + // `\n` → `\n` (literal). Dots stay; only names are sanitized. + expect(output).toContain(`${sanitized}{path="github.io/search\\"broken\\\\\\""} 1`); + }), + ); + + it("emits a single trailing newline for empty snapshots when nothing ever registered", () => { + // Can't actually test the empty path in a shared process (other tests + // register metrics), so just assert the output always ends in \n. + const output = renderPrometheus(); + expect(output.endsWith("\n")).toBe(true); + }); +}); diff --git a/packages/core/api/src/metrics/prometheus.ts b/packages/core/api/src/metrics/prometheus.ts new file mode 100644 index 000000000..64090fe73 --- /dev/null +++ b/packages/core/api/src/metrics/prometheus.ts @@ -0,0 +1,186 @@ +// --------------------------------------------------------------------------- +// Serialize Effect's in-process `Metric.unsafeSnapshot()` into Prometheus +// text exposition format (spec: +// https://prometheus.io/docs/instrumenting/exposition_formats/). +// +// Powers `GET /api/metrics`. No external deps; the format is small enough +// to hand-roll, and keeping this in-house avoids pulling in a Prometheus +// client library for a feature that's secondary to the OTLP push path. +// --------------------------------------------------------------------------- + +import * as Metric from "effect/Metric"; +import type * as MetricKey from "effect/MetricKey"; +import * as MetricState from "effect/MetricState"; +import * as Option from "effect/Option"; + +/** + * Prometheus forbids `.` / `-` / other punctuation in metric names. + * Converts `executor.execution.duration_ms` → `executor_execution_duration_ms`. + */ +const sanitizeName = (raw: string): string => + raw.replace(/[^a-zA-Z0-9_:]/g, "_"); + +/** + * Prometheus label value escaping: backslash, double-quote, newline. + */ +const escapeLabelValue = (raw: string): string => + raw.replace(/\\/g, "\\\\").replace(/\n/g, "\\n").replace(/"/g, '\\"'); + +const formatLabels = ( + tags: ReadonlyArray<{ readonly key: string; readonly value: string }>, + extra?: Record, +): string => { + const pairs: string[] = []; + for (const tag of tags) { + pairs.push(`${sanitizeName(tag.key)}="${escapeLabelValue(tag.value)}"`); + } + if (extra) { + for (const [k, v] of Object.entries(extra)) { + pairs.push(`${sanitizeName(k)}="${escapeLabelValue(v)}"`); + } + } + return pairs.length === 0 ? "" : `{${pairs.join(",")}}`; +}; + +/** + * Format a number in Prometheus-friendly form. `Infinity` becomes `+Inf` / + * `-Inf`; `NaN` stays as `NaN`. Bigints flattened to numbers. + */ +const formatValue = (value: number | bigint): string => { + if (typeof value === "bigint") return value.toString(10); + if (value === Number.POSITIVE_INFINITY) return "+Inf"; + if (value === Number.NEGATIVE_INFINITY) return "-Inf"; + return String(value); +}; + +type PrometheusLine = string; + +/** + * Emit the `# HELP` + `# TYPE` header block for a given metric family, + * deduplicated by name (every data point of the same family shares one + * header, per Prometheus spec). + */ +const emitHeader = ( + name: string, + type: "counter" | "gauge" | "histogram" | "summary", + description: string, + seen: Set, +): PrometheusLine[] => { + if (seen.has(name)) return []; + seen.add(name); + const safeDescription = description + .replace(/\\/g, "\\\\") + .replace(/\n/g, "\\n"); + return [ + `# HELP ${name} ${safeDescription || "(no description)"}`, + `# TYPE ${name} ${type}`, + ]; +}; + +/** + * Serialize a single `MetricPair` (key + state) to zero or more + * Prometheus exposition lines. Behavior by state type: + * + * - `Counter` → single `{tags} ` line. + * - `Gauge` → single line, TYPE=gauge. + * - `Histogram` → one `_bucket{le="X"}` line per bucket (cumulative, + * Prometheus-style), plus `_bucket{le="+Inf"}`, `_count`, + * `_sum`. + * - `Frequency` → one counter row per occurrence key, with a synthetic + * `bucket` tag carrying the key. + * - `Summary` → `{quantile="X"}` for each quantile, plus + * `_count` and `_sum`. + */ +type SnapshotPair = { + readonly metricKey: MetricKey.MetricKey.Untyped; + readonly metricState: MetricState.MetricState.Untyped; +}; + +const formatPair = (pair: SnapshotPair, seen: Set): PrometheusLine[] => { + const rawName = pair.metricKey.name; + const name = sanitizeName(rawName); + const description = Option.getOrElse(pair.metricKey.description, () => ""); + const tags = pair.metricKey.tags; + const state = pair.metricState; + const lines: PrometheusLine[] = []; + + if (MetricState.isCounterState(state)) { + lines.push(...emitHeader(name, "counter", description, seen)); + lines.push(`${name}${formatLabels(tags)} ${formatValue(state.count)}`); + return lines; + } + + if (MetricState.isGaugeState(state)) { + lines.push(...emitHeader(name, "gauge", description, seen)); + lines.push(`${name}${formatLabels(tags)} ${formatValue(state.value)}`); + return lines; + } + + if (MetricState.isHistogramState(state)) { + lines.push(...emitHeader(name, "histogram", description, seen)); + // Effect's buckets are `[upperBound, cumulativeCount]`. Prometheus + // expects cumulative counts already; emit as-is. + for (const [upperBound, count] of state.buckets) { + lines.push( + `${name}_bucket${formatLabels(tags, { le: formatValue(upperBound) })} ${count}`, + ); + } + // +Inf bucket required by the spec; equal to total count. + lines.push( + `${name}_bucket${formatLabels(tags, { le: "+Inf" })} ${state.count}`, + ); + lines.push(`${name}_count${formatLabels(tags)} ${state.count}`); + lines.push(`${name}_sum${formatLabels(tags)} ${formatValue(state.sum)}`); + return lines; + } + + if (MetricState.isSummaryState(state)) { + lines.push(...emitHeader(name, "summary", description, seen)); + for (const [quantile, value] of state.quantiles) { + const numeric = Option.getOrElse(value, () => Number.NaN); + lines.push( + `${name}${formatLabels(tags, { quantile: formatValue(quantile) })} ${formatValue(numeric)}`, + ); + } + lines.push(`${name}_count${formatLabels(tags)} ${state.count}`); + lines.push(`${name}_sum${formatLabels(tags)} ${formatValue(state.sum)}`); + return lines; + } + + if (MetricState.isFrequencyState(state)) { + // Frequency isn't a first-class Prometheus type — represent it as a + // counter family with a synthetic `bucket` label so each occurrence + // becomes a distinct time series. + lines.push(...emitHeader(name, "counter", description, seen)); + for (const [bucket, count] of state.occurrences) { + lines.push( + `${name}${formatLabels(tags, { bucket })} ${count}`, + ); + } + return lines; + } + + // Unknown metric type — skip silently. Shouldn't happen with current + // Effect versions but future additions (e.g. exponential histograms) + // would land here. + return lines; +}; + +/** + * Render the current Effect metric snapshot as Prometheus exposition text. + * The snapshot is read via `Metric.unsafeSnapshot()` — synchronous, lives + * in the process-wide registry. + */ +export const renderPrometheus = (): string => { + const pairs = Metric.unsafeSnapshot(); + const seen = new Set(); + const lines: PrometheusLine[] = []; + + for (const pair of pairs) { + lines.push(...formatPair(pair, seen)); + } + + // Prometheus requires a trailing newline; most scrapers are tolerant + // but the spec is specific about it. + return lines.length === 0 ? "\n" : lines.join("\n") + "\n"; +}; diff --git a/packages/core/api/src/server.ts b/packages/core/api/src/server.ts index 0ed5ca2a9..257924203 100644 --- a/packages/core/api/src/server.ts +++ b/packages/core/api/src/server.ts @@ -6,4 +6,5 @@ export { SecretsHandlers, ScopeHandlers, ExecutionsHandlers, + MetricsHandlers, } from "./handlers";