Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 71 additions & 31 deletions apps/server/src/cloud/CliTokenManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import * as Clock from "effect/Clock";
import * as Console from "effect/Console";
import * as Context from "effect/Context";
import * as Crypto from "effect/Crypto";
import * as Data from "effect/Data";
import * as Deferred from "effect/Deferred";
import * as Duration from "effect/Duration";
import * as Effect from "effect/Effect";
Expand All @@ -15,10 +14,12 @@ import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as Schema from "effect/Schema";
import * as Semaphore from "effect/Semaphore";
import * as HttpClient from "effect/unstable/http/HttpClient";
import * as HttpClientRequest from "effect/unstable/http/HttpClientRequest";
import * as HttpClientResponse from "effect/unstable/http/HttpClientResponse";
import * as HttpRouter from "effect/unstable/http/HttpRouter";
import * as HttpServerRequest from "effect/unstable/http/HttpServerRequest";
import * as HttpServerResponse from "effect/unstable/http/HttpServerResponse";
import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http";

import * as ServerSecretStore from "../auth/ServerSecretStore.ts";
import { cloudCliOAuthConfig, type CloudCliOAuthConfig } from "./publicConfig.ts";
Expand All @@ -45,35 +46,74 @@ const OAuthTokenResponse = Schema.Struct({
token_type: Schema.String,
});

export class CloudCliTokenManagerError extends Data.TaggedError("CloudCliTokenManagerError")<{
readonly message: string;
readonly cause?: unknown;
}> {}
export class CloudCliCredentialRemovalError extends Schema.TaggedErrorClass<CloudCliCredentialRemovalError>()(
"CloudCliCredentialRemovalError",
{ cause: Schema.Defect() },
) {
override get message(): string {
return "Could not remove the stored T3 Connect CLI credential.";
}
}

export class CloudCliCredentialRefreshError extends Schema.TaggedErrorClass<CloudCliCredentialRefreshError>()(
"CloudCliCredentialRefreshError",
{ cause: Schema.Defect() },
) {
override get message(): string {
return "Could not refresh the T3 Connect CLI credential.";
}
}

export class CloudCliCredentialReadError extends Schema.TaggedErrorClass<CloudCliCredentialReadError>()(
"CloudCliCredentialReadError",
{ cause: Schema.Defect() },
) {
override get message(): string {
return "Could not read the stored T3 Connect CLI credential.";
}
}

export class CloudCliAuthorizationError extends Schema.TaggedErrorClass<CloudCliAuthorizationError>()(
"CloudCliAuthorizationError",
{ cause: Schema.Defect() },
) {
override get message(): string {
return "Could not authorize the T3 Connect CLI.";
}
}

