Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions apps/cloud/src/api.request-scope.node.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// ---------------------------------------------------------------------------
// Regression for https://github.com/RhysSullivan/executor/pull/468 — the
// cloud API v4 routing refactor wired DbService.Live (and other I/O-holding
// services) into `Layer.provideMerge` of an `HttpRouter.toWebHandler` app.
// `toWebHandler` builds the layer ONCE at worker boot and reuses the
// resolved Context for every request, so `Effect.acquireRelease` runs only
// at boot. On Cloudflare Workers that means the postgres.js socket (a
// `Writable` I/O object) is opened in request 1's context and reused by
// request 2, which the runtime forbids:
//
// StorageError: [storage-drizzle] findMany select failed:
// Cannot perform I/O on behalf of a different request. (I/O type: Writable)
//
// The only primitive that actually rebuilds per request is a custom
// `HttpRouter.middleware` whose per-request handler does
// `Layer.build(layer)` inside `Effect.scoped`. `provideMerge` runs the
// layer at boot; `HttpRouter.provideRequest` (despite its name) also runs
// the layer at boot — its `Layer.build` lives in the *outer* middleware
// effect, which executes at layer-construction time. Only an explicit
// `Effect.scoped` inside the per-request handler creates a fresh scope
// for `acquireRelease`.
// ---------------------------------------------------------------------------

import { describe, it, expect } from "@effect/vitest";
import { Context, Effect, Layer } from "effect";
import { HttpRouter, HttpServer, HttpServerResponse } from "effect/unstable/http";

import { RequestScopedServicesLive } from "./api/layers";
import { requestScopedMiddleware } from "./api/request-scoped";
import { makeApiLive } from "./api/router";

class Counter extends Context.Service<
Counter,
{ readonly id: number }
>()("test/Counter") {}

const makeCounterLive = (counts: { acquires: number; releases: number }) =>
Layer.effect(Counter)(
Effect.acquireRelease(
Effect.sync(() => {
counts.acquires += 1;
return { id: counts.acquires };
}),
() =>
Effect.sync(() => {
counts.releases += 1;
}),
),
);

const Routes = HttpRouter.add(
"GET",
"/",
Effect.gen(function* () {
const c = yield* Counter;
return HttpServerResponse.jsonUnsafe({ id: c.id });
}),
);

describe("HttpRouter.toWebHandler request scoping", () => {
it("Layer.provideMerge of a scoped layer captures the boot scope (the bug)", async () => {
const counts = { acquires: 0, releases: 0 };
const App = Routes.pipe(
Layer.provideMerge(makeCounterLive(counts)),
Layer.provideMerge(HttpServer.layerServices),
);
const handler = HttpRouter.toWebHandler(App, { disableLogger: true })
.handler;

const a = await handler(new Request("http://test.local/"));
const b = await handler(new Request("http://test.local/"));

// Same id => the resource was acquired once at boot and shared.
// On Cloudflare Workers this is the I/O-isolation crash mode.
expect(await a.json()).toEqual({ id: 1 });
expect(await b.json()).toEqual({ id: 1 });
expect(counts.acquires).toBe(1);
});

it("HttpRouter.provideRequest is misleadingly named — it also captures boot scope", async () => {
const counts = { acquires: 0, releases: 0 };
const App = Routes.pipe(
HttpRouter.provideRequest(makeCounterLive(counts)),
Layer.provideMerge(HttpServer.layerServices),
);
const handler = HttpRouter.toWebHandler(App, { disableLogger: true })
.handler;

const a = await handler(new Request("http://test.local/"));
const b = await handler(new Request("http://test.local/"));

// `provideRequest` runs `Layer.build` in the OUTER middleware effect,
// which fires at layer-construction time — same lifetime as the boot
// scope. Both requests see the same acquired resource.
expect(await a.json()).toEqual({ id: 1 });
expect(await b.json()).toEqual({ id: 1 });
expect(counts.acquires).toBe(1);
});

it("requestScopedMiddleware runs acquireRelease per request (the fix)", async () => {
const counts = { acquires: 0, releases: 0 };
const App = Routes.pipe(
Layer.provide(requestScopedMiddleware(makeCounterLive(counts)).layer),
Layer.provideMerge(HttpServer.layerServices),
);
const handler = HttpRouter.toWebHandler(App, { disableLogger: true })
.handler;

const a = await handler(new Request("http://test.local/"));
const b = await handler(new Request("http://test.local/"));

expect(await a.json()).toEqual({ id: 1 });
expect(await b.json()).toEqual({ id: 2 });
expect(counts.acquires).toBe(2);
expect(counts.releases).toBe(2);
});
});

