From 5eecbe7b0a2273d759c12183d19c39a3f3a01887 Mon Sep 17 00:00:00 2001 From: Rhys Sullivan <39114868+RhysSullivan@users.noreply.github.com> Date: Tue, 28 Apr 2026 10:37:03 -0700 Subject: [PATCH] Add WorkOS identity webhook sync --- apps/cloud/src/api.test.ts | 4 + apps/cloud/src/api.ts | 3 + apps/cloud/src/api/layers.ts | 23 ++++ apps/cloud/src/api/router.ts | 6 + apps/cloud/src/auth/workos.ts | 10 ++ apps/cloud/src/env-augment.d.ts | 3 + apps/cloud/src/identity/api.ts | 8 ++ apps/cloud/src/identity/handlers.ts | 28 ++++ apps/cloud/src/identity/sync.test.ts | 199 +++++++++++++++++++++++++++ apps/cloud/src/identity/sync.ts | 163 ++++++++++++++++++++++ apps/cloud/worker-configuration.d.ts | 2 + 11 files changed, 449 insertions(+) create mode 100644 apps/cloud/src/identity/api.ts create mode 100644 apps/cloud/src/identity/handlers.ts create mode 100644 apps/cloud/src/identity/sync.test.ts create mode 100644 apps/cloud/src/identity/sync.ts diff --git a/apps/cloud/src/api.test.ts b/apps/cloud/src/api.test.ts index cd7955d72..68d07b3ae 100644 --- a/apps/cloud/src/api.test.ts +++ b/apps/cloud/src/api.test.ts @@ -15,6 +15,7 @@ import { Effect, Layer, Schema } from "effect"; import { ApiRequestHandler, AutumnRequestHandlerService, + IdentityWebhookRequestHandlerService, NonProtectedRequestHandlerService, ProtectedRequestHandlerService, OrgRequestHandlerService, @@ -128,6 +129,9 @@ const TestRequestHandlersLive = Layer.mergeAll( Layer.succeed(AutumnRequestHandlerService, { app: Effect.succeed(HttpServerResponse.unsafeJson({ source: "autumn" })), }), + Layer.succeed(IdentityWebhookRequestHandlerService, { + app: Effect.succeed(HttpServerResponse.unsafeJson({ source: "identity-webhook" })), + }), Layer.succeed(ProtectedRequestHandlerService, { app: ProtectedTestApp }), ); diff --git a/apps/cloud/src/api.ts b/apps/cloud/src/api.ts index e0858123f..d185f3813 100644 --- a/apps/cloud/src/api.ts +++ b/apps/cloud/src/api.ts @@ -5,14 +5,17 @@ import { ProtectedApiApp } from "./api/protected"; import { ApiRequestHandler, AutumnRequestHandlerService, + IdentityWebhookRequestHandlerService, NonProtectedRequestHandlerService, ProtectedRequestHandlerService, OrgRequestHandlerService, } from "./api/router"; +import { IdentityWebhookApiApp } from "./api/layers"; const ApiRequestHandlersLive = Layer.mergeAll( Layer.succeed(OrgRequestHandlerService, { app: OrgApiApp }), Layer.succeed(NonProtectedRequestHandlerService, { app: NonProtectedApiApp }), + Layer.succeed(IdentityWebhookRequestHandlerService, { app: IdentityWebhookApiApp }), Layer.succeed(AutumnRequestHandlerService, { app: AutumnApiApp }), Layer.succeed(ProtectedRequestHandlerService, { app: ProtectedApiApp }), ); diff --git a/apps/cloud/src/api/layers.ts b/apps/cloud/src/api/layers.ts index 1113250a9..042879dbb 100644 --- a/apps/cloud/src/api/layers.ts +++ b/apps/cloud/src/api/layers.ts @@ -20,6 +20,8 @@ import { DbService } from "../services/db"; import { TelemetryLive } from "../services/telemetry"; import { OrgHttpApi } from "../org/compose"; import { OrgHandlers } from "../org/handlers"; +import { IdentityApi, IdentityWebhookHandlers } from "../identity/handlers"; +import { IdentitySync } from "../identity/sync"; import { ErrorCaptureLive } from "../observability"; import { CoreSharedServices } from "./core-shared-services"; @@ -40,11 +42,16 @@ const IdentityDirectoryLive = IdentityDirectory.Live.pipe( Layer.provideMerge(UserStoreLive), Layer.provideMerge(CoreSharedServices), ); +const IdentitySyncLive = IdentitySync.Live.pipe( + Layer.provideMerge(UserStoreLive), + Layer.provideMerge(CoreSharedServices), +); export const SharedServices = Layer.mergeAll( DbLive, UserStoreLive, IdentityDirectoryLive, + IdentitySyncLive, CoreSharedServices, HttpServer.layerContext, TelemetryLive, @@ -76,6 +83,10 @@ const OrgApiLive = HttpApiBuilder.api(OrgHttpApi).pipe( Layer.provideMerge(OrgAuthLive), ); +const IdentityWebhookApiLive = HttpApiBuilder.api(IdentityApi).pipe( + Layer.provide(IdentityWebhookHandlers), +); + const NonProtectedRequestLayer = NonProtectedApiLive.pipe( Layer.provideMerge(RouterConfig), Layer.provideMerge(HttpServer.layerContext), @@ -90,6 +101,13 @@ const OrgRequestLayer = OrgApiLive.pipe( Layer.provideMerge(HttpApiBuilder.Middleware.layer), ); +const IdentityWebhookRequestLayer = IdentityWebhookApiLive.pipe( + Layer.provideMerge(RouterConfig), + Layer.provideMerge(HttpServer.layerContext), + Layer.provideMerge(HttpApiBuilder.Router.Live), + Layer.provideMerge(HttpApiBuilder.Middleware.layer), +); + export const NonProtectedApiApp = Effect.flatMap( HttpApiBuilder.httpApp.pipe(Effect.provide(NonProtectedRequestLayer)), HttpMiddleware.logger, @@ -99,3 +117,8 @@ export const OrgApiApp = Effect.flatMap( HttpApiBuilder.httpApp.pipe(Effect.provide(OrgRequestLayer)), HttpMiddleware.logger, ).pipe(Effect.provide(SharedServices)); + +export const IdentityWebhookApiApp = Effect.flatMap( + HttpApiBuilder.httpApp.pipe(Effect.provide(IdentityWebhookRequestLayer)), + HttpMiddleware.logger, +).pipe(Effect.provide(SharedServices)); diff --git a/apps/cloud/src/api/router.ts b/apps/cloud/src/api/router.ts index 5bd9ebd59..eeba8cffe 100644 --- a/apps/cloud/src/api/router.ts +++ b/apps/cloud/src/api/router.ts @@ -17,6 +17,10 @@ export class AutumnRequestHandlerService extends Context.Tag( "@executor/cloud/AutumnRequestHandlerService", )() {} +export class IdentityWebhookRequestHandlerService extends Context.Tag( + "@executor/cloud/IdentityWebhookRequestHandlerService", +)() {} + export class ProtectedRequestHandlerService extends Context.Tag( "@executor/cloud/ProtectedRequestHandlerService", )() {} @@ -25,12 +29,14 @@ export const ApiRouterApp = Effect.gen(function* () { const org = yield* OrgRequestHandlerService; const nonProtected = yield* NonProtectedRequestHandlerService; const autumn = yield* AutumnRequestHandlerService; + const identityWebhooks = yield* IdentityWebhookRequestHandlerService; const protectedHandler = yield* ProtectedRequestHandlerService; return yield* HttpRouter.empty.pipe( HttpRouter.mountApp("/org", org.app, { includePrefix: true }), HttpRouter.mountApp("/auth", nonProtected.app, { includePrefix: true }), HttpRouter.mountApp("/autumn", autumn.app, { includePrefix: true }), + HttpRouter.mountApp("/webhooks", identityWebhooks.app, { includePrefix: true }), HttpRouter.mountApp("/", protectedHandler.app), HttpRouter.toHttpApp, ); diff --git a/apps/cloud/src/auth/workos.ts b/apps/cloud/src/auth/workos.ts index 3b5daf9eb..8ae18ed01 100644 --- a/apps/cloud/src/auth/workos.ts +++ b/apps/cloud/src/auth/workos.ts @@ -214,6 +214,16 @@ const make = Effect.gen(function* () { /** Delete a domain claim. */ deleteOrganizationDomain: (domainId: string) => use((wos) => wos.organizationDomains.delete(domainId)), + + /** Verify and parse a WorkOS webhook payload. */ + constructWebhookEvent: (payload: Record, signature: string, secret: string) => + use((wos) => + wos.webhooks.constructEvent({ + payload, + sigHeader: signature, + secret, + }), + ), }; }); diff --git a/apps/cloud/src/env-augment.d.ts b/apps/cloud/src/env-augment.d.ts index 3977d7236..2b0780b79 100644 --- a/apps/cloud/src/env-augment.d.ts +++ b/apps/cloud/src/env-augment.d.ts @@ -21,6 +21,9 @@ declare global { // Billing AUTUMN_SECRET_KEY?: string; + // Identity + WORKOS_WEBHOOK_SECRET?: string; + // MCP EXECUTOR_MCP_DEBUG?: string; MCP_AUTHKIT_DOMAIN?: string; diff --git a/apps/cloud/src/identity/api.ts b/apps/cloud/src/identity/api.ts new file mode 100644 index 000000000..01ab63f07 --- /dev/null +++ b/apps/cloud/src/identity/api.ts @@ -0,0 +1,8 @@ +import { HttpApiEndpoint, HttpApiGroup } from "@effect/platform"; +import { UserStoreError, WorkOSError } from "../auth/errors"; + +export class IdentityWebhookApi extends HttpApiGroup.make("identityWebhooks").add( + HttpApiEndpoint.post("workos")`/webhooks/workos` + .addError(UserStoreError) + .addError(WorkOSError), +) {} diff --git a/apps/cloud/src/identity/handlers.ts b/apps/cloud/src/identity/handlers.ts new file mode 100644 index 000000000..48bd338ac --- /dev/null +++ b/apps/cloud/src/identity/handlers.ts @@ -0,0 +1,28 @@ +import { HttpApi, HttpApiBuilder, HttpServerRequest, HttpServerResponse } from "@effect/platform"; +import { env } from "cloudflare:workers"; +import { Effect } from "effect"; + +import { WorkOSError } from "../auth/errors"; +import { IdentityWebhookApi } from "./api"; +import { IdentitySync } from "./sync"; + +export const IdentityApi = HttpApi.make("identity").add(IdentityWebhookApi); + +export const IdentityWebhookHandlers = HttpApiBuilder.group( + IdentityApi, + "identityWebhooks", + (handlers) => + handlers.handleRaw("workos", ({ request }) => + Effect.gen(function* () { + const secret = env.WORKOS_WEBHOOK_SECRET; + if (!secret) return HttpServerResponse.text("Missing webhook secret", { status: 500 }); + + const webRequest = yield* HttpServerRequest.toWeb(request).pipe( + Effect.mapError(() => new WorkOSError()), + ); + const sync = yield* IdentitySync; + const result = yield* sync.constructAndApplyWebhook(webRequest, secret); + return HttpServerResponse.unsafeJson({ result }); + }), + ), +); diff --git a/apps/cloud/src/identity/sync.test.ts b/apps/cloud/src/identity/sync.test.ts new file mode 100644 index 000000000..4c22b82b8 --- /dev/null +++ b/apps/cloud/src/identity/sync.test.ts @@ -0,0 +1,199 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect, Layer } from "effect"; + +import { UserStoreService } from "../auth/context"; +import { WorkOSAuth } from "../auth/workos"; +import { IdentityProvider } from "./provider"; +import { IdentitySync } from "./sync"; +import type { IdentityOrganization } from "./types"; + +type AccountInput = { + readonly id: string; + readonly email?: string | null; + readonly name?: string | null; +}; + +type OrgInput = { + readonly id: string; + readonly name: string; +}; + +type MembershipInput = { + readonly accountId: string; + readonly organizationId: string; + readonly status?: string; + readonly roleSlug?: string; +}; + +const makeSync = () => { + const accounts = new Map(); + const organizations = new Map(); + const memberships = new Map(); + const events = new Set(); + + const UserStoreTest = Layer.succeed(UserStoreService, { + use: (fn: (store: { + ensureAccount: (input: AccountInput) => Promise; + upsertOrganization: (input: OrgInput) => Promise; + getOrganization: (id: string) => Promise; + upsertMembership: (input: MembershipInput) => Promise; + deactivateMembership: ( + accountId: string, + organizationId: string, + ) => Promise; + recordIdentityEvent: (event: { + provider: string; + eventId: string; + eventType: string; + }) => Promise; + }) => Promise) => + Effect.promise(() => + fn({ + ensureAccount: async (input) => { + accounts.set(input.id, input); + return input; + }, + upsertOrganization: async (input) => { + organizations.set(input.id, input); + return input; + }, + getOrganization: async (id) => organizations.get(id) ?? null, + upsertMembership: async (input) => { + memberships.set(`${input.accountId}:${input.organizationId}`, input); + return input; + }, + deactivateMembership: async (accountId, organizationId) => { + const key = `${accountId}:${organizationId}`; + const current = memberships.get(key); + const next = current ? { ...current, status: "inactive" } : null; + if (next) memberships.set(key, next); + return next; + }, + recordIdentityEvent: async (event) => { + const key = `${event.provider}:${event.eventId}`; + if (events.has(key)) return false; + events.add(key); + return true; + }, + }), + ), + } as unknown as UserStoreService["Type"]); + + const IdentityProviderTest = Layer.succeed(IdentityProvider, { + authenticateSealedSession: () => Effect.succeed(null), + authenticateRequest: () => Effect.succeed(null), + listUserMemberships: () => Effect.succeed([]), + listOrganizationMembers: () => Effect.succeed([]), + listOrganizationRoles: () => Effect.succeed([]), + getOrganization: (organizationId: string) => + Effect.succeed({ id: organizationId, name: "Fetched Org" } satisfies IdentityOrganization), + refreshSession: () => Effect.succeed(null), + } as IdentityProvider["Type"]); + + const WorkOSTest = Layer.succeed(WorkOSAuth, { + constructWebhookEvent: () => Effect.die("not used"), + } as unknown as WorkOSAuth["Type"]); + + return { + accounts, + organizations, + memberships, + layer: IdentitySync.Live.pipe( + Layer.provideMerge(UserStoreTest), + Layer.provideMerge(IdentityProviderTest), + Layer.provideMerge(WorkOSTest), + ) as Layer.Layer, + }; +}; + +describe("IdentitySync", () => { + it.effect("upserts users from WorkOS user events", () => + Effect.gen(function* () { + const sync = makeSync(); + + const result = yield* Effect.provide( + Effect.gen(function* () { + const service = yield* IdentitySync; + return yield* service.applyEvent({ + id: "event_1", + event: "user.created", + data: { + id: "user_1", + email: "user@test.com", + firstName: "Ada", + lastName: "Lovelace", + }, + }); + }), + sync.layer, + ); + + expect(result).toBe("processed"); + expect(sync.accounts.get("user_1")).toMatchObject({ + email: "user@test.com", + name: "Ada Lovelace", + }); + }), + ); + + it.effect("deduplicates already processed events", () => + Effect.gen(function* () { + const sync = makeSync(); + + const program = Effect.gen(function* () { + const service = yield* IdentitySync; + const event = { + id: "event_1", + event: "organization.created", + data: { id: "org_1", name: "Acme" }, + }; + return [yield* service.applyEvent(event), yield* service.applyEvent(event)] as const; + }); + + const results = yield* Effect.provide(program, sync.layer); + + expect(results).toEqual(["processed", "duplicate"]); + }), + ); + + it.effect("upserts and deactivates memberships", () => + Effect.gen(function* () { + const sync = makeSync(); + + yield* Effect.provide( + Effect.gen(function* () { + const service = yield* IdentitySync; + yield* service.applyEvent({ + id: "event_create", + event: "organization_membership.created", + data: { + id: "mem_1", + userId: "user_1", + organizationId: "org_1", + organizationName: "Acme", + status: "active", + role: { slug: "admin" }, + }, + }); + return yield* service.applyEvent({ + id: "event_delete", + event: "organization_membership.deleted", + data: { + id: "mem_1", + userId: "user_1", + organizationId: "org_1", + status: "inactive", + }, + }); + }), + sync.layer, + ); + + expect(sync.organizations.get("org_1")?.name).toBe("Acme"); + expect(sync.memberships.get("user_1:org_1")).toMatchObject({ + roleSlug: "admin", + status: "inactive", + }); + }), + ); +}); diff --git a/apps/cloud/src/identity/sync.ts b/apps/cloud/src/identity/sync.ts new file mode 100644 index 000000000..25385682e --- /dev/null +++ b/apps/cloud/src/identity/sync.ts @@ -0,0 +1,163 @@ +import { Context, Effect, Layer } from "effect"; + +import { UserStoreService } from "../auth/context"; +import { WorkOSError, type UserStoreError } from "../auth/errors"; +import { WorkOSAuth } from "../auth/workos"; +import { IdentityProvider } from "./provider"; + +type IdentityEvent = { + readonly id: string; + readonly event: string; + readonly data: Record; +}; + +type UserData = { + readonly id: string; + readonly email?: string | null; + readonly firstName?: string | null; + readonly lastName?: string | null; + readonly profilePictureUrl?: string | null; +}; + +type OrganizationData = { + readonly id: string; + readonly name: string; + readonly externalId?: string | null; +}; + +type MembershipData = { + readonly id: string; + readonly userId: string; + readonly organizationId: string; + readonly organizationName?: string | null; + readonly status: string; + readonly role?: { readonly slug?: string | null } | null; +}; + +export class IdentitySync extends Context.Tag("@executor/cloud/IdentitySync")< + IdentitySync, + { + readonly applyEvent: ( + event: IdentityEvent, + ) => Effect.Effect<"processed" | "duplicate" | "ignored", UserStoreError | WorkOSError>; + readonly constructAndApplyWebhook: ( + request: Request, + secret: string, + ) => Effect.Effect<"processed" | "duplicate" | "ignored", UserStoreError | WorkOSError>; + } +>() { + static Live = Layer.effect( + this, + Effect.gen(function* () { + const users = yield* UserStoreService; + const provider = yield* IdentityProvider; + const workos = yield* WorkOSAuth; + + const upsertAccount = (data: UserData) => + users.use((store) => + store.ensureAccount({ + id: data.id, + email: data.email ?? null, + name: [data.firstName, data.lastName].filter(Boolean).join(" ") || null, + avatarUrl: data.profilePictureUrl ?? null, + externalId: data.id, + identityProvider: "workos", + }), + ); + + const upsertOrganization = (data: OrganizationData) => + users.use((store) => + store.upsertOrganization({ + id: data.id, + name: data.name, + externalId: data.externalId ?? data.id, + identityProvider: "workos", + }), + ); + + const upsertMembership = (data: MembershipData) => + Effect.gen(function* () { + yield* users.use((store) => + store.ensureAccount({ + id: data.userId, + externalId: data.userId, + identityProvider: "workos", + }), + ); + + const existingOrg = yield* users.use((store) => store.getOrganization(data.organizationId)); + if (!existingOrg) { + if (data.organizationName) { + yield* upsertOrganization({ id: data.organizationId, name: data.organizationName }); + } else { + const org = yield* provider.getOrganization(data.organizationId); + yield* upsertOrganization({ id: org.id, name: org.name }); + } + } + + yield* users.use((store) => + store.upsertMembership({ + accountId: data.userId, + organizationId: data.organizationId, + externalId: data.id, + identityProvider: "workos", + status: data.status, + roleSlug: data.role?.slug ?? "member", + }), + ); + }); + + const applyEvent = (event: IdentityEvent) => + Effect.gen(function* () { + const inserted = yield* users.use((store) => + store.recordIdentityEvent({ + provider: "workos", + eventId: event.id, + eventType: event.event, + }), + ); + if (!inserted) return "duplicate" as const; + + switch (event.event) { + case "user.created": + case "user.updated": + yield* upsertAccount(event.data as UserData); + return "processed" as const; + case "organization.created": + case "organization.updated": + yield* upsertOrganization(event.data as OrganizationData); + return "processed" as const; + case "organization_membership.created": + case "organization_membership.updated": + yield* upsertMembership(event.data as MembershipData); + return "processed" as const; + case "organization_membership.deleted": { + const membership = event.data as MembershipData; + yield* users.use((store) => + store.deactivateMembership(membership.userId, membership.organizationId), + ); + return "processed" as const; + } + default: + return "ignored" as const; + } + }); + + return IdentitySync.of({ + applyEvent, + constructAndApplyWebhook: (request, secret) => + Effect.gen(function* () { + const signature = request.headers.get("workos-signature"); + if (!signature) return yield* new WorkOSError(); + const payload = yield* Effect.promise(() => request.json() as Promise>); + const event = yield* workos.constructWebhookEvent(payload, signature, secret); + return yield* applyEvent({ + id: String(event.id), + event: event.event, + data: event.data as Record, + }); + }), + }); + }), + ); +} diff --git a/apps/cloud/worker-configuration.d.ts b/apps/cloud/worker-configuration.d.ts index dfb7cc176..deb3ef063 100644 --- a/apps/cloud/worker-configuration.d.ts +++ b/apps/cloud/worker-configuration.d.ts @@ -12,6 +12,7 @@ declare namespace Cloudflare { WORKOS_API_KEY: string; WORKOS_CLIENT_ID: string; WORKOS_COOKIE_PASSWORD: string; + WORKOS_WEBHOOK_SECRET: string; APP_URL: string; WORKOS_CLAIM_TOKEN: string; MCP_SESSION: DurableObjectNamespace; @@ -29,6 +30,7 @@ declare namespace NodeJS { | "WORKOS_API_KEY" | "WORKOS_CLIENT_ID" | "WORKOS_COOKIE_PASSWORD" + | "WORKOS_WEBHOOK_SECRET" | "APP_URL" | "WORKOS_CLAIM_TOKEN" >