Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/cloud/src/api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { Effect, Layer, Schema } from "effect";
import {
ApiRequestHandler,
AutumnRequestHandlerService,
IdentityWebhookRequestHandlerService,
NonProtectedRequestHandlerService,
ProtectedRequestHandlerService,
OrgRequestHandlerService,
Expand Down Expand Up @@ -128,6 +129,9 @@ const TestRequestHandlersLive = Layer.mergeAll(
Layer.succeed(AutumnRequestHandlerService, {
app: Effect.succeed(HttpServerResponse.unsafeJson({ source: "autumn" })),
}),
Layer.succeed(IdentityWebhookRequestHandlerService, {
app: Effect.succeed(HttpServerResponse.unsafeJson({ source: "identity-webhook" })),
}),
Layer.succeed(ProtectedRequestHandlerService, { app: ProtectedTestApp }),
);

Expand Down
3 changes: 3 additions & 0 deletions apps/cloud/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import { ProtectedApiApp } from "./api/protected";
import {
ApiRequestHandler,
AutumnRequestHandlerService,
IdentityWebhookRequestHandlerService,
NonProtectedRequestHandlerService,
ProtectedRequestHandlerService,
OrgRequestHandlerService,
} from "./api/router";
import { IdentityWebhookApiApp } from "./api/layers";

const ApiRequestHandlersLive = Layer.mergeAll(
Layer.succeed(OrgRequestHandlerService, { app: OrgApiApp }),
Layer.succeed(NonProtectedRequestHandlerService, { app: NonProtectedApiApp }),
Layer.succeed(IdentityWebhookRequestHandlerService, { app: IdentityWebhookApiApp }),
Layer.succeed(AutumnRequestHandlerService, { app: AutumnApiApp }),
Layer.succeed(ProtectedRequestHandlerService, { app: ProtectedApiApp }),
);
Expand Down
23 changes: 23 additions & 0 deletions apps/cloud/src/api/layers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import { DbService } from "../services/db";
import { TelemetryLive } from "../services/telemetry";
import { OrgHttpApi } from "../org/compose";
import { OrgHandlers } from "../org/handlers";
import { IdentityApi, IdentityWebhookHandlers } from "../identity/handlers";
import { IdentitySync } from "../identity/sync";
import { ErrorCaptureLive } from "../observability";

import { CoreSharedServices } from "./core-shared-services";
Expand All @@ -40,11 +42,16 @@ const IdentityDirectoryLive = IdentityDirectory.Live.pipe(
Layer.provideMerge(UserStoreLive),
Layer.provideMerge(CoreSharedServices),
);
const IdentitySyncLive = IdentitySync.Live.pipe(
Layer.provideMerge(UserStoreLive),
Layer.provideMerge(CoreSharedServices),
);

export const SharedServices = Layer.mergeAll(
DbLive,
UserStoreLive,
IdentityDirectoryLive,
IdentitySyncLive,
CoreSharedServices,
HttpServer.layerContext,
TelemetryLive,
Expand Down Expand Up @@ -76,6 +83,10 @@ const OrgApiLive = HttpApiBuilder.api(OrgHttpApi).pipe(
Layer.provideMerge(OrgAuthLive),
);

const IdentityWebhookApiLive = HttpApiBuilder.api(IdentityApi).pipe(
Layer.provide(IdentityWebhookHandlers),
);

const NonProtectedRequestLayer = NonProtectedApiLive.pipe(
Layer.provideMerge(RouterConfig),
Layer.provideMerge(HttpServer.layerContext),
Expand All @@ -90,6 +101,13 @@ const OrgRequestLayer = OrgApiLive.pipe(
Layer.provideMerge(HttpApiBuilder.Middleware.layer),
);

const IdentityWebhookRequestLayer = IdentityWebhookApiLive.pipe(
Layer.provideMerge(RouterConfig),
Layer.provideMerge(HttpServer.layerContext),
Layer.provideMerge(HttpApiBuilder.Router.Live),
Layer.provideMerge(HttpApiBuilder.Middleware.layer),
);

export const NonProtectedApiApp = Effect.flatMap(
HttpApiBuilder.httpApp.pipe(Effect.provide(NonProtectedRequestLayer)),
HttpMiddleware.logger,
Expand All @@ -99,3 +117,8 @@ export const OrgApiApp = Effect.flatMap(
HttpApiBuilder.httpApp.pipe(Effect.provide(OrgRequestLayer)),
HttpMiddleware.logger,
).pipe(Effect.provide(SharedServices));

export const IdentityWebhookApiApp = Effect.flatMap(
HttpApiBuilder.httpApp.pipe(Effect.provide(IdentityWebhookRequestLayer)),
HttpMiddleware.logger,
).pipe(Effect.provide(SharedServices));
6 changes: 6 additions & 0 deletions apps/cloud/src/api/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ export class AutumnRequestHandlerService extends Context.Tag(
"@executor/cloud/AutumnRequestHandlerService",
)<AutumnRequestHandlerService, RequestAppService>() {}

export class IdentityWebhookRequestHandlerService extends Context.Tag(
"@executor/cloud/IdentityWebhookRequestHandlerService",
)<IdentityWebhookRequestHandlerService, RequestAppService>() {}

export class ProtectedRequestHandlerService extends Context.Tag(
"@executor/cloud/ProtectedRequestHandlerService",
)<ProtectedRequestHandlerService, RequestAppService>() {}
Expand All @@ -25,12 +29,14 @@ export const ApiRouterApp = Effect.gen(function* () {
const org = yield* OrgRequestHandlerService;
const nonProtected = yield* NonProtectedRequestHandlerService;
const autumn = yield* AutumnRequestHandlerService;
const identityWebhooks = yield* IdentityWebhookRequestHandlerService;
const protectedHandler = yield* ProtectedRequestHandlerService;

return yield* HttpRouter.empty.pipe(
HttpRouter.mountApp("/org", org.app, { includePrefix: true }),
HttpRouter.mountApp("/auth", nonProtected.app, { includePrefix: true }),
HttpRouter.mountApp("/autumn", autumn.app, { includePrefix: true }),
HttpRouter.mountApp("/webhooks", identityWebhooks.app, { includePrefix: true }),
HttpRouter.mountApp("/", protectedHandler.app),
HttpRouter.toHttpApp,
);
Expand Down
10 changes: 10 additions & 0 deletions apps/cloud/src/auth/workos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ const make = Effect.gen(function* () {
/** Delete a domain claim. */
deleteOrganizationDomain: (domainId: string) =>
use((wos) => wos.organizationDomains.delete(domainId)),

/** Verify and parse a WorkOS webhook payload. */
constructWebhookEvent: (payload: Record<string, unknown>, signature: string, secret: string) =>
use((wos) =>
wos.webhooks.constructEvent({
payload,
sigHeader: signature,
secret,
}),
),
};
});

Expand Down
3 changes: 3 additions & 0 deletions apps/cloud/src/env-augment.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ declare global {
// Billing
AUTUMN_SECRET_KEY?: string;

// Identity
WORKOS_WEBHOOK_SECRET?: string;

// MCP
EXECUTOR_MCP_DEBUG?: string;
MCP_AUTHKIT_DOMAIN?: string;
Expand Down
8 changes: 8 additions & 0 deletions apps/cloud/src/identity/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { HttpApiEndpoint, HttpApiGroup } from "@effect/platform";
import { UserStoreError, WorkOSError } from "../auth/errors";

export class IdentityWebhookApi extends HttpApiGroup.make("identityWebhooks").add(
HttpApiEndpoint.post("workos")`/webhooks/workos`
.addError(UserStoreError)
.addError(WorkOSError),
) {}
28 changes: 28 additions & 0 deletions apps/cloud/src/identity/handlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { HttpApi, HttpApiBuilder, HttpServerRequest, HttpServerResponse } from "@effect/platform";
import { env } from "cloudflare:workers";
import { Effect } from "effect";

import { WorkOSError } from "../auth/errors";
import { IdentityWebhookApi } from "./api";
import { IdentitySync } from "./sync";

export const IdentityApi = HttpApi.make("identity").add(IdentityWebhookApi);

export const IdentityWebhookHandlers = HttpApiBuilder.group(
IdentityApi,
"identityWebhooks",
(handlers) =>
handlers.handleRaw("workos", ({ request }) =>
Effect.gen(function* () {
const secret = env.WORKOS_WEBHOOK_SECRET;
if (!secret) return HttpServerResponse.text("Missing webhook secret", { status: 500 });

const webRequest = yield* HttpServerRequest.toWeb(request).pipe(
Effect.mapError(() => new WorkOSError()),
);
const sync = yield* IdentitySync;
const result = yield* sync.constructAndApplyWebhook(webRequest, secret);
return HttpServerResponse.unsafeJson({ result });
}),
),
);
199 changes: 199 additions & 0 deletions apps/cloud/src/identity/sync.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import { describe, expect, it } from "@effect/vitest";
import { Effect, Layer } from "effect";

import { UserStoreService } from "../auth/context";
import { WorkOSAuth } from "../auth/workos";
import { IdentityProvider } from "./provider";
import { IdentitySync } from "./sync";
import type { IdentityOrganization } from "./types";

type AccountInput = {
readonly id: string;
readonly email?: string | null;
readonly name?: string | null;
};

type OrgInput = {
readonly id: string;
readonly name: string;
};

type MembershipInput = {
readonly accountId: string;
readonly organizationId: string;
readonly status?: string;
readonly roleSlug?: string;
};

const makeSync = () => {
const accounts = new Map<string, AccountInput>();
const organizations = new Map<string, OrgInput>();
const memberships = new Map<string, MembershipInput>();
const events = new Set<string>();

const UserStoreTest = Layer.succeed(UserStoreService, {
use: <A>(fn: (store: {
ensureAccount: (input: AccountInput) => Promise<AccountInput>;
upsertOrganization: (input: OrgInput) => Promise<OrgInput>;
getOrganization: (id: string) => Promise<OrgInput | null>;
upsertMembership: (input: MembershipInput) => Promise<MembershipInput>;
deactivateMembership: (
accountId: string,
organizationId: string,
) => Promise<MembershipInput | null>;
recordIdentityEvent: (event: {
provider: string;
eventId: string;
eventType: string;
}) => Promise<boolean>;
}) => Promise<A>) =>
Effect.promise(() =>
fn({
ensureAccount: async (input) => {
accounts.set(input.id, input);
return input;
},
upsertOrganization: async (input) => {
organizations.set(input.id, input);
return input;
},
getOrganization: async (id) => organizations.get(id) ?? null,
upsertMembership: async (input) => {
memberships.set(`${input.accountId}:${input.organizationId}`, input);
return input;
},
deactivateMembership: async (accountId, organizationId) => {
const key = `${accountId}:${organizationId}`;
const current = memberships.get(key);
const next = current ? { ...current, status: "inactive" } : null;
if (next) memberships.set(key, next);
return next;
},
recordIdentityEvent: async (event) => {
const key = `${event.provider}:${event.eventId}`;
if (events.has(key)) return false;
events.add(key);
return true;
},
}),
),
} as unknown as UserStoreService["Type"]);

const IdentityProviderTest = Layer.succeed(IdentityProvider, {
authenticateSealedSession: () => Effect.succeed(null),
authenticateRequest: () => Effect.succeed(null),
listUserMemberships: () => Effect.succeed([]),
listOrganizationMembers: () => Effect.succeed([]),
listOrganizationRoles: () => Effect.succeed([]),
getOrganization: (organizationId: string) =>
Effect.succeed({ id: organizationId, name: "Fetched Org" } satisfies IdentityOrganization),
refreshSession: () => Effect.succeed(null),
} as IdentityProvider["Type"]);

const WorkOSTest = Layer.succeed(WorkOSAuth, {
constructWebhookEvent: () => Effect.die("not used"),
} as unknown as WorkOSAuth["Type"]);

return {
accounts,
organizations,
memberships,
layer: IdentitySync.Live.pipe(
Layer.provideMerge(UserStoreTest),
Layer.provideMerge(IdentityProviderTest),
Layer.provideMerge(WorkOSTest),
) as Layer.Layer<IdentitySync, never, never>,
};
};

describe("IdentitySync", () => {
it.effect("upserts users from WorkOS user events", () =>
Effect.gen(function* () {
const sync = makeSync();

const result = yield* Effect.provide(
Effect.gen(function* () {
const service = yield* IdentitySync;
return yield* service.applyEvent({
id: "event_1",
event: "user.created",
data: {
id: "user_1",
email: "user@test.com",
firstName: "Ada",
lastName: "Lovelace",
},
});
}),
sync.layer,
);

expect(result).toBe("processed");
expect(sync.accounts.get("user_1")).toMatchObject({
email: "user@test.com",
name: "Ada Lovelace",
});
}),
);

it.effect("deduplicates already processed events", () =>
Effect.gen(function* () {
const sync = makeSync();

const program = Effect.gen(function* () {
const service = yield* IdentitySync;
const event = {
id: "event_1",
event: "organization.created",
data: { id: "org_1", name: "Acme" },
};
return [yield* service.applyEvent(event), yield* service.applyEvent(event)] as const;
});

const results = yield* Effect.provide(program, sync.layer);

expect(results).toEqual(["processed", "duplicate"]);
}),
);

it.effect("upserts and deactivates memberships", () =>
Effect.gen(function* () {
const sync = makeSync();

yield* Effect.provide(
Effect.gen(function* () {
const service = yield* IdentitySync;
yield* service.applyEvent({
id: "event_create",
event: "organization_membership.created",
data: {
id: "mem_1",
userId: "user_1",
organizationId: "org_1",
organizationName: "Acme",
status: "active",
role: { slug: "admin" },
},
});
return yield* service.applyEvent({
id: "event_delete",
event: "organization_membership.deleted",
data: {
id: "mem_1",
userId: "user_1",
organizationId: "org_1",
status: "inactive",
},
});
}),
sync.layer,
);

expect(sync.organizations.get("org_1")?.name).toBe("Acme");
expect(sync.memberships.get("user_1:org_1")).toMatchObject({
roleSlug: "admin",
status: "inactive",
});
}),
);
});
Loading
Loading