Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion packages/client-runtime/src/state/assets.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,33 @@ import * as Layer from "effect/Layer";
import { Atom } from "effect/unstable/reactivity";

import type { EnvironmentRegistry } from "../connection/registry.ts";
import { createAssetEnvironmentAtoms } from "./assets.ts";
import {
createAssetEnvironmentAtoms,
InvalidAssetCollectionKeyError,
parseAssetCollectionKey,
} from "./assets.ts";

describe("asset collection keys", () => {
it("preserves malformed JSON and its native cause", () => {
const key = "not-json";
let error: unknown;

try {
parseAssetCollectionKey(key);
} catch (cause) {
error = cause;
}

expect(error).toBeInstanceOf(InvalidAssetCollectionKeyError);
expect(error).toMatchObject({ key, cause: expect.any(SyntaxError) });
});

it("rejects invalid asset collection shapes", () => {
const key = JSON.stringify(["environment-1", [{ _tag: "unknown" }]]);

expect(() => parseAssetCollectionKey(key)).toThrowError(InvalidAssetCollectionKeyError);
});
});

describe("createAssetEnvironmentAtoms", () => {
it("keys asset URL queries by environment and resource", () => {
Expand Down
32 changes: 29 additions & 3 deletions packages/client-runtime/src/state/assets.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { EnvironmentId, type AssetResource, WS_METHODS } from "@t3tools/contracts";
import { AssetResource, EnvironmentId, WS_METHODS } from "@t3tools/contracts";
import * as Schema from "effect/Schema";
import { Atom } from "effect/unstable/reactivity";

import type { EnvironmentRegistry } from "../connection/registry.ts";
Expand All @@ -8,6 +9,32 @@ const ASSET_URL_REFRESH_INTERVAL_MS = 30 * 60_000;
const ASSET_URL_STALE_TIME_MS = 5 * 60_000;
const ASSET_URL_IDLE_TTL_MS = 60 * 60_000;

export class InvalidAssetCollectionKeyError extends Schema.TaggedErrorClass<InvalidAssetCollectionKeyError>()(
"InvalidAssetCollectionKeyError",
{
key: Schema.String,
cause: Schema.Defect(),
},
) {
override get message(): string {
return `Invalid asset collection atom key: ${JSON.stringify(this.key)}.`;
}
}

const decodeAssetCollectionKey = Schema.decodeUnknownSync(
Schema.Tuple([EnvironmentId, Schema.Array(AssetResource)]),
);

export function parseAssetCollectionKey(
key: string,
): readonly [EnvironmentId, ReadonlyArray<AssetResource>] {
try {
return decodeAssetCollectionKey(JSON.parse(key));
} catch (cause) {
throw new InvalidAssetCollectionKeyError({ key, cause });
}
}

export function resolveAssetUrl(httpBaseUrl: string, relativeUrl: string): string | null {
try {
return new URL(relativeUrl, httpBaseUrl).toString();
Expand All @@ -27,8 +54,7 @@ export function createAssetEnvironmentAtoms<R, E>(
refreshIntervalMs: ASSET_URL_REFRESH_INTERVAL_MS,
});
const createUrlsFamily = Atom.family((key: string) => {
const [rawEnvironmentId, resources] = JSON.parse(key) as [string, ReadonlyArray<AssetResource>];
const environmentId = EnvironmentId.make(rawEnvironmentId);
const [environmentId, resources] = parseAssetCollectionKey(key);
return Atom.make((get) =>
resources.map((resource) =>
get(
Expand Down
58 changes: 58 additions & 0 deletions packages/client-runtime/src/state/entities.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ import * as Option from "effect/Option";
import { AsyncResult, Atom, AtomRegistry } from "effect/unstable/reactivity";

import { PrimaryConnectionTarget } from "../connection/model.ts";
import {
InvalidScopedProjectKeyError,
InvalidScopedProjectRefCollectionKeyError,
InvalidScopedThreadKeyError,
parseProjectKey,
parseProjectRefCollectionKey,
parseThreadKey,
} from "./entities.ts";
import type { EnvironmentShellState } from "./shell.ts";
import { EMPTY_ENVIRONMENT_THREAD_STATE, type EnvironmentThreadState } from "./threads.ts";
import { createEnvironmentProjectAtoms } from "./projectEntities.ts";
Expand All @@ -25,6 +33,56 @@ const OTHER_PROJECT_ID = ProjectId.make("project-2");
const THREAD_ID = ThreadId.make("thread-1");
const OTHER_THREAD_ID = ThreadId.make("thread-2");

describe("scoped entity keys", () => {
it("preserves an invalid project key as structured error data", () => {
const key = "missing-project-key-separator";
let error: unknown;

try {
parseProjectKey(key);
} catch (cause) {
error = cause;
}

expect(error).toEqual(new InvalidScopedProjectKeyError({ key }));
});

it("preserves an invalid thread key as structured error data", () => {
const key = "missing-thread-key-separator";
let error: unknown;

try {
parseThreadKey(key);
} catch (cause) {
error = cause;
}

expect(error).toEqual(new InvalidScopedThreadKeyError({ key }));
});

it("preserves malformed project reference collection input and its cause", () => {
const key = "not-json";
let error: unknown;

try {
parseProjectRefCollectionKey(key);
} catch (cause) {
error = cause;
}

expect(error).toBeInstanceOf(InvalidScopedProjectRefCollectionKeyError);
expect(error).toMatchObject({ key, cause: expect.anything() });
});

it("rejects invalid project reference collection shapes", () => {
const key = JSON.stringify([["environment-1"]]);

expect(() => parseProjectRefCollectionKey(key)).toThrowError(
InvalidScopedProjectRefCollectionKeyError,
);
});
});

const THREAD_SHELL = {
id: THREAD_ID,
projectId: PROJECT_ID,
Expand Down
50 changes: 47 additions & 3 deletions packages/client-runtime/src/state/entities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,45 @@ import {
type ScopedProjectRef,
type ScopedThreadRef,
} from "@t3tools/contracts";
import * as Schema from "effect/Schema";

export class InvalidScopedProjectKeyError extends Schema.TaggedErrorClass<InvalidScopedProjectKeyError>()(
"InvalidScopedProjectKeyError",
{
key: Schema.String,
},
) {
override get message(): string {
return `Invalid scoped project atom key: ${JSON.stringify(this.key)}.`;
}
}

export class InvalidScopedThreadKeyError extends Schema.TaggedErrorClass<InvalidScopedThreadKeyError>()(
"InvalidScopedThreadKeyError",
{
key: Schema.String,
},
) {
override get message(): string {
return `Invalid scoped thread atom key: ${JSON.stringify(this.key)}.`;
}
}

export class InvalidScopedProjectRefCollectionKeyError extends Schema.TaggedErrorClass<InvalidScopedProjectRefCollectionKeyError>()(
"InvalidScopedProjectRefCollectionKeyError",
{
key: Schema.String,
cause: Schema.Defect(),
},
) {
override get message(): string {
return `Invalid scoped project reference collection atom key: ${JSON.stringify(this.key)}.`;
}
}

const decodeProjectRefCollectionKey = Schema.decodeUnknownSync(
Schema.Array(Schema.Tuple([Schema.String, Schema.String])),
);

export function projectKey(ref: ScopedProjectRef): string {
return `${ref.environmentId}\u0000${ref.projectId}`;
Expand All @@ -21,7 +60,7 @@ export function projectRefCollectionKey(refs: ReadonlyArray<ScopedProjectRef>):
export function parseProjectKey(key: string): ScopedProjectRef {
const separator = key.indexOf("\u0000");
if (separator < 0) {
throw new Error("Invalid scoped project atom key.");
throw new InvalidScopedProjectKeyError({ key });
}
return {
environmentId: EnvironmentId.make(key.slice(0, separator)),
Expand All @@ -30,7 +69,12 @@ export function parseProjectKey(key: string): ScopedProjectRef {
}

export function parseProjectRefCollectionKey(key: string): ReadonlyArray<ScopedProjectRef> {
const entries = JSON.parse(key) as ReadonlyArray<readonly [string, string]>;
let entries: ReadonlyArray<readonly [string, string]>;
try {
entries = decodeProjectRefCollectionKey(JSON.parse(key));
} catch (cause) {
throw new InvalidScopedProjectRefCollectionKeyError({ key, cause });
}
return entries.map(([environmentId, projectId]) => ({
environmentId: EnvironmentId.make(environmentId),
projectId: ProjectId.make(projectId),
Expand All @@ -40,7 +84,7 @@ export function parseProjectRefCollectionKey(key: string): ReadonlyArray<ScopedP
export function parseThreadKey(key: string): ScopedThreadRef {
const separator = key.indexOf("\u0000");
if (separator < 0) {
throw new Error("Invalid scoped thread atom key.");
throw new InvalidScopedThreadKeyError({ key });
}
return {
environmentId: EnvironmentId.make(key.slice(0, separator)),
Expand Down
25 changes: 3 additions & 22 deletions packages/client-runtime/src/state/threads.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import {
EnvironmentId,
ORCHESTRATION_WS_METHODS,
ThreadId,
type EnvironmentId as EnvironmentIdType,
type OrchestrationThread,
type OrchestrationThreadStreamItem,
Expand All @@ -20,6 +18,7 @@ import { connectionProjectionPhase } from "../connection/model.ts";
import { EnvironmentSupervisor } from "../connection/supervisor.ts";
import { EnvironmentCacheStore } from "../platform/persistence.ts";
import { subscribe } from "../rpc/client.ts";
import { parseThreadKey, threadKey } from "./entities.ts";
import { applyThreadDetailEvent } from "./threadReducer.ts";
import { THREAD_STATE_IDLE_TTL_MS } from "./threadRetention.ts";
import { followStreamInEnvironment } from "./runtime.ts";
Expand Down Expand Up @@ -227,29 +226,11 @@ export function threadStateChanges(environmentId: EnvironmentIdType, threadId: T
);
}

function threadAtomKey(environmentId: EnvironmentIdType, threadId: ThreadIdType): string {
return `${environmentId}\u0000${threadId}`;
}

function parseThreadAtomKey(key: string): {
readonly environmentId: EnvironmentIdType;
readonly threadId: ThreadIdType;
} {
const separator = key.indexOf("\u0000");
if (separator < 0) {
throw new Error("Invalid environment thread atom key.");
}
return {
environmentId: EnvironmentId.make(key.slice(0, separator)),
threadId: ThreadId.make(key.slice(separator + 1)),
};
}

export function createEnvironmentThreadStateAtoms<R, E>(
runtime: Atom.AtomRuntime<EnvironmentRegistry | EnvironmentCacheStore | R, E>,
) {
const family = Atom.family((key: string) => {
const { environmentId, threadId } = parseThreadAtomKey(key);
const { environmentId, threadId } = parseThreadKey(key);
return runtime
.atom(threadStateChanges(environmentId, threadId), {
initialValue: EMPTY_ENVIRONMENT_THREAD_STATE,
Expand All @@ -262,7 +243,7 @@ export function createEnvironmentThreadStateAtoms<R, E>(

return {
stateAtom: (environmentId: EnvironmentIdType, threadId: ThreadIdType) =>
family(threadAtomKey(environmentId, threadId)),
family(threadKey({ environmentId, threadId })),
};
}

Expand Down
Loading