diff --git a/apps/cloud/src/api.request-scope.node.test.ts b/apps/cloud/src/api.request-scope.node.test.ts new file mode 100644 index 000000000..6783baf43 --- /dev/null +++ b/apps/cloud/src/api.request-scope.node.test.ts @@ -0,0 +1,158 @@ +// --------------------------------------------------------------------------- +// Regression for https://github.com/RhysSullivan/executor/pull/468 — the +// cloud API v4 routing refactor wired DbService.Live (and other I/O-holding +// services) into `Layer.provideMerge` of an `HttpRouter.toWebHandler` app. +// `toWebHandler` builds the layer ONCE at worker boot and reuses the +// resolved Context for every request, so `Effect.acquireRelease` runs only +// at boot. On Cloudflare Workers that means the postgres.js socket (a +// `Writable` I/O object) is opened in request 1's context and reused by +// request 2, which the runtime forbids: +// +// StorageError: [storage-drizzle] findMany select failed: +// Cannot perform I/O on behalf of a different request. (I/O type: Writable) +// +// The only primitive that actually rebuilds per request is a custom +// `HttpRouter.middleware` whose per-request handler does +// `Layer.build(layer)` inside `Effect.scoped`. `provideMerge` runs the +// layer at boot; `HttpRouter.provideRequest` (despite its name) also runs +// the layer at boot — its `Layer.build` lives in the *outer* middleware +// effect, which executes at layer-construction time. Only an explicit +// `Effect.scoped` inside the per-request handler creates a fresh scope +// for `acquireRelease`. +// --------------------------------------------------------------------------- + +import { describe, it, expect } from "@effect/vitest"; +import { Context, Effect, Layer } from "effect"; +import { HttpRouter, HttpServer, HttpServerResponse } from "effect/unstable/http"; + +import { RequestScopedServicesLive } from "./api/layers"; +import { requestScopedMiddleware } from "./api/request-scoped"; +import { makeApiLive } from "./api/router"; + +class Counter extends Context.Service< + Counter, + { readonly id: number } +>()("test/Counter") {} + +const makeCounterLive = (counts: { acquires: number; releases: number }) => + Layer.effect(Counter)( + Effect.acquireRelease( + Effect.sync(() => { + counts.acquires += 1; + return { id: counts.acquires }; + }), + () => + Effect.sync(() => { + counts.releases += 1; + }), + ), + ); + +const Routes = HttpRouter.add( + "GET", + "/", + Effect.gen(function* () { + const c = yield* Counter; + return HttpServerResponse.jsonUnsafe({ id: c.id }); + }), +); + +describe("HttpRouter.toWebHandler request scoping", () => { + it("Layer.provideMerge of a scoped layer captures the boot scope (the bug)", async () => { + const counts = { acquires: 0, releases: 0 }; + const App = Routes.pipe( + Layer.provideMerge(makeCounterLive(counts)), + Layer.provideMerge(HttpServer.layerServices), + ); + const handler = HttpRouter.toWebHandler(App, { disableLogger: true }) + .handler; + + const a = await handler(new Request("http://test.local/")); + const b = await handler(new Request("http://test.local/")); + + // Same id => the resource was acquired once at boot and shared. + // On Cloudflare Workers this is the I/O-isolation crash mode. + expect(await a.json()).toEqual({ id: 1 }); + expect(await b.json()).toEqual({ id: 1 }); + expect(counts.acquires).toBe(1); + }); + + it("HttpRouter.provideRequest is misleadingly named — it also captures boot scope", async () => { + const counts = { acquires: 0, releases: 0 }; + const App = Routes.pipe( + HttpRouter.provideRequest(makeCounterLive(counts)), + Layer.provideMerge(HttpServer.layerServices), + ); + const handler = HttpRouter.toWebHandler(App, { disableLogger: true }) + .handler; + + const a = await handler(new Request("http://test.local/")); + const b = await handler(new Request("http://test.local/")); + + // `provideRequest` runs `Layer.build` in the OUTER middleware effect, + // which fires at layer-construction time — same lifetime as the boot + // scope. Both requests see the same acquired resource. + expect(await a.json()).toEqual({ id: 1 }); + expect(await b.json()).toEqual({ id: 1 }); + expect(counts.acquires).toBe(1); + }); + + it("requestScopedMiddleware runs acquireRelease per request (the fix)", async () => { + const counts = { acquires: 0, releases: 0 }; + const App = Routes.pipe( + Layer.provide(requestScopedMiddleware(makeCounterLive(counts)).layer), + Layer.provideMerge(HttpServer.layerServices), + ); + const handler = HttpRouter.toWebHandler(App, { disableLogger: true }) + .handler; + + const a = await handler(new Request("http://test.local/")); + const b = await handler(new Request("http://test.local/")); + + expect(await a.json()).toEqual({ id: 1 }); + expect(await b.json()).toEqual({ id: 2 }); + expect(counts.acquires).toBe(2); + expect(counts.releases).toBe(2); + }); +}); + +// --------------------------------------------------------------------------- +// Regression test against the prod handler factory. If anyone reverts +// `makeApiLive` back to wiring `RequestScopedServicesLive` via +// `Layer.provideMerge`, this test fails — the counter only increments +// once at boot instead of once per request. +// --------------------------------------------------------------------------- + +describe("makeApiLive (prod handler factory) request scoping", () => { + it("rebuilds RequestScopedServicesLive per request", async () => { + const counts = { acquires: 0, releases: 0 }; + // Wrap the real per-request layer with an `acquireRelease` counter. + // `requestScopedMiddleware` calls `Layer.build` per request, so this + // counter increments per request iff the wiring is correct. + const trackedRsLive = Layer.effectDiscard( + Effect.acquireRelease( + Effect.sync(() => { + counts.acquires += 1; + }), + () => + Effect.sync(() => { + counts.releases += 1; + }), + ), + ).pipe(Layer.provideMerge(RequestScopedServicesLive)); + + const handler = HttpRouter.toWebHandler(makeApiLive(trackedRsLive), { + disableLogger: true, + }).handler; + + // Hit a protected route. ExecutionStackMiddleware short-circuits with + // 403 (no session cookie) but not before `requestScopedMiddleware` + // has built the per-request layer. We don't care about the response — + // only that the layer was built once per request. + await handler(new Request("http://test.local/scope")); + await handler(new Request("http://test.local/scope")); + + expect(counts.acquires).toBe(2); + expect(counts.releases).toBe(2); + }); +}); diff --git a/apps/cloud/src/api/layers.ts b/apps/cloud/src/api/layers.ts index 0c10b3dc3..3dec10293 100644 --- a/apps/cloud/src/api/layers.ts +++ b/apps/cloud/src/api/layers.ts @@ -16,6 +16,7 @@ import { OrgHandlers } from "../org/handlers"; import { CoreSharedServices } from "./core-shared-services"; import { ProtectedCloudApi, RouterConfig } from "./protected-layers"; +import { requestScopedMiddleware } from "./request-scoped"; export { CoreSharedServices, @@ -26,9 +27,16 @@ export { const DbLive = DbService.Live; const UserStoreLive = UserStoreService.Live.pipe(Layer.provide(DbLive)); -export const SharedServices = Layer.mergeAll( - DbLive, - UserStoreLive, +// Per-request layer. Anything that opens an I/O object (postgres.js socket, +// fetch stream readers, anything backed by a `Writable`) MUST live here — +// `provideRequestScoped` rebuilds it per request so Cloudflare Workers' +// I/O isolation is satisfied. See `api.request-scope.test.ts`. +export const RequestScopedServicesLive = Layer.mergeAll(DbLive, UserStoreLive); + +// Boot-scoped layer. Built once at worker boot, reused across requests. +// Safe for config, in-memory caches, the global tracer provider, and +// stateless service shells. +export const BootSharedServices = Layer.mergeAll( CoreSharedServices, HttpServer.layerServices, TelemetryLive, @@ -36,14 +44,37 @@ export const SharedServices = Layer.mergeAll( // Routes that don't require an authenticated org session — login, // callbacks, etc. Mounts at the paths declared inside `NonProtectedApi`. -export const NonProtectedApiLive = HttpApiBuilder.layer(NonProtectedApi).pipe( - Layer.provide(Layer.mergeAll(CloudAuthPublicHandlers, CloudSessionAuthHandlers)), - Layer.provideMerge(SessionAuthLive), -); +// +// `rsLive` is the per-request DB layer. It's passed in as a parameter so +// tests can substitute a counting fake for `DbService.Live` and assert +// per-request semantics. Handlers here yield `UserStoreService` directly; +// without per-request scoping the postgres.js socket pins to the worker's +// boot scope and Cloudflare Workers' I/O isolation kills the second +// request. +export const makeNonProtectedApiLive = ( + rsLive: Layer.Layer, +) => + HttpApiBuilder.layer(NonProtectedApi).pipe( + Layer.provide(Layer.mergeAll(CloudAuthPublicHandlers, CloudSessionAuthHandlers)), + Layer.provide(requestScopedMiddleware(rsLive).layer), + Layer.provideMerge(SessionAuthLive), + ); // Routes scoped to a specific org (membership management, switching, etc.). // Auth is enforced by `OrgAuth` middleware declared on `OrgHttpApi`. -export const OrgApiLive = HttpApiBuilder.layer(OrgHttpApi).pipe( - Layer.provide(OrgHandlers), - Layer.provideMerge(OrgAuthLive), +export const makeOrgApiLive = ( + rsLive: Layer.Layer, +) => + HttpApiBuilder.layer(OrgHttpApi).pipe( + Layer.provide(OrgHandlers), + Layer.provide(requestScopedMiddleware(rsLive).layer), + Layer.provideMerge(OrgAuthLive), + ); + +// Default exports use the production per-request layer. Existing callers +// that import `NonProtectedApiLive`/`OrgApiLive` continue to work; the +// `make*` factories exist for tests that need to swap in a fake. +export const NonProtectedApiLive = makeNonProtectedApiLive( + RequestScopedServicesLive, ); +export const OrgApiLive = makeOrgApiLive(RequestScopedServicesLive); diff --git a/apps/cloud/src/api/protected.ts b/apps/cloud/src/api/protected.ts index fa3de57f6..608eed7a7 100644 --- a/apps/cloud/src/api/protected.ts +++ b/apps/cloud/src/api/protected.ts @@ -25,11 +25,13 @@ import { AutumnService } from "../services/autumn"; import { DbService } from "../services/db"; import { makeExecutionStack } from "../services/execution-stack"; import { HttpResponseError } from "./error-response"; +import { RequestScopedServicesLive } from "./layers"; import { ProtectedCloudApi, ProtectedCloudApiLive, RouterConfig, } from "./protected-layers"; +import { requestScopedMiddleware } from "./request-scoped"; // One `HttpRouter` middleware that: // 1. authenticates the WorkOS sealed session, @@ -48,6 +50,29 @@ import { // (see `HttpResponseError` in `./error-response.ts`). Letting `unhandled` // pass through is what satisfies `HttpRouter.middleware`'s brand check // without any type casts. + +// One `HttpRouter` middleware that: +// 1. authenticates the WorkOS sealed session, +// 2. verifies live org membership (closes the JWT-cache gap — see +// `auth/authorize-organization.ts`), +// 3. resolves the org name, +// 4. builds the per-request executor + engine, +// 5. provides `AuthContext` + the execution-stack services to the handler. +// +// Replaces both the old outer `Effect.gen` in this file (which did its own +// WorkOS lookup) and the per-route `OrgAuth` HttpApiMiddleware (which did +// a second one). +// +// Errors are NOT caught here: failures propagate as typed errors and are +// rendered to a JSON response by the framework's `Respondable` pipeline +// (see `HttpResponseError` in `./error-response.ts`). Letting `unhandled` +// pass through is what satisfies `HttpRouter.middleware`'s brand check +// without any type casts. +// +// `DbService` and `UserStoreService` are pulled from per-request context +// — `RequestScopedServicesMiddleware` (combined below) provides them +// fresh per request so the postgres.js socket lives in the request +// fiber's scope, not the worker's boot scope. const ExecutionStackMiddleware = HttpRouter.middleware<{ provides: | AuthContext @@ -57,15 +82,8 @@ const ExecutionStackMiddleware = HttpRouter.middleware<{ | McpExtensionService | GraphqlExtensionService; }>()( - // Layer-time setup — capture the long-lived services in a closure so - // the per-request function only needs `HttpRouter`-Provided context. - // That collapses the middleware's `requires` to `never`, giving us a - // real `.layer` (instead of the "Need to .combine(...)" type-error - // sentinel that fires when `requires` leaks to non-never). Effect.gen(function* () { - const context = yield* Effect.context< - WorkOSAuth | UserStoreService | AutumnService | DbService - >(); + const longLived = yield* Effect.context(); return (httpEffect) => Effect.gen(function* () { const request = yield* HttpServerRequest.HttpServerRequest; @@ -103,12 +121,28 @@ const ExecutionStackMiddleware = HttpRouter.middleware<{ Effect.provideService(McpExtensionService, executor.mcp), Effect.provideService(GraphqlExtensionService, executor.graphql), ); - }).pipe(Effect.provideContext(context)); + }).pipe(Effect.provideContext(longLived)); }), -).layer; - -export const ProtectedApiLive = ProtectedCloudApiLive.pipe( - Layer.provide(ExecutionStackMiddleware), - Layer.provideMerge(HttpApiSwagger.layer(ProtectedCloudApi, { path: "/docs" })), - Layer.provideMerge(RouterConfig), ); + +// `rsLive` is the per-request DB layer. Combining it into the auth +// middleware collapses `requires: DbService | UserStoreService` to +// never (so `.layer` is a real Layer instead of the "Need to combine" +// type-error sentinel) AND makes the postgres.js socket request-scoped: +// the layer rebuilds per HTTP request, satisfying Cloudflare Workers' +// I/O isolation. Exposed as a factory so tests can swap in a counting +// fake — see `apps/cloud/src/api.request-scope.node.test.ts`. +export const makeProtectedApiLive = ( + rsLive: Layer.Layer, +) => { + const protectedMiddleware = ExecutionStackMiddleware.combine( + requestScopedMiddleware(rsLive), + ).layer; + return ProtectedCloudApiLive.pipe( + Layer.provide(protectedMiddleware), + Layer.provideMerge(HttpApiSwagger.layer(ProtectedCloudApi, { path: "/docs" })), + Layer.provideMerge(RouterConfig), + ); +}; + +export const ProtectedApiLive = makeProtectedApiLive(RequestScopedServicesLive); diff --git a/apps/cloud/src/api/request-scoped.ts b/apps/cloud/src/api/request-scoped.ts new file mode 100644 index 000000000..c16c8ad8f --- /dev/null +++ b/apps/cloud/src/api/request-scoped.ts @@ -0,0 +1,47 @@ +// --------------------------------------------------------------------------- +// Per-request layer provisioning for `HttpRouter.toWebHandler` +// --------------------------------------------------------------------------- +// +// `HttpRouter.toWebHandler` builds the application layer once into a +// boot-scoped `Context` and reuses it for every request, so any +// `Effect.acquireRelease` inside that layer fires once at worker boot. +// On Cloudflare Workers a postgres.js socket (a `Writable` I/O object) +// opened during request 1 cannot be touched from request 2 — the +// runtime throws "Cannot perform I/O on behalf of a different request". +// +// `Layer.provideMerge` and (despite the name) `HttpRouter.provideRequest` +// both build the inner layer at construction time. The only primitive +// that actually rebuilds per request is a router middleware whose +// per-request handler calls `Layer.build(layer)` inside `Effect.scoped`, +// so `acquireRelease` fires per request and finalizers run when the +// request fiber's scope closes. +// +// See `apps/cloud/src/api.request-scope.node.test.ts` for the regression +// coverage that pins this rule down. +// --------------------------------------------------------------------------- + +import { Effect, Layer } from "effect"; +import { HttpRouter } from "effect/unstable/http"; + +/** + * Build an `HttpRouter.middleware` that provides `layer`'s services to + * each request. The layer is rebuilt per HTTP request so + * `Effect.acquireRelease` fires per request and is released when the + * request fiber's scope closes. + * + * The returned value is a `Middleware`. Use `.layer` to apply it as a + * standalone layer; use `.combine(other)` to fold it into another + * middleware whose per-request body needs services this layer provides + * (e.g. `ExecutionStackMiddleware`'s auth logic that yields + * `DbService` + `UserStoreService` — combining drops those from the + * outer middleware's `requires`). + */ +export const requestScopedMiddleware = (layer: Layer.Layer) => + HttpRouter.middleware<{ provides: A }>()((httpEffect) => + Effect.scoped( + Effect.gen(function* () { + const services = yield* Layer.build(layer); + return yield* Effect.provideContext(httpEffect, services); + }), + ), + ); diff --git a/apps/cloud/src/api/router.ts b/apps/cloud/src/api/router.ts index a122634de..e4e9aa002 100644 --- a/apps/cloud/src/api/router.ts +++ b/apps/cloud/src/api/router.ts @@ -1,24 +1,40 @@ import { Layer } from "effect"; +import { UserStoreService } from "../auth/context"; +import { DbService } from "../services/db"; + import { AutumnRoutesLive } from "./autumn"; import { - NonProtectedApiLive, - OrgApiLive, + BootSharedServices, + RequestScopedServicesLive, RouterConfig, - SharedServices, + makeNonProtectedApiLive, + makeOrgApiLive, } from "./layers"; -import { ProtectedApiLive } from "./protected"; +import { makeProtectedApiLive } from "./protected"; // One router. Each sub-API contributes its routes via `HttpApiBuilder.layer`, // which calls `HttpRouter.use(...)` under the hood. Autumn's catch-all proxy // is added as a plain `HttpRouter.add` route. They all merge into the same // routing table; there is no outer-then-inner router stacking. -export const ApiLive = Layer.mergeAll( - NonProtectedApiLive, - OrgApiLive, - ProtectedApiLive, - AutumnRoutesLive, -).pipe( - Layer.provideMerge(RouterConfig), - Layer.provideMerge(SharedServices), -); +// +// The per-request `DbService` + `UserStoreService` wiring is threaded +// through each sub-API's factory. Boot-scoped services come in here via +// `Layer.provideMerge`. `requestScopedLive` is exposed as a parameter +// so tests can substitute a counting fake for `DbService.Live` and +// assert per-request semantics — see +// `apps/cloud/src/api.request-scope.node.test.ts`. +export const makeApiLive = ( + requestScopedLive: Layer.Layer, +) => + Layer.mergeAll( + makeNonProtectedApiLive(requestScopedLive), + makeOrgApiLive(requestScopedLive), + makeProtectedApiLive(requestScopedLive), + AutumnRoutesLive, + ).pipe( + Layer.provideMerge(RouterConfig), + Layer.provideMerge(BootSharedServices), + ); + +export const ApiLive = makeApiLive(RequestScopedServicesLive); diff --git a/apps/cloud/test-stubs/cloudflare-workers.ts b/apps/cloud/test-stubs/cloudflare-workers.ts index 701b32d6d..8dc4e9c53 100644 --- a/apps/cloud/test-stubs/cloudflare-workers.ts +++ b/apps/cloud/test-stubs/cloudflare-workers.ts @@ -16,4 +16,5 @@ export const env: Record = { export class WorkerEntrypoint {} export class DurableObject {} export class WorkflowEntrypoint {} +export class RpcTarget {} export const exports: Record = {};