// ---------------------------------------------------------------------------
// Regression test against the prod handler factory. If anyone reverts
// `makeApiLive` back to wiring `RequestScopedServicesLive` via
// `Layer.provideMerge`, this test fails — the counter only increments
// once at boot instead of once per request.
// ---------------------------------------------------------------------------

describe("makeApiLive (prod handler factory) request scoping", () => {
it("rebuilds RequestScopedServicesLive per request", async () => {
const counts = { acquires: 0, releases: 0 };
// Wrap the real per-request layer with an `acquireRelease` counter.
// `requestScopedMiddleware` calls `Layer.build` per request, so this
// counter increments per request iff the wiring is correct.
const trackedRsLive = Layer.effectDiscard(
Effect.acquireRelease(
Effect.sync(() => {
counts.acquires += 1;
}),
() =>
Effect.sync(() => {
counts.releases += 1;
}),
),
).pipe(Layer.provideMerge(RequestScopedServicesLive));

const handler = HttpRouter.toWebHandler(makeApiLive(trackedRsLive), {
disableLogger: true,
}).handler;

// Hit a protected route. ExecutionStackMiddleware short-circuits with
// 403 (no session cookie) but not before `requestScopedMiddleware`
// has built the per-request layer. We don't care about the response —
// only that the layer was built once per request.
await handler(new Request("http://test.local/scope"));
await handler(new Request("http://test.local/scope"));

expect(counts.acquires).toBe(2);
expect(counts.releases).toBe(2);
});
});
51 changes: 41 additions & 10 deletions apps/cloud/src/api/layers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { OrgHandlers } from "../org/handlers";

import { CoreSharedServices } from "./core-shared-services";
import { ProtectedCloudApi, RouterConfig } from "./protected-layers";
import { requestScopedMiddleware } from "./request-scoped";

