Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/local/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
},
"dependencies": {
"@effect-atom/atom-react": "^0.5.0",
"@effect/opentelemetry": "^0.63.0",
"@effect/platform": "catalog:",
"@effect/platform-node": "catalog:",
"@executor/api": "workspace:*",
Expand Down
6 changes: 6 additions & 0 deletions apps/local/src/server/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import {
import { getExecutor } from "./executor";
import { createMcpRequestHandler, type McpRequestHandler } from "./mcp";
import { ErrorCaptureLive } from "./observability";
import { startMetricsExport } from "./telemetry";

// ---------------------------------------------------------------------------
// Local server API — core + all plugin groups
Expand Down Expand Up @@ -91,6 +92,11 @@ export const createServerHandlers = async (): Promise<ServerHandlers> => {
const executor = await getExecutor();
const engine = createExecutionEngine({ executor, codeExecutor: makeQuickJsExecutor() });

// Boot the OTLP metrics exporter if `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT`
// is set. No-op otherwise — the in-process metric registry still
// accumulates counters/histograms for `GET /api/metrics`.
await Effect.runPromise(startMetricsExport());

// Handlers wrap their own bodies with `capture(...)` — the edge
// translation lives per-handler, not at service construction.
const pluginExtensions = Layer.mergeAll(
Expand Down
70 changes: 70 additions & 0 deletions apps/local/src/server/telemetry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// ---------------------------------------------------------------------------
// Metrics export for the local daemon.
//
// Gated on `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` (OTel standard env var).
// When set, wires `@effect/opentelemetry/OtlpMetrics` into a module-scope
// `ManagedRuntime` so the exporter's timer fiber keeps ticking across
// requests — per-request scoping would shut down the exporter before the
// first batch leaves.
//
// When unset, `MetricsRuntime` is `null` and nothing pushes; the
// in-process registry keeps accumulating counters + histograms so
// `GET /api/metrics` still serves a complete snapshot on demand.
//
// Auth: `OTEL_EXPORTER_OTLP_METRICS_HEADERS` follows the OTel spec
// format — comma-separated `key=value` pairs. For Axiom specifically,
// that's `Authorization=Bearer xxx,X-Axiom-Dataset=executor-local`.
// ---------------------------------------------------------------------------

import * as FetchHttpClient from "@effect/platform/FetchHttpClient";
import * as OtlpMetrics from "@effect/opentelemetry/OtlpMetrics";
import * as OtlpSerialization from "@effect/opentelemetry/OtlpSerialization";
import { Effect, Layer, ManagedRuntime } from "effect";

const SERVICE_NAME = "executor-local";
const SERVICE_VERSION = "1.0.0";

const parseHeaders = (raw: string | undefined): Record<string, string> => {
if (!raw) return {};
const out: Record<string, string> = {};
for (const entry of raw.split(",")) {
const idx = entry.indexOf("=");
if (idx === -1) continue;
const key = entry.slice(0, idx).trim();
const value = entry.slice(idx + 1).trim();
if (key) out[key] = value;
}
return out;
};

const endpoint = process.env.OTEL_EXPORTER_OTLP_METRICS_ENDPOINT;
const headers = parseHeaders(process.env.OTEL_EXPORTER_OTLP_METRICS_HEADERS);

/**
* Module-scope runtime holding the OTLP exporter. Never disposed for the
* daemon's lifetime — the exporter's fiber ticks every `exportInterval`
* (10s default) regardless of per-request activity. Set to `null` when
* no push endpoint is configured, making every `useMetrics` call a no-op.
*/
export const MetricsRuntime = endpoint
? ManagedRuntime.make(
OtlpMetrics.layer({
url: endpoint,
resource: { serviceName: SERVICE_NAME, serviceVersion: SERVICE_VERSION },
headers,
}).pipe(
Layer.provide(OtlpSerialization.layerJson),
Layer.provide(FetchHttpClient.layer),
),
)
: null;

/**
* Idempotent startup hook. Call once from the daemon's main() to force
* the module-scope runtime to boot (which starts the exporter's push
* timer). No-op when OTLP is not configured.
*/
export const startMetricsExport = (): Effect.Effect<void> =>
MetricsRuntime
? Effect.promise(() => MetricsRuntime!.runPromise(Effect.void))
: Effect.void;
1 change: 1 addition & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/core/api/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { SourcesApi } from "./sources/api";
import { SecretsApi } from "./secrets/api";
import { ConnectionsApi } from "./connections/api";
import { ExecutionsApi } from "./executions/api";
import { MetricsApi } from "./metrics/api";
import { ScopeApi } from "./scope/api";

export const CoreExecutorApi = HttpApi.make("executor")
Expand All @@ -14,6 +15,7 @@ export const CoreExecutorApi = HttpApi.make("executor")
.add(SecretsApi)
.add(ConnectionsApi)
.add(ExecutionsApi)
.add(MetricsApi)
.add(ScopeApi)
.annotateContext(
OpenApi.annotations({
Expand Down
3 changes: 3 additions & 0 deletions packages/core/api/src/handlers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import { SecretsHandlers } from "./secrets";
import { ConnectionsHandlers } from "./connections";
import { ScopeHandlers } from "./scope";
import { ExecutionsHandlers } from "./executions";
import { MetricsHandlers } from "./metrics";

export { ToolsHandlers } from "./tools";
export { SourcesHandlers } from "./sources";
export { SecretsHandlers } from "./secrets";
export { ConnectionsHandlers } from "./connections";
export { ScopeHandlers } from "./scope";
export { ExecutionsHandlers } from "./executions";
export { MetricsHandlers } from "./metrics";

export const CoreHandlers = Layer.mergeAll(
ToolsHandlers,
Expand All @@ -21,4 +23,5 @@ export const CoreHandlers = Layer.mergeAll(
ConnectionsHandlers,
ScopeHandlers,
ExecutionsHandlers,
MetricsHandlers,
);
12 changes: 12 additions & 0 deletions packages/core/api/src/handlers/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { HttpApiBuilder } from "@effect/platform";
import { Effect } from "effect";

import { ExecutorApi } from "../api";
import { renderPrometheus } from "../metrics/prometheus";

// `renderPrometheus` reads the process-wide Effect metric registry via
// `Metric.unsafeSnapshot()`. Synchronous, side-effect-free, no executor
// services required — handler is minimal.
export const MetricsHandlers = HttpApiBuilder.group(ExecutorApi, "metrics", (handlers) =>
handlers.handle("scrape", () => Effect.sync(() => renderPrometheus())),
);
1 change: 1 addition & 0 deletions packages/core/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export { SourcesApi } from "./sources/api";
export { SecretsApi } from "./secrets/api";
export { ConnectionsApi } from "./connections/api";
export { ExecutionsApi } from "./executions/api";
export { MetricsApi } from "./metrics/api";
export { ScopeApi } from "./scope/api";
export {
InternalError,
Expand Down
22 changes: 22 additions & 0 deletions packages/core/api/src/metrics/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { HttpApiEndpoint, HttpApiGroup, HttpApiSchema } from "@effect/platform";

import { InternalError } from "../observability";

// ---------------------------------------------------------------------------
// Prometheus text exposition on `GET /api/metrics`.
//
// Returns the current in-process `Effect.Metric` snapshot in the spec
// format: https://prometheus.io/docs/instrumenting/exposition_formats/.
//
// Mount notes for hosts:
// - Self-hosted local daemon: mount unconditionally (operator can scrape).
// - Cloud: mount under the protected API so each org only sees their own
// metrics. The core API group doesn't carry auth middleware itself;
// the host app composes that above.
// ---------------------------------------------------------------------------

const PrometheusResponse = HttpApiSchema.Text({ contentType: "text/plain" });

export class MetricsApi extends HttpApiGroup.make("metrics")
.add(HttpApiEndpoint.get("scrape")`/metrics`.addSuccess(PrometheusResponse))
.addError(InternalError) {}
74 changes: 74 additions & 0 deletions packages/core/api/src/metrics/prometheus.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { describe, expect, it } from "@effect/vitest";
import { Effect, Metric, MetricBoundaries } from "effect";

import { renderPrometheus } from "./prometheus";

// ---------------------------------------------------------------------------
// Integration-style tests against Effect's live `Metric.unsafeSnapshot()`
// registry. Each test stamps a metric with a unique name so assertions
// don't collide with values accumulated in prior tests — the registry
// is process-wide and not reset between tests.
// ---------------------------------------------------------------------------

const uniqueName = (prefix: string) =>
`${prefix}_${Math.random().toString(36).slice(2, 10)}`;

describe("renderPrometheus", () => {
it.effect("emits counter HELP + TYPE + sanitized name", () =>
Effect.gen(function* () {
const name = uniqueName("pr_counter.ticks");
const counter = Metric.counter(name, { description: "example counter" });
yield* Metric.update(counter, 7);

const output = renderPrometheus();
const sanitized = name.replace(/\./g, "_");
expect(output).toMatch(new RegExp(`# HELP ${sanitized} example counter`));
expect(output).toMatch(new RegExp(`# TYPE ${sanitized} counter`));
expect(output).toMatch(new RegExp(`^${sanitized} 7$`, "m"));
}),
);

it.effect("emits histogram bucket lines + +Inf + count + sum", () =>
Effect.gen(function* () {
const name = uniqueName("pr_hist.ms");
const histogram = Metric.histogram(
name,
MetricBoundaries.linear({ start: 0, width: 10, count: 3 }),
);
yield* Metric.update(histogram, 5);
yield* Metric.update(histogram, 15);

const output = renderPrometheus();
const sanitized = name.replace(/\./g, "_");
expect(output).toMatch(new RegExp(`# TYPE ${sanitized} histogram`));
expect(output).toMatch(
new RegExp(`^${sanitized}_bucket{le="\\+Inf"} 2$`, "m"),
);
expect(output).toMatch(new RegExp(`^${sanitized}_count 2$`, "m"));
expect(output).toMatch(new RegExp(`^${sanitized}_sum 20$`, "m"));
}),
);

it.effect("escapes label values with quotes + backslashes", () =>
Effect.gen(function* () {
const name = uniqueName("pr_labeled");
const counter = Metric.counter(name).pipe(
Metric.tagged("path", `github.io/search"broken\\"`),
);
yield* Metric.update(counter, 1);

const output = renderPrometheus();
const sanitized = name.replace(/\./g, "_");
// Label values preserve all characters except `\` → `\\`, `"` → `\"`,
// `\n` → `\n` (literal). Dots stay; only names are sanitized.
expect(output).toContain(`${sanitized}{path="github.io/search\\"broken\\\\\\""} 1`);
}),
);

it("emits a single trailing newline for empty snapshots when nothing ever registered", () => {
// Can't actually test the empty path in a shared process (other tests
// register metrics), so just assert the output always ends in \n.
const output = renderPrometheus();
expect(output.endsWith("\n")).toBe(true);
});
});
Loading
Loading