export interface CloudCliTokenManagerShape {
readonly get: Effect.Effect<PersistedToken, CloudCliTokenManagerError>;
readonly getExisting: Effect.Effect<Option.Option<PersistedToken>, CloudCliTokenManagerError>;
readonly hasCredential: Effect.Effect<boolean, CloudCliTokenManagerError>;
readonly clear: Effect.Effect<void, CloudCliTokenManagerError>;
export class CloudCliAuthorizationTimeoutError extends Schema.TaggedErrorClass<CloudCliAuthorizationTimeoutError>()(
"CloudCliAuthorizationTimeoutError",
{ cause: Schema.Defect() },
) {
override get message(): string {
return "Timed out waiting for T3 Connect authorization.";
}
}

export const CloudCliTokenManagerError = Schema.Union([
CloudCliCredentialRemovalError,
CloudCliCredentialRefreshError,
CloudCliCredentialReadError,
CloudCliAuthorizationError,
CloudCliAuthorizationTimeoutError,
]);
export type CloudCliTokenManagerError = typeof CloudCliTokenManagerError.Type;

export class CloudCliTokenManager extends Context.Service<
CloudCliTokenManager,
CloudCliTokenManagerShape
{
readonly get: Effect.Effect<PersistedToken, CloudCliTokenManagerError>;
readonly getExisting: Effect.Effect<Option.Option<PersistedToken>, CloudCliTokenManagerError>;
readonly hasCredential: Effect.Effect<boolean, CloudCliTokenManagerError>;
readonly clear: Effect.Effect<void, CloudCliTokenManagerError>;
}
>()("t3/cloud/CliTokenManager/CloudCliTokenManager") {}

const wrapError =
(message: string) =>
<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, CloudCliTokenManagerError, R> =>
effect.pipe(
Effect.mapError(
(cause) =>
new CloudCliTokenManagerError({
message,
cause,
}),
),
);
<WrappedError extends CloudCliTokenManagerError>(makeError: (cause: unknown) => WrappedError) =>
<A, E, R>(effect: Effect.Effect<A, E, R>): Effect.Effect<A, WrappedError, R> =>
effect.pipe(Effect.mapError(makeError));

function stringToBytes(value: string): Uint8Array {
return new TextEncoder().encode(value);
Expand All @@ -83,7 +123,7 @@ function bytesToString(value: Uint8Array): string {
return new TextDecoder().decode(value);
}

const make = Effect.gen(function* () {
export const make = Effect.gen(function* () {
const crypto = yield* Crypto.Crypto;
const httpClient = (yield* HttpClient.HttpClient).pipe(HttpClient.filterStatusOk);
const secrets = yield* ServerSecretStore.ServerSecretStore;
Expand All @@ -96,7 +136,7 @@ const make = Effect.gen(function* () {

const clear = secrets
.remove(CLOUD_CLI_OAUTH_TOKEN_SECRET)
.pipe(wrapError("Could not remove the stored T3 Connect CLI credential."));
.pipe(wrapError((cause) => new CloudCliCredentialRemovalError({ cause })));

const read = Effect.fn("cloud.cli_token.read")(function* () {
const encoded = yield* secrets.get(CLOUD_CLI_OAUTH_TOKEN_SECRET);
Expand Down Expand Up @@ -185,10 +225,10 @@ const make = Effect.gen(function* () {
yield* Console.log(`Open this URL to authorize T3 Connect:\n${authorizationUrl.toString()}\n`);
const code = yield* Deferred.await(callback).pipe(
Effect.timeout(CLOUD_CLI_OAUTH_CALLBACK_TIMEOUT),
Effect.catchTag("TimeoutError", () =>
Effect.catchTag("TimeoutError", (cause) =>
Effect.fail(
new CloudCliTokenManagerError({
message: "Timed out waiting for T3 Connect authorization.",
new CloudCliAuthorizationTimeoutError({
cause,
}),
),
),
Expand All @@ -213,12 +253,12 @@ const make = Effect.gen(function* () {
});

const getExisting = semaphore.withPermits(1)(
getExistingNoLock().pipe(wrapError("Could not refresh the T3 Connect CLI credential.")),
getExistingNoLock().pipe(wrapError((cause) => new CloudCliCredentialRefreshError({ cause }))),
);
const hasCredential = semaphore.withPermits(1)(
read().pipe(
Effect.map(Option.isSome),
wrapError("Could not read the stored T3 Connect CLI credential."),
wrapError((cause) => new CloudCliCredentialReadError({ cause })),
),
);
const get = semaphore.withPermits(1)(
Expand All @@ -227,7 +267,7 @@ const make = Effect.gen(function* () {
return Option.isSome(token)
? token.value
: yield* Effect.scoped(login()).pipe(Effect.flatMap(persist));
}).pipe(wrapError("Could not authorize the T3 Connect CLI.")),
}).pipe(wrapError((cause) => new CloudCliAuthorizationError({ cause }))),
);

return CloudCliTokenManager.of({ get, getExisting, hasCredential, clear });
Expand Down
98 changes: 53 additions & 45 deletions apps/server/src/cloud/ManagedEndpointRuntime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import * as Deferred from "effect/Deferred";
import * as Effect from "effect/Effect";
import * as Fiber from "effect/Fiber";
import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as PlatformError from "effect/PlatformError";
import * as Sink from "effect/Sink";
import * as Stream from "effect/Stream";
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process";
import * as RelayClient from "@t3tools/shared/relayClient";

import {
classifyRelayClientOutput,
makeCloudManagedEndpointRuntime,
} from "./ManagedEndpointRuntime.ts";
import * as ServerSecretStore from "../auth/ServerSecretStore.ts";
import * as ManagedEndpointRuntime from "./ManagedEndpointRuntime.ts";

const relayClientAvailableLayer = Layer.succeed(
RelayClient.RelayClient,
Expand All @@ -29,12 +28,33 @@ const relayClientAvailableLayer = Layer.succeed(
}),
);

const runtimeDependencies = (spawner: ReturnType<typeof ChildProcessSpawner.make>) =>
const runtimeDependencies = (
spawner: ReturnType<typeof ChildProcessSpawner.make>,
relayClientLayer = relayClientAvailableLayer,
) =>
Layer.mergeAll(
Layer.succeed(ChildProcessSpawner.ChildProcessSpawner, spawner),
relayClientAvailableLayer,
relayClientLayer,
Layer.mock(ServerSecretStore.ServerSecretStore)({
get: () => Effect.succeed(Option.none()),
}),
);

const buildCloudManagedEndpointRuntime = (
spawner: ReturnType<typeof ChildProcessSpawner.make>,
relayClientLayer = relayClientAvailableLayer,
) =>
Effect.gen(function* () {
const context = yield* Layer.build(
ManagedEndpointRuntime.layer.pipe(
Layer.provide(runtimeDependencies(spawner, relayClientLayer)),
),
);
return yield* Effect.service(ManagedEndpointRuntime.CloudManagedEndpointRuntime).pipe(
Effect.provide(context),
);
});

function makeHandle(input: {
readonly pid: number;
readonly onKill: () => void;
Expand Down Expand Up @@ -62,16 +82,20 @@ function makeHandle(input: {
describe("CloudManagedEndpointRuntime", () => {
it("classifies Cloudflare connection and warning output", () => {
expect(
classifyRelayClientOutput(
ManagedEndpointRuntime.classifyRelayClientOutput(
"2026-06-17T02:00:00Z INF Registered tunnel connection connIndex=0",
),
).toBe("connected");
expect(
classifyRelayClientOutput("2026-06-17T02:00:00Z ERR Failed to serve tunnel connection"),
ManagedEndpointRuntime.classifyRelayClientOutput(
"2026-06-17T02:00:00Z ERR Failed to serve tunnel connection",
),
).toBe("warning");
expect(classifyRelayClientOutput("2026-06-17T02:00:00Z INF Starting metrics server")).toBe(
"debug",
);
expect(
ManagedEndpointRuntime.classifyRelayClientOutput(
"2026-06-17T02:00:00Z INF Starting metrics server",
),
).toBe("debug");
});

it.effect("starts, deduplicates, rotates, and stops the Cloudflare connector", () =>
Expand All @@ -97,9 +121,7 @@ describe("CloudManagedEndpointRuntime", () => {
return handle;
}),
);
const runtime = yield* makeCloudManagedEndpointRuntime.pipe(
Effect.provide(runtimeDependencies(spawner)),
);
const runtime = yield* buildCloudManagedEndpointRuntime(spawner);

yield* runtime.applyConfig({
providerKind: "cloudflare_tunnel",
Expand Down Expand Up @@ -154,9 +176,7 @@ describe("CloudManagedEndpointRuntime", () => {
return handle;
}),
);
const runtime = yield* makeCloudManagedEndpointRuntime.pipe(
Effect.provide(runtimeDependencies(spawner)),
);
const runtime = yield* buildCloudManagedEndpointRuntime(spawner);

const started = yield* runtime.applyConfig({
providerKind: "cloudflare_tunnel",
Expand Down Expand Up @@ -193,9 +213,7 @@ describe("CloudManagedEndpointRuntime", () => {
return handle;
}),
);
const runtime = yield* makeCloudManagedEndpointRuntime.pipe(
Effect.provide(runtimeDependencies(spawner)),
);
const runtime = yield* buildCloudManagedEndpointRuntime(spawner);
const config = {
providerKind: "cloudflare_tunnel" as const,
connectorToken: "token",
Expand Down Expand Up @@ -240,9 +258,7 @@ describe("CloudManagedEndpointRuntime", () => {
return handle;
}),
);
const runtime = yield* makeCloudManagedEndpointRuntime.pipe(
Effect.provide(runtimeDependencies(spawner)),
);
const runtime = yield* buildCloudManagedEndpointRuntime(spawner);

const started = yield* runtime.applyConfig({
providerKind: "cloudflare_tunnel",
Expand Down Expand Up @@ -282,9 +298,7 @@ describe("CloudManagedEndpointRuntime", () => {
return handle;
}),
);
const runtime = yield* makeCloudManagedEndpointRuntime.pipe(
Effect.provide(runtimeDependencies(spawner)),
);
const runtime = yield* buildCloudManagedEndpointRuntime(spawner);

const first = yield* runtime
.applyConfig({
Expand Down Expand Up @@ -322,9 +336,7 @@ describe("CloudManagedEndpointRuntime", () => {
}),
),
);
const runtime = yield* makeCloudManagedEndpointRuntime.pipe(
Effect.provide(runtimeDependencies(spawner)),
);
const runtime = yield* buildCloudManagedEndpointRuntime(spawner);

const status = yield* runtime.applyConfig({
providerKind: "cloudflare_tunnel",
Expand All @@ -344,22 +356,18 @@ describe("CloudManagedEndpointRuntime", () => {
Effect.gen(function* () {
const spawn = vi.fn();
const spawner = ChildProcessSpawner.make(spawn);
const runtime = yield* makeCloudManagedEndpointRuntime.pipe(
Effect.provide(
Layer.mergeAll(
Layer.succeed(ChildProcessSpawner.ChildProcessSpawner, spawner),
Layer.succeed(
RelayClient.RelayClient,
RelayClient.RelayClient.of({
resolve: Effect.succeed({
status: "missing",
version: RelayClient.CLOUDFLARED_VERSION,
}),
install: Effect.die("unused"),
installWithProgress: () => Effect.die("unused"),
}),
),
),
const runtime = yield* buildCloudManagedEndpointRuntime(
spawner,
Layer.succeed(
RelayClient.RelayClient,
RelayClient.RelayClient.of({
resolve: Effect.succeed({
status: "missing",
version: RelayClient.CLOUDFLARED_VERSION,
}),
install: Effect.die("unused"),
installWithProgress: () => Effect.die("unused"),
}),
),
);

Expand Down
Loading
Loading