export {
CoreSharedServices,
Expand All @@ -26,24 +27,54 @@ export {
const DbLive = DbService.Live;
const UserStoreLive = UserStoreService.Live.pipe(Layer.provide(DbLive));

export const SharedServices = Layer.mergeAll(
DbLive,
UserStoreLive,
// Per-request layer. Anything that opens an I/O object (postgres.js socket,
// fetch stream readers, anything backed by a `Writable`) MUST live here —
// `provideRequestScoped` rebuilds it per request so Cloudflare Workers'
// I/O isolation is satisfied. See `api.request-scope.test.ts`.
export const RequestScopedServicesLive = Layer.mergeAll(DbLive, UserStoreLive);

// Boot-scoped layer. Built once at worker boot, reused across requests.
// Safe for config, in-memory caches, the global tracer provider, and
// stateless service shells.
export const BootSharedServices = Layer.mergeAll(
CoreSharedServices,
HttpServer.layerServices,
TelemetryLive,
);

// Routes that don't require an authenticated org session — login,
// callbacks, etc. Mounts at the paths declared inside `NonProtectedApi`.
export const NonProtectedApiLive = HttpApiBuilder.layer(NonProtectedApi).pipe(
Layer.provide(Layer.mergeAll(CloudAuthPublicHandlers, CloudSessionAuthHandlers)),
Layer.provideMerge(SessionAuthLive),
);
//
// `rsLive` is the per-request DB layer. It's passed in as a parameter so
// tests can substitute a counting fake for `DbService.Live` and assert
// per-request semantics. Handlers here yield `UserStoreService` directly;
// without per-request scoping the postgres.js socket pins to the worker's
// boot scope and Cloudflare Workers' I/O isolation kills the second
// request.
export const makeNonProtectedApiLive = (
rsLive: Layer.Layer<DbService | UserStoreService>,
) =>
HttpApiBuilder.layer(NonProtectedApi).pipe(
Layer.provide(Layer.mergeAll(CloudAuthPublicHandlers, CloudSessionAuthHandlers)),
Layer.provide(requestScopedMiddleware(rsLive).layer),
Layer.provideMerge(SessionAuthLive),
);

// Routes scoped to a specific org (membership management, switching, etc.).
// Auth is enforced by `OrgAuth` middleware declared on `OrgHttpApi`.
export const OrgApiLive = HttpApiBuilder.layer(OrgHttpApi).pipe(
Layer.provide(OrgHandlers),
Layer.provideMerge(OrgAuthLive),
export const makeOrgApiLive = (
rsLive: Layer.Layer<DbService | UserStoreService>,
) =>
HttpApiBuilder.layer(OrgHttpApi).pipe(
Layer.provide(OrgHandlers),
Layer.provide(requestScopedMiddleware(rsLive).layer),
Layer.provideMerge(OrgAuthLive),
);

// Default exports use the production per-request layer. Existing callers
// that import `NonProtectedApiLive`/`OrgApiLive` continue to work; the
// `make*` factories exist for tests that need to swap in a fake.
export const NonProtectedApiLive = makeNonProtectedApiLive(
RequestScopedServicesLive,
);
export const OrgApiLive = makeOrgApiLive(RequestScopedServicesLive);
64 changes: 49 additions & 15 deletions apps/cloud/src/api/protected.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import { AutumnService } from "../services/autumn";
import { DbService } from "../services/db";
import { makeExecutionStack } from "../services/execution-stack";
import { HttpResponseError } from "./error-response";
import { RequestScopedServicesLive } from "./layers";
import {
ProtectedCloudApi,
ProtectedCloudApiLive,
RouterConfig,
} from "./protected-layers";
import { requestScopedMiddleware } from "./request-scoped";

// One `HttpRouter` middleware that:
// 1. authenticates the WorkOS sealed session,
Expand All @@ -48,6 +50,29 @@ import {
// (see `HttpResponseError` in `./error-response.ts`). Letting `unhandled`
// pass through is what satisfies `HttpRouter.middleware`'s brand check
// without any type casts.

// One `HttpRouter` middleware that:
// 1. authenticates the WorkOS sealed session,
// 2. verifies live org membership (closes the JWT-cache gap — see
// `auth/authorize-organization.ts`),
// 3. resolves the org name,
// 4. builds the per-request executor + engine,
// 5. provides `AuthContext` + the execution-stack services to the handler.
//
// Replaces both the old outer `Effect.gen` in this file (which did its own
// WorkOS lookup) and the per-route `OrgAuth` HttpApiMiddleware (which did
// a second one).
//
// Errors are NOT caught here: failures propagate as typed errors and are
// rendered to a JSON response by the framework's `Respondable` pipeline
// (see `HttpResponseError` in `./error-response.ts`). Letting `unhandled`
// pass through is what satisfies `HttpRouter.middleware`'s brand check
// without any type casts.
//
// `DbService` and `UserStoreService` are pulled from per-request context
// — `RequestScopedServicesMiddleware` (combined below) provides them
// fresh per request so the postgres.js socket lives in the request
// fiber's scope, not the worker's boot scope.
const ExecutionStackMiddleware = HttpRouter.middleware<{
provides:
| AuthContext
Expand All @@ -57,15 +82,8 @@ const ExecutionStackMiddleware = HttpRouter.middleware<{
| McpExtensionService
| GraphqlExtensionService;
}>()(
// Layer-time setup — capture the long-lived services in a closure so
// the per-request function only needs `HttpRouter`-Provided context.
// That collapses the middleware's `requires` to `never`, giving us a
// real `.layer` (instead of the "Need to .combine(...)" type-error
// sentinel that fires when `requires` leaks to non-never).
Effect.gen(function* () {
const context = yield* Effect.context<
WorkOSAuth | UserStoreService | AutumnService | DbService
>();
const longLived = yield* Effect.context<WorkOSAuth | AutumnService>();
return (httpEffect) =>
Effect.gen(function* () {
const request = yield* HttpServerRequest.HttpServerRequest;
Expand Down Expand Up @@ -103,12 +121,28 @@ const ExecutionStackMiddleware = HttpRouter.middleware<{
Effect.provideService(McpExtensionService, executor.mcp),
Effect.provideService(GraphqlExtensionService, executor.graphql),
);
}).pipe(Effect.provideContext(context));
}).pipe(Effect.provideContext(longLived));
}),
).layer;

export const ProtectedApiLive = ProtectedCloudApiLive.pipe(
Layer.provide(ExecutionStackMiddleware),
Layer.provideMerge(HttpApiSwagger.layer(ProtectedCloudApi, { path: "/docs" })),
Layer.provideMerge(RouterConfig),
);

// `rsLive` is the per-request DB layer. Combining it into the auth
// middleware collapses `requires: DbService | UserStoreService` to
// never (so `.layer` is a real Layer instead of the "Need to combine"
// type-error sentinel) AND makes the postgres.js socket request-scoped:
// the layer rebuilds per HTTP request, satisfying Cloudflare Workers'
// I/O isolation. Exposed as a factory so tests can swap in a counting
// fake — see `apps/cloud/src/api.request-scope.node.test.ts`.
export const makeProtectedApiLive = (
rsLive: Layer.Layer<DbService | UserStoreService>,
) => {
const protectedMiddleware = ExecutionStackMiddleware.combine(
requestScopedMiddleware(rsLive),
).layer;
return ProtectedCloudApiLive.pipe(
Layer.provide(protectedMiddleware),
Layer.provideMerge(HttpApiSwagger.layer(ProtectedCloudApi, { path: "/docs" })),
Layer.provideMerge(RouterConfig),
);
};

export const ProtectedApiLive = makeProtectedApiLive(RequestScopedServicesLive);
47 changes: 47 additions & 0 deletions apps/cloud/src/api/request-scoped.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// ---------------------------------------------------------------------------
// Per-request layer provisioning for `HttpRouter.toWebHandler`
// ---------------------------------------------------------------------------
//
// `HttpRouter.toWebHandler` builds the application layer once into a
// boot-scoped `Context` and reuses it for every request, so any
// `Effect.acquireRelease` inside that layer fires once at worker boot.
// On Cloudflare Workers a postgres.js socket (a `Writable` I/O object)
// opened during request 1 cannot be touched from request 2 — the
// runtime throws "Cannot perform I/O on behalf of a different request".
//
// `Layer.provideMerge` and (despite the name) `HttpRouter.provideRequest`
// both build the inner layer at construction time. The only primitive
// that actually rebuilds per request is a router middleware whose
// per-request handler calls `Layer.build(layer)` inside `Effect.scoped`,
// so `acquireRelease` fires per request and finalizers run when the
// request fiber's scope closes.
//
// See `apps/cloud/src/api.request-scope.node.test.ts` for the regression
// coverage that pins this rule down.
// ---------------------------------------------------------------------------

import { Effect, Layer } from "effect";
import { HttpRouter } from "effect/unstable/http";

/**
* Build an `HttpRouter.middleware` that provides `layer`'s services to
* each request. The layer is rebuilt per HTTP request so
* `Effect.acquireRelease` fires per request and is released when the
* request fiber's scope closes.
*
* The returned value is a `Middleware`. Use `.layer` to apply it as a
* standalone layer; use `.combine(other)` to fold it into another
* middleware whose per-request body needs services this layer provides
* (e.g. `ExecutionStackMiddleware`'s auth logic that yields
* `DbService` + `UserStoreService` — combining drops those from the
* outer middleware's `requires`).
*/
export const requestScopedMiddleware = <A>(layer: Layer.Layer<A>) =>
HttpRouter.middleware<{ provides: A }>()((httpEffect) =>
Effect.scoped(
Effect.gen(function* () {
const services = yield* Layer.build(layer);
return yield* Effect.provideContext(httpEffect, services);
}),
),
);
Loading
Loading