Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
285 changes: 257 additions & 28 deletions apps/server/src/cloud/ManagedEndpointRuntime.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { describe, expect, it } from "@effect/vitest";
import { vi } from "vite-plus/test";
import * as Cause from "effect/Cause";
import * as Deferred from "effect/Deferred";
import * as Effect from "effect/Effect";
import * as Fiber from "effect/Fiber";
import * as Layer from "effect/Layer";
import * as Logger from "effect/Logger";
import * as Option from "effect/Option";
import * as PlatformError from "effect/PlatformError";
import * as Sink from "effect/Sink";
Expand Down Expand Up @@ -31,23 +33,26 @@ const relayClientAvailableLayer = Layer.succeed(
const runtimeDependencies = (
spawner: ReturnType<typeof ChildProcessSpawner.make>,
relayClientLayer = relayClientAvailableLayer,
getSecret: ServerSecretStore.ServerSecretStore["Service"]["get"] = () =>
Effect.succeed(Option.none()),
) =>
Layer.mergeAll(
Layer.succeed(ChildProcessSpawner.ChildProcessSpawner, spawner),
relayClientLayer,
Layer.mock(ServerSecretStore.ServerSecretStore)({
get: () => Effect.succeed(Option.none()),
get: getSecret,
}),
);

const buildCloudManagedEndpointRuntime = (
spawner: ReturnType<typeof ChildProcessSpawner.make>,
relayClientLayer = relayClientAvailableLayer,
getSecret?: ServerSecretStore.ServerSecretStore["Service"]["get"],
) =>
Effect.gen(function* () {
const context = yield* Layer.build(
ManagedEndpointRuntime.layer.pipe(
Layer.provide(runtimeDependencies(spawner, relayClientLayer)),
Layer.provide(runtimeDependencies(spawner, relayClientLayer, getSecret)),
),
);
return yield* Effect.service(ManagedEndpointRuntime.CloudManagedEndpointRuntime).pipe(
Expand All @@ -59,12 +64,15 @@ function makeHandle(input: {
readonly pid: number;
readonly onKill: () => void;
readonly isRunning?: () => boolean;
readonly isRunningEffect?: ChildProcessSpawner.ChildProcessHandle["isRunning"];
readonly exitCode?: Effect.Effect<ChildProcessSpawner.ExitCode>;
readonly output?: string;
readonly all?: ChildProcessSpawner.ChildProcessHandle["all"];
}) {
return ChildProcessSpawner.makeHandle({
pid: ChildProcessSpawner.ProcessId(input.pid),
exitCode: input.exitCode ?? Effect.never,
isRunning: Effect.sync(() => input.isRunning?.() ?? true),
isRunning: input.isRunningEffect ?? Effect.sync(() => input.isRunning?.() ?? true),
kill: () =>
Effect.sync(() => {
input.onKill();
Expand All @@ -73,7 +81,11 @@ function makeHandle(input: {
stdin: Sink.drain,
stdout: Stream.empty,
stderr: Stream.empty,
all: Stream.empty,
all:
input.all ??
(input.output === undefined
? Stream.empty
: Stream.make(new TextEncoder().encode(input.output))),
getInputFd: () => Sink.drain,
getOutputFd: () => Stream.empty,
});
Expand Down Expand Up @@ -193,18 +205,29 @@ describe("CloudManagedEndpointRuntime", () => {
}),
);

it.effect("restarts the connector when the active process has exited", () =>
Effect.gen(function* () {
it.effect("restarts after exit or a failed active-process probe", () => {
const logMessages: unknown[] = [];
const logger = Logger.make(({ message }) => {
logMessages.push(message);
});

return Effect.gen(function* () {
const spawned: Array<number> = [];
const killed: Array<number> = [];
let firstRunning = true;
const probeCause = PlatformError.systemError({
_tag: "PermissionDenied",
module: "ChildProcess",
method: "isRunning",
description: "process state is unavailable",
});
let activeProbe: ChildProcessSpawner.ChildProcessHandle["isRunning"] = Effect.succeed(true);
const spawner = ChildProcessSpawner.make(() =>
Effect.gen(function* () {
const pid = spawned.length === 0 ? 300 : 301;
const pid = 300 + spawned.length;
spawned.push(pid);
const handle = makeHandle({
pid,
isRunning: () => (pid === 300 ? firstRunning : true),
isRunningEffect: Effect.suspend(() => activeProbe),
onKill: () => {
killed.push(pid);
},
Expand All @@ -221,15 +244,130 @@ describe("CloudManagedEndpointRuntime", () => {
};

const first = yield* runtime.applyConfig(config);
firstRunning = false;
activeProbe = Effect.succeed(false);
const second = yield* runtime.applyConfig(config);
activeProbe = Effect.fail(probeCause);
const third = yield* runtime.applyConfig(config);

expect(first).toMatchObject({ status: "running", pid: 300 });
expect(second).toMatchObject({ status: "running", pid: 301 });
expect(spawned).toEqual([300, 301]);
expect(killed).toEqual([300]);
}),
);
expect(third).toMatchObject({ status: "running", pid: 302 });
expect(spawned).toEqual([300, 301, 302]);
expect(killed).toEqual([300, 301]);

const warning = logMessages.find(
(message) =>
Array.isArray(message) && message[0] === "Failed to inspect relay client process",
);
expect(warning).toBeDefined();
if (!Array.isArray(warning)) return;
expect(warning[1]).toMatchObject({
errorTag: "PlatformError",
reasonTag: "PermissionDenied",
errorModule: "ChildProcess",
errorMethod: "isRunning",
pid: 301,
tunnelId: "tunnel-1",
});
expect(warning[1]).not.toHaveProperty("cause");
expect(warning[1]).not.toHaveProperty("description");
}).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false })));
});

it.effect("does not report connector scope interruption as a runtime failure", () => {
const logMessages: unknown[] = [];
const logger = Logger.make(({ message }) => {
logMessages.push(message);
});
const spawner = ChildProcessSpawner.make(() =>
Effect.gen(function* () {
const handle = makeHandle({
pid: 325,
all: Stream.never,
onKill: () => undefined,
});
yield* Effect.addFinalizer(() => handle.kill().pipe(Effect.ignore));
return handle;
}),
);

return Effect.gen(function* () {
const runtime = yield* buildCloudManagedEndpointRuntime(spawner);
yield* runtime.applyConfig({
providerKind: "cloudflare_tunnel",
connectorToken: "token",
tunnelId: "tunnel-1",
});
yield* runtime.applyConfig(null);

expect(
logMessages.some(
(message) =>
Array.isArray(message) &&
(message[0] === "Relay client supervisor failed" ||
message[0] === "Relay client output observer failed"),
),
).toBe(false);
}).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false })));
});

it.effect("does not copy relay client output into log annotations", () => {
const connectorToken = "connector-token-sentinel";
const signedUrl = "https://user:password@example.com/private?token=secret#fragment";
const output = `ERR failed request ${signedUrl} ${connectorToken}`;
const logMessages: unknown[] = [];
let resolveObserved!: () => void;
const observed = new Promise<void>((resolve) => {
resolveObserved = resolve;
});
const logger = Logger.make(({ message }) => {
logMessages.push(message);
if (Array.isArray(message) && message[0] === "Relay client reported a transport warning") {
resolveObserved();
}
});
const spawner = ChildProcessSpawner.make(() =>
Effect.gen(function* () {
const handle = makeHandle({
pid: 350,
output,
onKill: () => undefined,
});
yield* Effect.addFinalizer(() => handle.kill().pipe(Effect.ignore));
return handle;
}),
);

return Effect.gen(function* () {
const runtime = yield* buildCloudManagedEndpointRuntime(spawner);
yield* runtime.applyConfig({
providerKind: "cloudflare_tunnel",
connectorToken,
tunnelId: "tunnel-1",
});
yield* Effect.promise(() => observed);

const warning = logMessages.find(
(message) =>
Array.isArray(message) && message[0] === "Relay client reported a transport warning",
);
expect(warning).toBeDefined();
if (!Array.isArray(warning)) return;
expect(warning[1]).toMatchObject({
pid: 350,
tunnelId: "tunnel-1",
outputLength: output.length,
});
expect(warning[1]).not.toHaveProperty("output");
const diagnosticText = Object.values(warning[1] as Record<string, unknown>)
.map(String)
.join("\n");
expect(diagnosticText).not.toContain(connectorToken);
expect(diagnosticText).not.toContain(signedUrl);
expect(diagnosticText).not.toContain("user:password");
expect(diagnosticText).not.toContain("token=secret");
}).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false })));
});

it.effect("supervises the active connector and restarts it after process exit", () =>
Effect.gen(function* () {
Expand Down Expand Up @@ -324,18 +462,21 @@ describe("CloudManagedEndpointRuntime", () => {
}),
);

it.effect("reports connector spawn failures", () =>
Effect.gen(function* () {
const spawner = ChildProcessSpawner.make(() =>
Effect.fail(
PlatformError.systemError({
_tag: "NotFound",
module: "ChildProcess",
method: "spawn",
description: "cloudflared missing",
}),
),
);
it.effect("reports connector spawn failures without logging nested details", () => {
const logMessages: unknown[] = [];
const logger = Logger.make(({ message }) => {
logMessages.push(message);
});
const spawnCause = PlatformError.systemError({
_tag: "NotFound",
module: "ChildProcess",
method: "spawn",
description: "private cloudflared path was missing",
cause: new Error("private nested spawn failure"),
});

return Effect.gen(function* () {
const spawner = ChildProcessSpawner.make(() => Effect.fail(spawnCause));
const runtime = yield* buildCloudManagedEndpointRuntime(spawner);

const status = yield* runtime.applyConfig({
Expand All @@ -347,10 +488,98 @@ describe("CloudManagedEndpointRuntime", () => {
expect(status).toMatchObject({
status: "failed",
providerKind: "cloudflare_tunnel",
reason: "Failed to start the relay client.",
tunnelId: "tunnel-1",
});
}),
);
const warning = logMessages.find(
(message) => Array.isArray(message) && message[0] === "Failed to start relay client",
);
expect(warning).toEqual([
"Failed to start relay client",
{
errorTag: "PlatformError",
reasonTag: "NotFound",
errorModule: "ChildProcess",
errorMethod: "spawn",
tunnelId: "tunnel-1",
tunnelName: undefined,
},
]);
expect(spawnCause.reason.cause).toBeInstanceOf(Error);
}).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false })));
});

it.effect("retains malformed persisted runtime configuration diagnostics", () => {
const logMessages: unknown[] = [];
const logger = Logger.make(({ message }) => {
logMessages.push(message);
});
const spawn = vi.fn();
const spawner = ChildProcessSpawner.make(spawn);

return Effect.gen(function* () {
yield* buildCloudManagedEndpointRuntime(spawner, relayClientAvailableLayer, () =>
Effect.succeed(Option.some(new TextEncoder().encode("not-json"))),
);

expect(spawn).not.toHaveBeenCalled();
const warning = logMessages.find(
(message) =>
Array.isArray(message) && message[0] === "Failed to read managed endpoint runtime config",
);
expect(warning).toBeDefined();
if (!Array.isArray(warning)) return;
expect(warning[1]).toMatchObject({
errorTag: "CloudManagedEndpointRuntimeConfigDecodeError",
resource: "cloud-endpoint-runtime-config",
causeTag: "SchemaError",
});
expect(warning[1]).not.toHaveProperty("cause");
}).pipe(Effect.provide(Logger.layer([logger], { mergeWithExisting: false })));
});

it("retains exact causes on managed endpoint runtime configuration errors", () => {
const readCause = new Error("private read failure");
const decodeCause = new Error("private decode failure");

expect(
new ManagedEndpointRuntime.CloudManagedEndpointRuntimeConfigReadError({
resource: "cloud-endpoint-runtime-config",
cause: readCause,
}).cause,
).toBe(readCause);
expect(
new ManagedEndpointRuntime.CloudManagedEndpointRuntimeConfigDecodeError({
resource: "cloud-endpoint-runtime-config",
cause: decodeCause,
}).cause,
).toBe(decodeCause);
});

it("summarizes managed endpoint causes without nested failure details", () => {
const nestedCause = new Error("private nested platform failure");
const platformFailure = PlatformError.systemError({
_tag: "PermissionDenied",
module: "ChildProcess",
method: "kill",
description: "private process description",
cause: nestedCause,
});
const cause = Cause.combine(
Cause.fail(platformFailure),
Cause.die(new TypeError("private defect detail")),
);

expect(ManagedEndpointRuntime.managedEndpointCauseDiagnostics(cause)).toEqual({
reasonCount: 2,
failureCount: 1,
failureTags: ["PlatformError"],
defectCount: 1,
defectTags: ["TypeError"],
interruptionCount: 0,
});
expect(platformFailure.reason.cause).toBe(nestedCause);
});

it.effect("reports a missing relay client executable without spawning", () =>
Effect.gen(function* () {
Expand Down
Loading
Loading