From 5b7709702722af352128a40cc9cc22c3dab23b4f Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 20 Jun 2026 06:22:42 -0700 Subject: [PATCH 1/4] Preserve managed endpoint failure diagnostics Co-authored-by: codex --- .../src/cloud/ManagedEndpointRuntime.test.ts | 89 ++++++++++++++++--- .../src/cloud/ManagedEndpointRuntime.ts | 45 +++++++++- apps/server/src/cloud/config.ts | 2 +- 3 files changed, 118 insertions(+), 18 deletions(-) diff --git a/apps/server/src/cloud/ManagedEndpointRuntime.test.ts b/apps/server/src/cloud/ManagedEndpointRuntime.test.ts index e0d5924fcc2..1bdc66e5b70 100644 --- a/apps/server/src/cloud/ManagedEndpointRuntime.test.ts +++ b/apps/server/src/cloud/ManagedEndpointRuntime.test.ts @@ -4,6 +4,7 @@ import * as Deferred from "effect/Deferred"; import * as Effect from "effect/Effect"; import * as Fiber from "effect/Fiber"; import * as Layer from "effect/Layer"; +import * as Logger from "effect/Logger"; import * as Option from "effect/Option"; import * as PlatformError from "effect/PlatformError"; import * as Sink from "effect/Sink"; @@ -31,23 +32,26 @@ const relayClientAvailableLayer = Layer.succeed( const runtimeDependencies = ( spawner: ReturnType, relayClientLayer = relayClientAvailableLayer, + getSecret: ServerSecretStore.ServerSecretStore["Service"]["get"] = () => + Effect.succeed(Option.none()), ) => Layer.mergeAll( Layer.succeed(ChildProcessSpawner.ChildProcessSpawner, spawner), relayClientLayer, Layer.mock(ServerSecretStore.ServerSecretStore)({ - get: () => Effect.succeed(Option.none()), + get: getSecret, }), ); const buildCloudManagedEndpointRuntime = ( spawner: ReturnType, relayClientLayer = relayClientAvailableLayer, + getSecret?: ServerSecretStore.ServerSecretStore["Service"]["get"], ) => Effect.gen(function* () { const context = yield* Layer.build( ManagedEndpointRuntime.layer.pipe( - Layer.provide(runtimeDependencies(spawner, relayClientLayer)), + Layer.provide(runtimeDependencies(spawner, relayClientLayer, getSecret)), ), ); return yield* Effect.service(ManagedEndpointRuntime.CloudManagedEndpointRuntime).pipe( @@ -59,12 +63,13 @@ function makeHandle(input: { readonly pid: number; readonly onKill: () => void; readonly isRunning?: () => boolean; + readonly isRunningEffect?: ChildProcessSpawner.ChildProcessHandle["isRunning"]; readonly exitCode?: Effect.Effect; }) { return ChildProcessSpawner.makeHandle({ pid: ChildProcessSpawner.ProcessId(input.pid), exitCode: input.exitCode ?? Effect.never, - isRunning: Effect.sync(() => input.isRunning?.() ?? true), + isRunning: input.isRunningEffect ?? Effect.sync(() => input.isRunning?.() ?? true), kill: () => Effect.sync(() => { input.onKill(); @@ -193,18 +198,29 @@ describe("CloudManagedEndpointRuntime", () => { }), ); - it.effect("restarts the connector when the active process has exited", () => - Effect.gen(function* () { + it.effect("restarts after exit or a failed active-process probe", () => { + const logMessages: unknown[] = []; + const logger = Logger.make(({ message }) => { + logMessages.push(message); + }); + + return Effect.gen(function* () { const spawned: Array = []; const killed: Array = []; - let firstRunning = true; + const probeCause = PlatformError.systemError({ + _tag: "PermissionDenied", + module: "ChildProcess", + method: "isRunning", + description: "process state is unavailable", + }); + let activeProbe: ChildProcessSpawner.ChildProcessHandle["isRunning"] = Effect.succeed(true); const spawner = ChildProcessSpawner.make(() => Effect.gen(function* () { - const pid = spawned.length === 0 ? 300 : 301; + const pid = 300 + spawned.length; spawned.push(pid); const handle = makeHandle({ pid, - isRunning: () => (pid === 300 ? firstRunning : true), + isRunningEffect: Effect.suspend(() => activeProbe), onKill: () => { killed.push(pid); }, @@ -221,15 +237,30 @@ describe("CloudManagedEndpointRuntime", () => { }; const first = yield* runtime.applyConfig(config); - firstRunning = false; + activeProbe = Effect.succeed(false); const second = yield* runtime.applyConfig(config); + activeProbe = Effect.fail(probeCause); + const third = yield* runtime.applyConfig(config); expect(first).toMatchObject({ status: "running", pid: 300 }); expect(second).toMatchObject({ status: "running", pid: 301 }); - expect(spawned).toEqual([300, 301]); - expect(killed).toEqual([300]); - }), - ); + expect(third).toMatchObject({ status: "running", pid: 302 }); + expect(spawned).toEqual([300, 301, 302]); + expect(killed).toEqual([300, 301]); + + const warning = logMessages.find( + (message) => + Array.isArray(message) && message[0] === "Failed to inspect relay client process", + ); + expect(warning).toBeDefined(); + if (!Array.isArray(warning)) return; + expect(warning[1]).toMatchObject({ + pid: 301, + tunnelId: "tunnel-1", + }); + expect((warning[1] as { cause: unknown }).cause).toBe(probeCause); + }).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false }))); + }); it.effect("supervises the active connector and restarts it after process exit", () => Effect.gen(function* () { @@ -347,11 +378,43 @@ describe("CloudManagedEndpointRuntime", () => { expect(status).toMatchObject({ status: "failed", providerKind: "cloudflare_tunnel", + reason: "Failed to start the relay client.", tunnelId: "tunnel-1", }); }), ); + it.effect("retains malformed persisted runtime configuration diagnostics", () => { + const logMessages: unknown[] = []; + const logger = Logger.make(({ message }) => { + logMessages.push(message); + }); + const spawn = vi.fn(); + const spawner = ChildProcessSpawner.make(spawn); + + return Effect.gen(function* () { + yield* buildCloudManagedEndpointRuntime(spawner, relayClientAvailableLayer, () => + Effect.succeed(Option.some(new TextEncoder().encode("not-json"))), + ); + + expect(spawn).not.toHaveBeenCalled(); + const warning = logMessages.find( + (message) => + Array.isArray(message) && message[0] === "Failed to read managed endpoint runtime config", + ); + expect(warning).toBeDefined(); + if (!Array.isArray(warning)) return; + const cause = (warning[1] as { cause: unknown }).cause; + expect(cause).toMatchObject({ + _tag: "CloudManagedEndpointRuntimeConfigDecodeError", + resource: "cloud-endpoint-runtime-config", + }); + expect( + (cause as ManagedEndpointRuntime.CloudManagedEndpointRuntimeConfigDecodeError).cause, + ).toBeDefined(); + }).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false }))); + }); + it.effect("reports a missing relay client executable without spawning", () => Effect.gen(function* () { const spawn = vi.fn(); diff --git a/apps/server/src/cloud/ManagedEndpointRuntime.ts b/apps/server/src/cloud/ManagedEndpointRuntime.ts index a1d7112a929..1e5566b8c1b 100644 --- a/apps/server/src/cloud/ManagedEndpointRuntime.ts +++ b/apps/server/src/cloud/ManagedEndpointRuntime.ts @@ -7,6 +7,7 @@ import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as Ref from "effect/Ref"; import * as Result from "effect/Result"; +import * as Schema from "effect/Schema"; import * as Semaphore from "effect/Semaphore"; import * as Scope from "effect/Scope"; import * as Stream from "effect/Stream"; @@ -16,6 +17,18 @@ import * as ChildProcessSpawner from "effect/unstable/process/ChildProcessSpawne import * as ServerSecretStore from "../auth/ServerSecretStore.ts"; import { CLOUD_ENDPOINT_RUNTIME_CONFIG, decodeRuntimeConfig } from "./config.ts"; +export class CloudManagedEndpointRuntimeConfigDecodeError extends Schema.TaggedErrorClass()( + "CloudManagedEndpointRuntimeConfigDecodeError", + { + resource: Schema.Literal(CLOUD_ENDPOINT_RUNTIME_CONFIG), + cause: Schema.Defect(), + }, +) { + override get message(): string { + return `Failed to decode managed endpoint runtime configuration from ${this.resource}.`; + } +} + function bytesToString(bytes: Uint8Array): string { return new TextDecoder().decode(bytes); } @@ -26,7 +39,15 @@ const readRuntimeConfig = Effect.gen(function* () { if (Option.isNone(bytes)) { return null; } - return Option.getOrNull(decodeRuntimeConfig(bytesToString(bytes.value))); + return yield* decodeRuntimeConfig(bytesToString(bytes.value)).pipe( + Effect.mapError( + (cause) => + new CloudManagedEndpointRuntimeConfigDecodeError({ + resource: CLOUD_ENDPOINT_RUNTIME_CONFIG, + cause, + }), + ), + ); }); export type CloudManagedEndpointRuntimeStatus = @@ -92,7 +113,14 @@ const stopConnector = (connector: ActiveConnector | null) => pid: Number(connector.child.pid), }), ), - Effect.ignore, + Effect.catchCause((cause) => + Effect.logWarning("Failed to stop relay client", { + cause, + pid: Number(connector.child.pid), + tunnelId: connector.config.tunnelId, + tunnelName: connector.config.tunnelName, + }), + ), ) : Effect.void; @@ -192,7 +220,16 @@ export const make = Effect.gen(function* () { const nextConfigKey = runtimeConfigKey(config); const active = yield* Ref.get(activeRef); if (active?.configKey === nextConfigKey) { - const isRunning = yield* active.child.isRunning.pipe(Effect.orElseSucceed(() => false)); + const isRunning = yield* active.child.isRunning.pipe( + Effect.catch((cause) => + Effect.logWarning("Failed to inspect relay client process", { + cause, + pid: Number(active.child.pid), + tunnelId: active.config.tunnelId, + tunnelName: active.config.tunnelName, + }).pipe(Effect.as(false)), + ), + ); if (isRunning) { return { status: "running", @@ -253,7 +290,7 @@ export const make = Effect.gen(function* () { Effect.as({ status: "failed", providerKind: "cloudflare_tunnel", - reason: String(cause), + reason: "Failed to start the relay client.", ...(config.tunnelId ? { tunnelId: config.tunnelId } : {}), ...(config.tunnelName ? { tunnelName: config.tunnelName } : {}), } satisfies CloudManagedEndpointRuntimeStatus), diff --git a/apps/server/src/cloud/config.ts b/apps/server/src/cloud/config.ts index f5642393abf..0f446e9b977 100644 --- a/apps/server/src/cloud/config.ts +++ b/apps/server/src/cloud/config.ts @@ -13,6 +13,6 @@ export const encodeEndpointRuntimeConfigJson = Schema.encodeEffect( Schema.fromJsonString(RelayManagedEndpointRuntimeConfig), ); -export const decodeRuntimeConfig = Schema.decodeUnknownOption( +export const decodeRuntimeConfig = Schema.decodeUnknownEffect( Schema.fromJsonString(RelayManagedEndpointRuntimeConfig), ); From bf555bd9fa8f0eb0c414b1cb534acede94d19cc2 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 20 Jun 2026 10:06:01 -0700 Subject: [PATCH 2/4] fix(server): redact managed endpoint output Co-authored-by: codex --- .../src/cloud/ManagedEndpointRuntime.test.ts | 64 ++++++++++++++++++- .../src/cloud/ManagedEndpointRuntime.ts | 3 +- 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/apps/server/src/cloud/ManagedEndpointRuntime.test.ts b/apps/server/src/cloud/ManagedEndpointRuntime.test.ts index 1bdc66e5b70..d54cab739dd 100644 --- a/apps/server/src/cloud/ManagedEndpointRuntime.test.ts +++ b/apps/server/src/cloud/ManagedEndpointRuntime.test.ts @@ -65,6 +65,7 @@ function makeHandle(input: { readonly isRunning?: () => boolean; readonly isRunningEffect?: ChildProcessSpawner.ChildProcessHandle["isRunning"]; readonly exitCode?: Effect.Effect; + readonly output?: string; }) { return ChildProcessSpawner.makeHandle({ pid: ChildProcessSpawner.ProcessId(input.pid), @@ -78,7 +79,10 @@ function makeHandle(input: { stdin: Sink.drain, stdout: Stream.empty, stderr: Stream.empty, - all: Stream.empty, + all: + input.output === undefined + ? Stream.empty + : Stream.make(new TextEncoder().encode(input.output)), getInputFd: () => Sink.drain, getOutputFd: () => Stream.empty, }); @@ -262,6 +266,64 @@ describe("CloudManagedEndpointRuntime", () => { }).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false }))); }); + it.effect("does not copy relay client output into log annotations", () => { + const connectorToken = "connector-token-sentinel"; + const signedUrl = "https://user:password@example.com/private?token=secret#fragment"; + const output = `ERR failed request ${signedUrl} ${connectorToken}`; + const logMessages: unknown[] = []; + let resolveObserved!: () => void; + const observed = new Promise((resolve) => { + resolveObserved = resolve; + }); + const logger = Logger.make(({ message }) => { + logMessages.push(message); + if (Array.isArray(message) && message[0] === "Relay client reported a transport warning") { + resolveObserved(); + } + }); + const spawner = ChildProcessSpawner.make(() => + Effect.gen(function* () { + const handle = makeHandle({ + pid: 350, + output, + onKill: () => undefined, + }); + yield* Effect.addFinalizer(() => handle.kill().pipe(Effect.ignore)); + return handle; + }), + ); + + return Effect.gen(function* () { + const runtime = yield* buildCloudManagedEndpointRuntime(spawner); + yield* runtime.applyConfig({ + providerKind: "cloudflare_tunnel", + connectorToken, + tunnelId: "tunnel-1", + }); + yield* Effect.promise(() => observed); + + const warning = logMessages.find( + (message) => + Array.isArray(message) && message[0] === "Relay client reported a transport warning", + ); + expect(warning).toBeDefined(); + if (!Array.isArray(warning)) return; + expect(warning[1]).toMatchObject({ + pid: 350, + tunnelId: "tunnel-1", + outputLength: output.length, + }); + expect(warning[1]).not.toHaveProperty("output"); + const diagnosticText = Object.values(warning[1] as Record) + .map(String) + .join("\n"); + expect(diagnosticText).not.toContain(connectorToken); + expect(diagnosticText).not.toContain(signedUrl); + expect(diagnosticText).not.toContain("user:password"); + expect(diagnosticText).not.toContain("token=secret"); + }).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false }))); + }); + it.effect("supervises the active connector and restarts it after process exit", () => Effect.gen(function* () { const spawned: Array = []; diff --git a/apps/server/src/cloud/ManagedEndpointRuntime.ts b/apps/server/src/cloud/ManagedEndpointRuntime.ts index 1e5566b8c1b..8114b95be1a 100644 --- a/apps/server/src/cloud/ManagedEndpointRuntime.ts +++ b/apps/server/src/cloud/ManagedEndpointRuntime.ts @@ -183,12 +183,11 @@ export const make = Effect.gen(function* () { Stream.map((line) => line.trim()), Stream.filter((line) => line.length > 0), Stream.runForEach((line) => { - const output = line.replaceAll(connector.config.connectorToken, ""); const attributes = { pid: Number(connector.child.pid), tunnelId: connector.config.tunnelId, tunnelName: connector.config.tunnelName, - output, + outputLength: line.length, }; switch (classifyRelayClientOutput(line)) { case "connected": From 2418121e05d1d4971f26c455573fc4dbc4f9a3e1 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 20 Jun 2026 11:00:40 -0700 Subject: [PATCH 3/4] fix(server): bound managed endpoint diagnostics Co-authored-by: codex --- .../src/cloud/ManagedEndpointRuntime.test.ts | 150 +++++++++++-- .../src/cloud/ManagedEndpointRuntime.ts | 197 ++++++++++++++---- 2 files changed, 288 insertions(+), 59 deletions(-) diff --git a/apps/server/src/cloud/ManagedEndpointRuntime.test.ts b/apps/server/src/cloud/ManagedEndpointRuntime.test.ts index d54cab739dd..a9e1eb8d378 100644 --- a/apps/server/src/cloud/ManagedEndpointRuntime.test.ts +++ b/apps/server/src/cloud/ManagedEndpointRuntime.test.ts @@ -1,5 +1,6 @@ import { describe, expect, it } from "@effect/vitest"; import { vi } from "vite-plus/test"; +import * as Cause from "effect/Cause"; import * as Deferred from "effect/Deferred"; import * as Effect from "effect/Effect"; import * as Fiber from "effect/Fiber"; @@ -66,6 +67,7 @@ function makeHandle(input: { readonly isRunningEffect?: ChildProcessSpawner.ChildProcessHandle["isRunning"]; readonly exitCode?: Effect.Effect; readonly output?: string; + readonly all?: ChildProcessSpawner.ChildProcessHandle["all"]; }) { return ChildProcessSpawner.makeHandle({ pid: ChildProcessSpawner.ProcessId(input.pid), @@ -80,9 +82,10 @@ function makeHandle(input: { stdout: Stream.empty, stderr: Stream.empty, all: - input.output === undefined + input.all ?? + (input.output === undefined ? Stream.empty - : Stream.make(new TextEncoder().encode(input.output)), + : Stream.make(new TextEncoder().encode(input.output))), getInputFd: () => Sink.drain, getOutputFd: () => Stream.empty, }); @@ -259,10 +262,52 @@ describe("CloudManagedEndpointRuntime", () => { expect(warning).toBeDefined(); if (!Array.isArray(warning)) return; expect(warning[1]).toMatchObject({ + errorTag: "PlatformError", + reasonTag: "PermissionDenied", + errorModule: "ChildProcess", + errorMethod: "isRunning", pid: 301, tunnelId: "tunnel-1", }); - expect((warning[1] as { cause: unknown }).cause).toBe(probeCause); + expect(warning[1]).not.toHaveProperty("cause"); + expect(warning[1]).not.toHaveProperty("description"); + }).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false }))); + }); + + it.effect("does not report connector scope interruption as a runtime failure", () => { + const logMessages: unknown[] = []; + const logger = Logger.make(({ message }) => { + logMessages.push(message); + }); + const spawner = ChildProcessSpawner.make(() => + Effect.gen(function* () { + const handle = makeHandle({ + pid: 325, + all: Stream.never, + onKill: () => undefined, + }); + yield* Effect.addFinalizer(() => handle.kill().pipe(Effect.ignore)); + return handle; + }), + ); + + return Effect.gen(function* () { + const runtime = yield* buildCloudManagedEndpointRuntime(spawner); + yield* runtime.applyConfig({ + providerKind: "cloudflare_tunnel", + connectorToken: "token", + tunnelId: "tunnel-1", + }); + yield* runtime.applyConfig(null); + + expect( + logMessages.some( + (message) => + Array.isArray(message) && + (message[0] === "Relay client supervisor failed" || + message[0] === "Relay client output observer failed"), + ), + ).toBe(false); }).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false }))); }); @@ -417,18 +462,21 @@ describe("CloudManagedEndpointRuntime", () => { }), ); - it.effect("reports connector spawn failures", () => - Effect.gen(function* () { - const spawner = ChildProcessSpawner.make(() => - Effect.fail( - PlatformError.systemError({ - _tag: "NotFound", - module: "ChildProcess", - method: "spawn", - description: "cloudflared missing", - }), - ), - ); + it.effect("reports connector spawn failures without logging nested details", () => { + const logMessages: unknown[] = []; + const logger = Logger.make(({ message }) => { + logMessages.push(message); + }); + const spawnCause = PlatformError.systemError({ + _tag: "NotFound", + module: "ChildProcess", + method: "spawn", + description: "private cloudflared path was missing", + cause: new Error("private nested spawn failure"), + }); + + return Effect.gen(function* () { + const spawner = ChildProcessSpawner.make(() => Effect.fail(spawnCause)); const runtime = yield* buildCloudManagedEndpointRuntime(spawner); const status = yield* runtime.applyConfig({ @@ -443,8 +491,23 @@ describe("CloudManagedEndpointRuntime", () => { reason: "Failed to start the relay client.", tunnelId: "tunnel-1", }); - }), - ); + const warning = logMessages.find( + (message) => Array.isArray(message) && message[0] === "Failed to start relay client", + ); + expect(warning).toEqual([ + "Failed to start relay client", + { + errorTag: "PlatformError", + reasonTag: "NotFound", + errorModule: "ChildProcess", + errorMethod: "spawn", + tunnelId: "tunnel-1", + tunnelName: undefined, + }, + ]); + expect(spawnCause.reason.cause).toBeInstanceOf(Error); + }).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false }))); + }); it.effect("retains malformed persisted runtime configuration diagnostics", () => { const logMessages: unknown[] = []; @@ -466,17 +529,58 @@ describe("CloudManagedEndpointRuntime", () => { ); expect(warning).toBeDefined(); if (!Array.isArray(warning)) return; - const cause = (warning[1] as { cause: unknown }).cause; - expect(cause).toMatchObject({ - _tag: "CloudManagedEndpointRuntimeConfigDecodeError", + expect(warning[1]).toMatchObject({ + errorTag: "CloudManagedEndpointRuntimeConfigDecodeError", resource: "cloud-endpoint-runtime-config", + causeTag: "SchemaError", }); - expect( - (cause as ManagedEndpointRuntime.CloudManagedEndpointRuntimeConfigDecodeError).cause, - ).toBeDefined(); + expect(warning[1]).not.toHaveProperty("cause"); }).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false }))); }); + it("retains exact causes on managed endpoint runtime configuration errors", () => { + const readCause = new Error("private read failure"); + const decodeCause = new Error("private decode failure"); + + expect( + new ManagedEndpointRuntime.CloudManagedEndpointRuntimeConfigReadError({ + resource: "cloud-endpoint-runtime-config", + cause: readCause, + }).cause, + ).toBe(readCause); + expect( + new ManagedEndpointRuntime.CloudManagedEndpointRuntimeConfigDecodeError({ + resource: "cloud-endpoint-runtime-config", + cause: decodeCause, + }).cause, + ).toBe(decodeCause); + }); + + it("summarizes managed endpoint causes without nested failure details", () => { + const nestedCause = new Error("private nested platform failure"); + const platformFailure = PlatformError.systemError({ + _tag: "PermissionDenied", + module: "ChildProcess", + method: "kill", + description: "private process description", + cause: nestedCause, + }); + const cause = Cause.combine( + Cause.fail(platformFailure), + Cause.die(new TypeError("private defect detail")), + ); + + expect(ManagedEndpointRuntime.managedEndpointCauseDiagnostics(cause)).toEqual({ + reasonCount: 2, + failureCount: 1, + failureTags: ["PlatformError"], + defectCount: 1, + defectTags: ["TypeError"], + interruptionCount: 0, + }); + expect(platformFailure.reason.cause).toBe(nestedCause); + }); + it.effect("reports a missing relay client executable without spawning", () => Effect.gen(function* () { const spawn = vi.fn(); diff --git a/apps/server/src/cloud/ManagedEndpointRuntime.ts b/apps/server/src/cloud/ManagedEndpointRuntime.ts index 8114b95be1a..db9c4f9b2da 100644 --- a/apps/server/src/cloud/ManagedEndpointRuntime.ts +++ b/apps/server/src/cloud/ManagedEndpointRuntime.ts @@ -1,10 +1,12 @@ import type { RelayManagedEndpointRuntimeConfig } from "@t3tools/contracts/relay"; import * as RelayClient from "@t3tools/shared/relayClient"; +import * as Cause from "effect/Cause"; import * as Context from "effect/Context"; import * as Effect from "effect/Effect"; import * as Exit from "effect/Exit"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; +import * as PlatformError from "effect/PlatformError"; import * as Ref from "effect/Ref"; import * as Result from "effect/Result"; import * as Schema from "effect/Schema"; @@ -17,6 +19,105 @@ import * as ChildProcessSpawner from "effect/unstable/process/ChildProcessSpawne import * as ServerSecretStore from "../auth/ServerSecretStore.ts"; import { CLOUD_ENDPOINT_RUNTIME_CONFIG, decodeRuntimeConfig } from "./config.ts"; +const MAX_CAUSE_DIAGNOSTIC_TAGS = 8; +const MAX_DIAGNOSTIC_VALUE_LENGTH = 128; + +function boundedDiagnosticValue(value: string): string { + return value.slice(0, MAX_DIAGNOSTIC_VALUE_LENGTH); +} + +function diagnosticValueTag(value: unknown): string { + try { + if ( + typeof value === "object" && + value !== null && + "_tag" in value && + typeof value._tag === "string" + ) { + return boundedDiagnosticValue(value._tag); + } + if (value instanceof Error) { + return boundedDiagnosticValue(value.name); + } + return typeof value; + } catch { + return "Uninspectable"; + } +} + +function addUniqueDiagnosticTag(tags: Array, tag: string): void { + if (tags.length < MAX_CAUSE_DIAGNOSTIC_TAGS && !tags.includes(tag)) { + tags.push(tag); + } +} + +export function managedEndpointCauseDiagnostics(cause: Cause.Cause) { + const failureTags: Array = []; + const defectTags: Array = []; + let failureCount = 0; + let defectCount = 0; + let interruptionCount = 0; + + for (const reason of cause.reasons) { + if (Cause.isFailReason(reason)) { + failureCount += 1; + addUniqueDiagnosticTag(failureTags, diagnosticValueTag(reason.error)); + continue; + } + if (Cause.isDieReason(reason)) { + defectCount += 1; + addUniqueDiagnosticTag(defectTags, diagnosticValueTag(reason.defect)); + continue; + } + interruptionCount += 1; + } + + return { + reasonCount: cause.reasons.length, + failureCount, + failureTags, + defectCount, + defectTags, + interruptionCount, + }; +} + +function logManagedEndpointCause( + message: string, + cause: Cause.Cause, + attributes: Readonly>, +) { + const interruptionReasons = cause.reasons.filter(Cause.isInterruptReason); + if (interruptionReasons.length > 0) { + return Effect.failCause(Cause.fromReasons(interruptionReasons)); + } + return Effect.logWarning(message, { + ...attributes, + ...managedEndpointCauseDiagnostics(cause), + }); +} + +function platformErrorDiagnostics(error: PlatformError.PlatformError) { + return { + errorTag: error._tag, + reasonTag: error.reason._tag, + errorModule: boundedDiagnosticValue(error.reason.module), + errorMethod: boundedDiagnosticValue(error.reason.method), + }; +} + +export class CloudManagedEndpointRuntimeConfigReadError extends Schema.TaggedErrorClass()( + "CloudManagedEndpointRuntimeConfigReadError", + { + resource: Schema.Literal(CLOUD_ENDPOINT_RUNTIME_CONFIG), + cause: Schema.Defect(), + }, +) { + override get message(): string { + return `Failed to read managed endpoint runtime configuration from ${this.resource}.`; + } +} + export class CloudManagedEndpointRuntimeConfigDecodeError extends Schema.TaggedErrorClass()( "CloudManagedEndpointRuntimeConfigDecodeError", { @@ -35,7 +136,15 @@ function bytesToString(bytes: Uint8Array): string { const readRuntimeConfig = Effect.gen(function* () { const secrets = yield* ServerSecretStore.ServerSecretStore; - const bytes = yield* secrets.get(CLOUD_ENDPOINT_RUNTIME_CONFIG); + const bytes = yield* secrets.get(CLOUD_ENDPOINT_RUNTIME_CONFIG).pipe( + Effect.mapError( + (cause) => + new CloudManagedEndpointRuntimeConfigReadError({ + resource: CLOUD_ENDPOINT_RUNTIME_CONFIG, + cause, + }), + ), + ); if (Option.isNone(bytes)) { return null; } @@ -114,8 +223,7 @@ const stopConnector = (connector: ActiveConnector | null) => }), ), Effect.catchCause((cause) => - Effect.logWarning("Failed to stop relay client", { - cause, + logManagedEndpointCause("Failed to stop relay client", cause, { pid: Number(connector.child.pid), tunnelId: connector.config.tunnelId, tunnelName: connector.config.tunnelName, @@ -165,7 +273,12 @@ export const make = Effect.gen(function* () { pid: Number(connector.child.pid), ...(Result.isSuccess(result) ? { exitCode: Number(result.success) } - : { cause: result.failure }), + : { + exitErrorTag: result.failure._tag, + exitReasonTag: result.failure.reason._tag, + exitErrorModule: boundedDiagnosticValue(result.failure.reason.module), + exitErrorMethod: boundedDiagnosticValue(result.failure.reason.method), + }), tunnelId: connector.config.tunnelId, tunnelName: connector.config.tunnelName, }); @@ -173,7 +286,9 @@ export const make = Effect.gen(function* () { }), ); }).pipe( - Effect.catchCause((cause) => Effect.logWarning("Relay client supervisor failed", { cause })), + Effect.catchCause((cause) => + logManagedEndpointCause("Relay client supervisor failed", cause, {}), + ), ); const observeConnectorOutput = (connector: ActiveConnector) => @@ -199,8 +314,7 @@ export const make = Effect.gen(function* () { } }), Effect.catchCause((cause) => - Effect.logWarning("Relay client output observer failed", { - cause, + logManagedEndpointCause("Relay client output observer failed", cause, { pid: Number(connector.child.pid), tunnelId: connector.config.tunnelId, tunnelName: connector.config.tunnelName, @@ -220,14 +334,15 @@ export const make = Effect.gen(function* () { const active = yield* Ref.get(activeRef); if (active?.configKey === nextConfigKey) { const isRunning = yield* active.child.isRunning.pipe( - Effect.catch((cause) => - Effect.logWarning("Failed to inspect relay client process", { - cause, - pid: Number(active.child.pid), - tunnelId: active.config.tunnelId, - tunnelName: active.config.tunnelName, - }).pipe(Effect.as(false)), - ), + Effect.catchTags({ + PlatformError: (error) => + Effect.logWarning("Failed to inspect relay client process", { + ...platformErrorDiagnostics(error), + pid: Number(active.child.pid), + tunnelId: active.config.tunnelId, + tunnelName: active.config.tunnelName, + }).pipe(Effect.as(false)), + }), ); if (isRunning) { return { @@ -279,22 +394,23 @@ export const make = Effect.gen(function* () { tunnelName: config.tunnelName, }), ), - Effect.catch((cause) => - Effect.logWarning("Failed to start relay client", { - cause, - tunnelId: config.tunnelId, - tunnelName: config.tunnelName, - }).pipe( - Effect.andThen(Scope.close(connectorScope, Exit.void).pipe(Effect.ignore)), - Effect.as({ - status: "failed", - providerKind: "cloudflare_tunnel", - reason: "Failed to start the relay client.", - ...(config.tunnelId ? { tunnelId: config.tunnelId } : {}), - ...(config.tunnelName ? { tunnelName: config.tunnelName } : {}), - } satisfies CloudManagedEndpointRuntimeStatus), - ), - ), + Effect.catchTags({ + PlatformError: (error) => + Effect.logWarning("Failed to start relay client", { + ...platformErrorDiagnostics(error), + tunnelId: config.tunnelId, + tunnelName: config.tunnelName, + }).pipe( + Effect.andThen(Scope.close(connectorScope, Exit.void).pipe(Effect.ignore)), + Effect.as({ + status: "failed", + providerKind: "cloudflare_tunnel", + reason: "Failed to start the relay client.", + ...(config.tunnelId ? { tunnelId: config.tunnelId } : {}), + ...(config.tunnelName ? { tunnelName: config.tunnelName } : {}), + } satisfies CloudManagedEndpointRuntimeStatus), + ), + }), ); if ("status" in child && child.status === "failed") { @@ -340,12 +456,21 @@ export const make = Effect.gen(function* () { applyConfig, }); + const recoverRuntimeConfigError = ( + error: + | CloudManagedEndpointRuntimeConfigReadError + | CloudManagedEndpointRuntimeConfigDecodeError, + ) => + Effect.logWarning("Failed to read managed endpoint runtime config", { + errorTag: error._tag, + resource: error.resource, + causeTag: diagnosticValueTag(error.cause), + }).pipe(Effect.as(null)); const initialConfig = yield* readRuntimeConfig.pipe( - Effect.catch((cause) => - Effect.logWarning("Failed to read managed endpoint runtime config", { cause }).pipe( - Effect.as(null), - ), - ), + Effect.catchTags({ + CloudManagedEndpointRuntimeConfigReadError: recoverRuntimeConfigError, + CloudManagedEndpointRuntimeConfigDecodeError: recoverRuntimeConfigError, + }), ); yield* runtime.applyConfig(initialConfig); yield* Effect.addFinalizer(() => runtime.applyConfig(null)); From 96f192e97a753340165f48454776550db67d07d8 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Sat, 20 Jun 2026 16:07:23 -0700 Subject: [PATCH 4/4] Preserve mixed managed endpoint diagnostics Co-authored-by: codex --- apps/server/src/cloud/ManagedEndpointRuntime.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/apps/server/src/cloud/ManagedEndpointRuntime.ts b/apps/server/src/cloud/ManagedEndpointRuntime.ts index db9c4f9b2da..a7454178bd6 100644 --- a/apps/server/src/cloud/ManagedEndpointRuntime.ts +++ b/apps/server/src/cloud/ManagedEndpointRuntime.ts @@ -88,13 +88,19 @@ function logManagedEndpointCause( attributes: Readonly>, ) { const interruptionReasons = cause.reasons.filter(Cause.isInterruptReason); - if (interruptionReasons.length > 0) { + if (interruptionReasons.length > 0 && interruptionReasons.length === cause.reasons.length) { return Effect.failCause(Cause.fromReasons(interruptionReasons)); } - return Effect.logWarning(message, { + const log = Effect.logWarning(message, { ...attributes, ...managedEndpointCauseDiagnostics(cause), }); + if (interruptionReasons.length > 0) { + return log.pipe( + Effect.andThen(Effect.failCause(Cause.fromReasons(interruptionReasons))), + ); + } + return log; } function platformErrorDiagnostics(error: PlatformError.PlatformError) {