From 878c7d704af0d24a764811c71a90cb908b61adfe Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Fri, 19 Jun 2026 18:00:35 -0700 Subject: [PATCH] Align relay agent activity Effect services Co-authored-by: codex --- .../AgentActivityPublisher.test.ts | 31 ++--- .../agentActivity/AgentActivityPublisher.ts | 26 ++--- .../src/agentActivity/AgentActivityRows.ts | 42 +++---- infra/relay/src/agentActivity/ApnsClient.ts | 110 +++++++++--------- .../src/agentActivity/ApnsDeliveries.test.ts | 20 ++-- .../relay/src/agentActivity/ApnsDeliveries.ts | 67 ++++++----- .../src/agentActivity/ApnsDeliveryQueue.ts | 45 ++++--- .../src/agentActivity/DeliveryAttempts.ts | 29 +++-- infra/relay/src/agentActivity/Devices.ts | 33 +++--- .../relay/src/agentActivity/LiveActivities.ts | 73 ++++++------ .../agentActivity/MobileRegistrations.test.ts | 39 ++++--- .../src/agentActivity/MobileRegistrations.ts | 30 +++-- .../src/agentActivity/apnsDeliveryJobs.ts | 60 ++++++++-- 13 files changed, 320 insertions(+), 285 deletions(-) diff --git a/infra/relay/src/agentActivity/AgentActivityPublisher.test.ts b/infra/relay/src/agentActivity/AgentActivityPublisher.test.ts index 322ac77d896..9671f4984b2 100644 --- a/infra/relay/src/agentActivity/AgentActivityPublisher.test.ts +++ b/infra/relay/src/agentActivity/AgentActivityPublisher.test.ts @@ -41,8 +41,8 @@ function target(deviceId: string): LiveActivities.TargetRow { } function makeLiveActivities( - overrides: Partial = {}, -): LiveActivities.LiveActivitiesShape { + overrides: Partial = {}, +): LiveActivities.LiveActivities["Service"] { return { register: () => Effect.void, listTargets: () => Effect.succeed([]), @@ -55,8 +55,8 @@ function makeLiveActivities( } function makeAgentActivityRows( - overrides: Partial = {}, -): AgentActivityRows.AgentActivityRowsShape { + overrides: Partial = {}, +): AgentActivityRows.AgentActivityRows["Service"] { return { upsert: () => Effect.void, remove: () => Effect.void, @@ -88,8 +88,8 @@ function makeEnvironmentLinks( } function makeApnsDeliveries( - overrides: Partial = {}, -): ApnsDeliveries.ApnsDeliveriesShape { + overrides: Partial = {}, +): ApnsDeliveries.ApnsDeliveries["Service"] { return { sendForTarget: () => Effect.succeed(null), sendPushNotificationForTarget: () => Effect.succeed(null), @@ -133,7 +133,8 @@ describe("AgentActivityPublisher", () => { remote_start_queued_at: null, remote_started_at: "1970-01-01T00:00:01.000Z", }; - const sent: Array[0]> = []; + const sent: Array[0]> = + []; const deliveryResult: RelayDeliveryResult = { deviceId: "device-1", kind: "live_activity_update", @@ -211,7 +212,8 @@ describe("AgentActivityPublisher", () => { readonly environmentId: string; readonly environmentPublicKey: string; }> = []; - const upserts: Array[0]> = []; + const upserts: Array[0]> = + []; return Effect.gen(function* () { const result = yield* Effect.gen(function* () { @@ -302,9 +304,10 @@ describe("AgentActivityPublisher", () => { updatedAt: "1970-01-01T00:00:10.000Z", }; const sentAggregates: Array< - Parameters[0] + Parameters[0] > = []; - const removals: Array[0]> = []; + const removals: Array[0]> = + []; return Effect.gen(function* () { const result = yield* Effect.gen(function* () { @@ -405,10 +408,10 @@ describe("AgentActivityPublisher", () => { headline: "Needs input", }; const liveAggregates: Array< - Parameters[0] + Parameters[0] > = []; const pushAggregates: Array< - Parameters[0] + Parameters[0] > = []; return Effect.gen(function* () { @@ -517,10 +520,10 @@ describe("AgentActivityPublisher", () => { headline: "Needs approval", }; const liveAggregates: Array< - Parameters[0] + Parameters[0] > = []; const pushAggregates: Array< - Parameters[0] + Parameters[0] > = []; return Effect.gen(function* () { diff --git a/infra/relay/src/agentActivity/AgentActivityPublisher.ts b/infra/relay/src/agentActivity/AgentActivityPublisher.ts index 0f5ddc32137..d33cc42cd8d 100644 --- a/infra/relay/src/agentActivity/AgentActivityPublisher.ts +++ b/infra/relay/src/agentActivity/AgentActivityPublisher.ts @@ -23,22 +23,20 @@ export type AgentActivityPublishError = | LiveActivities.LiveActivityTargetListPersistenceError | ApnsDeliveries.ApnsDeliveryError; -export interface AgentActivityPublisherShape { - readonly publish: (input: { - readonly environmentId: string; - readonly environmentPublicKey: string; - readonly threadId: string; - readonly state: RelayAgentActivityState | null; - }) => Effect.Effect; - readonly replayForLiveActivityRegistration: (input: { - readonly userId: string; - readonly deviceId: string; - }) => Effect.Effect; -} - export class AgentActivityPublisher extends Context.Service< AgentActivityPublisher, - AgentActivityPublisherShape + { + readonly publish: (input: { + readonly environmentId: string; + readonly environmentPublicKey: string; + readonly threadId: string; + readonly state: RelayAgentActivityState | null; + }) => Effect.Effect; + readonly replayForLiveActivityRegistration: (input: { + readonly userId: string; + readonly deviceId: string; + }) => Effect.Effect; + } >()("t3code-relay/agentActivity/AgentActivityPublisher") {} const make = Effect.gen(function* () { diff --git a/infra/relay/src/agentActivity/AgentActivityRows.ts b/infra/relay/src/agentActivity/AgentActivityRows.ts index a0695b8e7da..854facfc7c5 100644 --- a/infra/relay/src/agentActivity/AgentActivityRows.ts +++ b/infra/relay/src/agentActivity/AgentActivityRows.ts @@ -3,7 +3,7 @@ import { RelayAgentActivityState as RelayAgentActivityStateSchema } from "@t3too import * as Context from "effect/Context"; import * as DateTime from "effect/DateTime"; import * as Effect from "effect/Effect"; -import { cast } from "effect/Function"; +import * as Function from "effect/Function"; import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as Schema from "effect/Schema"; @@ -39,24 +39,26 @@ export class AgentActivityRowListPersistenceError extends Schema.TaggedErrorClas } } -export interface AgentActivityRowsShape { - readonly upsert: (input: { - readonly environmentPublicKey: string; - readonly state: RelayAgentActivityState; - }) => Effect.Effect; - readonly remove: (input: { - readonly environmentId: string; - readonly environmentPublicKey: string; - readonly threadId: string; - }) => Effect.Effect; - readonly listForUser: (input: { - readonly userId: string; - }) => Effect.Effect, AgentActivityRowListPersistenceError>; -} - -export class AgentActivityRows extends Context.Service()( - "t3code-relay/agentActivity/AgentActivityRows", -) {} +export class AgentActivityRows extends Context.Service< + AgentActivityRows, + { + readonly upsert: (input: { + readonly environmentPublicKey: string; + readonly state: RelayAgentActivityState; + }) => Effect.Effect; + readonly remove: (input: { + readonly environmentId: string; + readonly environmentPublicKey: string; + readonly threadId: string; + }) => Effect.Effect; + readonly listForUser: (input: { + readonly userId: string; + }) => Effect.Effect< + ReadonlyArray, + AgentActivityRowListPersistenceError + >; + } +>()("t3code-relay/agentActivity/AgentActivityRows") {} const decodeJsonString = Schema.decodeEffect(Schema.UnknownFromJsonString); const encodeJsonValue = Schema.encodeEffect(Schema.UnknownFromJsonString); @@ -82,7 +84,7 @@ const make = Effect.gen(function* () { const now = yield* DateTime.now; const stateJson = yield* encodeRelayAgentActivityStateJson(input.state).pipe( Effect.flatMap(decodeJsonString), - Effect.map(cast), + Effect.map(Function.cast), ); yield* db .insert(relayAgentActivityRows) diff --git a/infra/relay/src/agentActivity/ApnsClient.ts b/infra/relay/src/agentActivity/ApnsClient.ts index 90c0fe7dc84..61bfd69bc96 100644 --- a/infra/relay/src/agentActivity/ApnsClient.ts +++ b/infra/relay/src/agentActivity/ApnsClient.ts @@ -8,8 +8,10 @@ import * as Layer from "effect/Layer"; import * as Option from "effect/Option"; import * as Redacted from "effect/Redacted"; import * as Schema from "effect/Schema"; -import { Headers, HttpClient, HttpClientRequest } from "effect/unstable/http"; -import * as RelayConfiguration from "../Config.ts"; +import * as Headers from "effect/unstable/http/Headers"; +import * as HttpClient from "effect/unstable/http/HttpClient"; +import * as HttpClientRequest from "effect/unstable/http/HttpClientRequest"; +import type * as RelayConfiguration from "../Config.ts"; import type { ApnsNotificationPayload } from "./apnsDeliveryJobs.ts"; const LIVE_ACTIVITY_NAME = "AgentActivity"; @@ -231,29 +233,28 @@ function apnsReasonFromBody(body: string): string | undefined { }); } -export interface ApnsClientShape { - readonly makeLiveActivityRequest: typeof makeLiveActivityRequest; - readonly makePushNotificationRequest: typeof makePushNotificationRequest; - readonly sendLiveActivityRequest: (input: { - readonly credentials: RelayConfiguration.ApnsCredentials; - readonly request: ApnsLiveActivityRequest; - readonly issuedAtUnixSeconds: number; - }) => Effect.Effect; - readonly sendPushNotificationRequest: (input: { - readonly credentials: RelayConfiguration.ApnsCredentials; - readonly request: ApnsPushNotificationRequest; - readonly issuedAtUnixSeconds: number; - }) => Effect.Effect; -} - -export class ApnsClient extends Context.Service()( - "t3code-relay/agentActivity/ApnsClient", -) {} +export class ApnsClient extends Context.Service< + ApnsClient, + { + readonly makeLiveActivityRequest: typeof makeLiveActivityRequest; + readonly makePushNotificationRequest: typeof makePushNotificationRequest; + readonly sendLiveActivityRequest: (input: { + readonly credentials: RelayConfiguration.ApnsCredentials; + readonly request: ApnsLiveActivityRequest; + readonly issuedAtUnixSeconds: number; + }) => Effect.Effect; + readonly sendPushNotificationRequest: (input: { + readonly credentials: RelayConfiguration.ApnsCredentials; + readonly request: ApnsPushNotificationRequest; + readonly issuedAtUnixSeconds: number; + }) => Effect.Effect; + } +>()("t3code-relay/agentActivity/ApnsClient") {} const make = Effect.gen(function* () { const httpClient = yield* HttpClient.HttpClient; - const sendLiveActivityRequest: ApnsClientShape["sendLiveActivityRequest"] = Effect.fn( + const sendLiveActivityRequest: ApnsClient["Service"]["sendLiveActivityRequest"] = Effect.fn( "relay.apns.send_live_activity_request", )(function* (input) { yield* Effect.annotateCurrentSpan({ "relay.apns.event": input.request.event }); @@ -288,40 +289,41 @@ const make = Effect.gen(function* () { }; }); - const sendPushNotificationRequest: ApnsClientShape["sendPushNotificationRequest"] = Effect.fn( - "relay.apns.send_push_notification_request", - )(function* (input) { - yield* Effect.annotateCurrentSpan({ "relay.apns.event": "push_notification" }); - const jwt = yield* makeApnsJwt({ - ...input.credentials, - issuedAtUnixSeconds: input.issuedAtUnixSeconds, + const sendPushNotificationRequest: ApnsClient["Service"]["sendPushNotificationRequest"] = + Effect.fn("relay.apns.send_push_notification_request")(function* (input) { + yield* Effect.annotateCurrentSpan({ "relay.apns.event": "push_notification" }); + const jwt = yield* makeApnsJwt({ + ...input.credentials, + issuedAtUnixSeconds: input.issuedAtUnixSeconds, + }); + const host = + input.credentials.environment === "production" + ? "https://api.push.apple.com" + : "https://api.sandbox.push.apple.com"; + const response = yield* HttpClientRequest.post( + `${host}/3/device/${input.request.token}`, + ).pipe( + HttpClientRequest.setHeaders({ + authorization: `bearer ${jwt}`, + "apns-priority": input.request.priority, + "apns-push-type": "alert", + "apns-topic": input.credentials.bundleId, + }), + HttpClientRequest.bodyJson(input.request.payload), + Effect.flatMap(httpClient.execute), + Effect.mapError((cause) => new ApnsHttpRequestError({ cause })), + ); + const responseText = yield* response.text.pipe( + Effect.mapError((cause) => new ApnsHttpRequestError({ cause })), + ); + const reason = apnsReasonFromBody(responseText); + return { + ok: response.status >= 200 && response.status < 300, + status: response.status, + ...(reason === undefined ? {} : { reason }), + apnsId: Option.getOrNull(Headers.get(response.headers, "apns-id")), + }; }); - const host = - input.credentials.environment === "production" - ? "https://api.push.apple.com" - : "https://api.sandbox.push.apple.com"; - const response = yield* HttpClientRequest.post(`${host}/3/device/${input.request.token}`).pipe( - HttpClientRequest.setHeaders({ - authorization: `bearer ${jwt}`, - "apns-priority": input.request.priority, - "apns-push-type": "alert", - "apns-topic": input.credentials.bundleId, - }), - HttpClientRequest.bodyJson(input.request.payload), - Effect.flatMap(httpClient.execute), - Effect.mapError((cause) => new ApnsHttpRequestError({ cause })), - ); - const responseText = yield* response.text.pipe( - Effect.mapError((cause) => new ApnsHttpRequestError({ cause })), - ); - const reason = apnsReasonFromBody(responseText); - return { - ok: response.status >= 200 && response.status < 300, - status: response.status, - ...(reason === undefined ? {} : { reason }), - apnsId: Option.getOrNull(Headers.get(response.headers, "apns-id")), - }; - }); return ApnsClient.of({ makeLiveActivityRequest, diff --git a/infra/relay/src/agentActivity/ApnsDeliveries.test.ts b/infra/relay/src/agentActivity/ApnsDeliveries.test.ts index 81de6d32687..1497b6a73f4 100644 --- a/infra/relay/src/agentActivity/ApnsDeliveries.test.ts +++ b/infra/relay/src/agentActivity/ApnsDeliveries.test.ts @@ -141,16 +141,16 @@ function makeLayer(input: { readonly sourceJobClaims?: ReadonlyMap; readonly queuedJobs?: Array; readonly queuedStarts?: Array< - Parameters[0] + Parameters[0] >; readonly clearedStarts?: Array< - Parameters[0] + Parameters[0] >; readonly markedDeliveries?: Array< - Parameters[0] + Parameters[0] >; readonly invalidatedTokens?: Array< - Parameters[0] + Parameters[0] >; readonly currentTargets?: ReadonlyArray; readonly config?: RelayConfiguration.RelayConfiguration["Service"]; @@ -227,10 +227,10 @@ describe("ApnsDeliveries", () => { const attempts: Array = []; const queuedJobs: Array = []; const queuedStarts: Array< - Parameters[0] + Parameters[0] > = []; const markedDeliveries: Array< - Parameters[0] + Parameters[0] > = []; return Effect.gen(function* () { @@ -933,7 +933,7 @@ describe("ApnsDeliveries", () => { it.effect("invalidates dead device push tokens after permanent APNs alert failures", () => { const attempts: Array = []; const invalidatedTokens: Array< - Parameters[0] + Parameters[0] > = []; const payload = makeApnsDeliveryJobPayload({ kind: "push_notification", @@ -1000,7 +1000,7 @@ describe("ApnsDeliveries", () => { it.effect("clears queued start state when a start job fails in APNs", () => { const attempts: Array = []; const clearedStarts: Array< - Parameters[0] + Parameters[0] > = []; const payload = makeApnsDeliveryJobPayload({ kind: "live_activity_start", @@ -1035,7 +1035,7 @@ describe("ApnsDeliveries", () => { it.effect("invalidates dead push-to-start tokens after permanent APNs start failures", () => { const attempts: Array = []; const invalidatedTokens: Array< - Parameters[0] + Parameters[0] > = []; const payload = makeApnsDeliveryJobPayload({ kind: "live_activity_start", @@ -1082,7 +1082,7 @@ describe("ApnsDeliveries", () => { it.effect("invalidates dead Live Activity tokens after APNs unregisters them", () => { const attempts: Array = []; const invalidatedTokens: Array< - Parameters[0] + Parameters[0] > = []; const payload = makeApnsDeliveryJobPayload({ kind: "live_activity_update", diff --git a/infra/relay/src/agentActivity/ApnsDeliveries.ts b/infra/relay/src/agentActivity/ApnsDeliveries.ts index c1dba1467fa..d70808144fc 100644 --- a/infra/relay/src/agentActivity/ApnsDeliveries.ts +++ b/infra/relay/src/agentActivity/ApnsDeliveries.ts @@ -356,7 +356,7 @@ export type SendLiveActivityDeliveryInput = }); function makeLiveActivityDeliveryRequest( - apns: Apns.ApnsClientShape, + apns: Apns.ApnsClient["Service"], input: SendLiveActivityDeliveryInput, now: DateTime.DateTime, ) { @@ -391,33 +391,32 @@ function makeLiveActivityDeliveryRequest( } } -export interface ApnsDeliveriesShape { - readonly sendForTarget: (input: { - readonly target: LiveActivities.TargetRow; - readonly aggregate: RelayAgentActivityAggregateState | null; - readonly nowMs: number; - }) => Effect.Effect; - readonly sendPushNotificationForTarget: (input: { - readonly target: LiveActivities.TargetRow; - readonly aggregate: RelayAgentActivityAggregateState | null; - }) => Effect.Effect; - readonly sendLiveActivity: ( - input: SendLiveActivityDeliveryInput, - ) => Effect.Effect; - readonly processSignedJob: ( - body: unknown, - ) => Effect.Effect; - readonly sendPushNotification: (input: { - readonly target: LiveActivityDeliveryTarget; - readonly token: string; - readonly sourceJobId?: string | null; - readonly notification: ApnsNotificationPayload; - }) => Effect.Effect; -} - -export class ApnsDeliveries extends Context.Service()( - "t3code-relay/agentActivity/ApnsDeliveries", -) {} +export class ApnsDeliveries extends Context.Service< + ApnsDeliveries, + { + readonly sendForTarget: (input: { + readonly target: LiveActivities.TargetRow; + readonly aggregate: RelayAgentActivityAggregateState | null; + readonly nowMs: number; + }) => Effect.Effect; + readonly sendPushNotificationForTarget: (input: { + readonly target: LiveActivities.TargetRow; + readonly aggregate: RelayAgentActivityAggregateState | null; + }) => Effect.Effect; + readonly sendLiveActivity: ( + input: SendLiveActivityDeliveryInput, + ) => Effect.Effect; + readonly processSignedJob: ( + body: unknown, + ) => Effect.Effect; + readonly sendPushNotification: (input: { + readonly target: LiveActivityDeliveryTarget; + readonly token: string; + readonly sourceJobId?: string | null; + readonly notification: ApnsNotificationPayload; + }) => Effect.Effect; + } +>()("t3code-relay/agentActivity/ApnsDeliveries") {} const make = Effect.gen(function* () { const attempts = yield* DeliveryAttempts.DeliveryAttempts; @@ -442,7 +441,7 @@ const make = Effect.gen(function* () { ); }); - const sendLiveActivity: ApnsDeliveriesShape["sendLiveActivity"] = Effect.fn( + const sendLiveActivity: ApnsDeliveries["Service"]["sendLiveActivity"] = Effect.fn( "relay.apns_deliveries.send_live_activity", )(function* (input) { yield* Effect.annotateCurrentSpan({ @@ -550,7 +549,7 @@ const make = Effect.gen(function* () { }; }); - const sendPushNotification: ApnsDeliveriesShape["sendPushNotification"] = Effect.fn( + const sendPushNotification: ApnsDeliveries["Service"]["sendPushNotification"] = Effect.fn( "relay.apns_deliveries.send_push_notification", )(function* (input) { yield* Effect.annotateCurrentSpan({ @@ -654,14 +653,14 @@ const make = Effect.gen(function* () { }; }); - const processSignedJob: ApnsDeliveriesShape["processSignedJob"] = Effect.fn( + const processSignedJob: ApnsDeliveries["Service"]["processSignedJob"] = Effect.fn( "relay.apns_deliveries.process_signed_job", )(function* (body) { const signedJob = yield* decodeSignedApnsDeliveryJob(body).pipe( Effect.mapError( () => new ApnsDeliveryJobInvalid({ - message: "Invalid APNs delivery queue job.", + reason: "invalid-queue-payload", }), ), ); @@ -686,7 +685,7 @@ const make = Effect.gen(function* () { if (payload.aggregate === null) { return Effect.fail( new ApnsDeliveryJobInvalid({ - message: "Live Activity start/update jobs require an aggregate.", + reason: "missing-live-activity-aggregate", }), ); } @@ -715,7 +714,7 @@ const make = Effect.gen(function* () { if (payload.notification === null) { return Effect.fail( new ApnsDeliveryJobInvalid({ - message: "Push notification jobs require a notification payload.", + reason: "missing-push-notification", }), ); } diff --git a/infra/relay/src/agentActivity/ApnsDeliveryQueue.ts b/infra/relay/src/agentActivity/ApnsDeliveryQueue.ts index 3582e236b4d..33c21cf0d54 100644 --- a/infra/relay/src/agentActivity/ApnsDeliveryQueue.ts +++ b/infra/relay/src/agentActivity/ApnsDeliveryQueue.ts @@ -33,34 +33,31 @@ export class ApnsDeliveryQueueSendError extends Schema.TaggedErrorClass Effect.Effect; -} - export class ApnsDeliveryQueueSender extends Context.Service< ApnsDeliveryQueueSender, - ApnsDeliveryQueueSenderShape + { + readonly send: (body: SignedApnsDeliveryJob) => Effect.Effect; + } >()("t3code-relay/agentActivity/ApnsDeliveryQueue/ApnsDeliveryQueueSender") {} -export interface ApnsDeliveryQueueShape { - readonly enqueueLiveActivity: (input: { - readonly kind: ApnsDeliveryJobPayload["kind"]; - readonly userId: string; - readonly deviceId: string; - readonly token: string; - readonly aggregate: ApnsDeliveryJobPayload["aggregate"]; - }) => Effect.Effect; - readonly enqueuePushNotification: (input: { - readonly userId: string; - readonly deviceId: string; - readonly token: string; - readonly notification: NonNullable; - }) => Effect.Effect; -} - -export class ApnsDeliveryQueue extends Context.Service()( - "t3code-relay/agentActivity/ApnsDeliveryQueue", -) {} +export class ApnsDeliveryQueue extends Context.Service< + ApnsDeliveryQueue, + { + readonly enqueueLiveActivity: (input: { + readonly kind: ApnsDeliveryJobPayload["kind"]; + readonly userId: string; + readonly deviceId: string; + readonly token: string; + readonly aggregate: ApnsDeliveryJobPayload["aggregate"]; + }) => Effect.Effect; + readonly enqueuePushNotification: (input: { + readonly userId: string; + readonly deviceId: string; + readonly token: string; + readonly notification: NonNullable; + }) => Effect.Effect; + } +>()("t3code-relay/agentActivity/ApnsDeliveryQueue") {} const make = Effect.gen(function* () { const sender = yield* ApnsDeliveryQueueSender; diff --git a/infra/relay/src/agentActivity/DeliveryAttempts.ts b/infra/relay/src/agentActivity/DeliveryAttempts.ts index 6eb9b93c388..931837818b6 100644 --- a/infra/relay/src/agentActivity/DeliveryAttempts.ts +++ b/infra/relay/src/agentActivity/DeliveryAttempts.ts @@ -43,21 +43,20 @@ export interface DeliveryAttemptCompletionInput { export type DeliverySourceJobClaimResult = "claimed" | "completed" | "in_flight"; -export interface DeliveryAttemptsShape { - readonly record: ( - input: DeliveryAttemptInput, - ) => Effect.Effect; - readonly claimSourceJob: ( - input: DeliveryAttemptInput & { readonly sourceJobId: string }, - ) => Effect.Effect; - readonly completeSourceJob: ( - input: DeliveryAttemptCompletionInput, - ) => Effect.Effect; -} - -export class DeliveryAttempts extends Context.Service()( - "t3code-relay/agentActivity/DeliveryAttempts", -) {} +export class DeliveryAttempts extends Context.Service< + DeliveryAttempts, + { + readonly record: ( + input: DeliveryAttemptInput, + ) => Effect.Effect; + readonly claimSourceJob: ( + input: DeliveryAttemptInput & { readonly sourceJobId: string }, + ) => Effect.Effect; + readonly completeSourceJob: ( + input: DeliveryAttemptCompletionInput, + ) => Effect.Effect; + } +>()("t3code-relay/agentActivity/DeliveryAttempts") {} const SOURCE_JOB_CLAIM_LEASE_MINUTES = 10; diff --git a/infra/relay/src/agentActivity/Devices.ts b/infra/relay/src/agentActivity/Devices.ts index 108735f27ae..51a9bd53d64 100644 --- a/infra/relay/src/agentActivity/Devices.ts +++ b/infra/relay/src/agentActivity/Devices.ts @@ -40,23 +40,22 @@ export class DeviceListPersistenceError extends Schema.TaggedErrorClass Effect.Effect; - readonly unregister: (input: { - readonly userId: string; - readonly deviceId: string; - }) => Effect.Effect; - readonly listForUser: (input: { - readonly userId: string; - }) => Effect.Effect, DeviceListPersistenceError>; -} - -export class Devices extends Context.Service()( - "t3code-relay/agentActivity/Devices", -) {} +export class Devices extends Context.Service< + Devices, + { + readonly register: (input: { + readonly userId: string; + readonly registration: RelayDeviceRegistrationRequest; + }) => Effect.Effect; + readonly unregister: (input: { + readonly userId: string; + readonly deviceId: string; + }) => Effect.Effect; + readonly listForUser: (input: { + readonly userId: string; + }) => Effect.Effect, DeviceListPersistenceError>; + } +>()("t3code-relay/agentActivity/Devices") {} const make = Effect.gen(function* () { const db = yield* RelayDb.RelayDb; diff --git a/infra/relay/src/agentActivity/LiveActivities.ts b/infra/relay/src/agentActivity/LiveActivities.ts index 988dd6988b2..9ee1274b935 100644 --- a/infra/relay/src/agentActivity/LiveActivities.ts +++ b/infra/relay/src/agentActivity/LiveActivities.ts @@ -7,7 +7,7 @@ import { RelayAgentActivityAggregateState as RelayAgentActivityAggregateStateSch import * as Context from "effect/Context"; import * as DateTime from "effect/DateTime"; import * as Effect from "effect/Effect"; -import { cast } from "effect/Function"; +import * as Function from "effect/Function"; import * as Layer from "effect/Layer"; import * as Schema from "effect/Schema"; import { and, eq, sql } from "drizzle-orm"; @@ -64,41 +64,40 @@ export interface LiveActivityRow { export type TargetRow = DeviceRow & LiveActivityRow; -export interface LiveActivitiesShape { - readonly register: (input: { - readonly userId: string; - readonly registration: RelayLiveActivityRegistrationRequest; - }) => Effect.Effect; - readonly listTargets: (input: { - readonly userId: string; - }) => Effect.Effect, LiveActivityTargetListPersistenceError>; - readonly markDelivery: (input: { - readonly userId: string; - readonly deviceId: string; - readonly kind: RelayDeliveryKind; - readonly aggregate: RelayAgentActivityAggregateState | null; - readonly deliveredAt: string; - }) => Effect.Effect; - readonly markStartQueued: (input: { - readonly userId: string; - readonly deviceId: string; - readonly queuedAt: string; - }) => Effect.Effect; - readonly clearStartQueued: (input: { - readonly userId: string; - readonly deviceId: string; - }) => Effect.Effect; - readonly invalidateDeliveryToken: (input: { - readonly userId: string; - readonly deviceId: string; - readonly kind: RelayDeliveryKind; - readonly invalidatedAt: string; - }) => Effect.Effect; -} - -export class LiveActivities extends Context.Service()( - "t3code-relay/agentActivity/LiveActivities", -) {} +export class LiveActivities extends Context.Service< + LiveActivities, + { + readonly register: (input: { + readonly userId: string; + readonly registration: RelayLiveActivityRegistrationRequest; + }) => Effect.Effect; + readonly listTargets: (input: { + readonly userId: string; + }) => Effect.Effect, LiveActivityTargetListPersistenceError>; + readonly markDelivery: (input: { + readonly userId: string; + readonly deviceId: string; + readonly kind: RelayDeliveryKind; + readonly aggregate: RelayAgentActivityAggregateState | null; + readonly deliveredAt: string; + }) => Effect.Effect; + readonly markStartQueued: (input: { + readonly userId: string; + readonly deviceId: string; + readonly queuedAt: string; + }) => Effect.Effect; + readonly clearStartQueued: (input: { + readonly userId: string; + readonly deviceId: string; + }) => Effect.Effect; + readonly invalidateDeliveryToken: (input: { + readonly userId: string; + readonly deviceId: string; + readonly kind: RelayDeliveryKind; + readonly invalidatedAt: string; + }) => Effect.Effect; + } +>()("t3code-relay/agentActivity/LiveActivities") {} const decodeJsonString = Schema.decodeEffect(Schema.UnknownFromJsonString); const encodeJsonValue = Schema.encodeEffect(Schema.UnknownFromJsonString); @@ -223,7 +222,7 @@ const make = Effect.gen(function* () { ? null : yield* encodeRelayAgentActivityAggregateStateJson(input.aggregate).pipe( Effect.flatMap(decodeJsonString), - Effect.map(cast), + Effect.map(Function.cast), ); yield* db diff --git a/infra/relay/src/agentActivity/MobileRegistrations.test.ts b/infra/relay/src/agentActivity/MobileRegistrations.test.ts index 8d8e6f21461..eed330dd589 100644 --- a/infra/relay/src/agentActivity/MobileRegistrations.test.ts +++ b/infra/relay/src/agentActivity/MobileRegistrations.test.ts @@ -38,7 +38,9 @@ const device: RelayDeviceRegistrationRequest = { }, }; -function makeDevices(overrides: Partial = {}): Devices.DevicesShape { +function makeDevices( + overrides: Partial = {}, +): Devices.Devices["Service"] { return { register: () => Effect.void, unregister: () => Effect.void, @@ -48,8 +50,8 @@ function makeDevices(overrides: Partial = {}): Devices.Dev } function makeLiveActivities( - overrides: Partial = {}, -): LiveActivities.LiveActivitiesShape { + overrides: Partial = {}, +): LiveActivities.LiveActivities["Service"] { return { register: () => Effect.void, listTargets: () => Effect.succeed([]), @@ -62,8 +64,8 @@ function makeLiveActivities( } function makeAgentActivityRows( - overrides: Partial = {}, -): AgentActivityRows.AgentActivityRowsShape { + overrides: Partial = {}, +): AgentActivityRows.AgentActivityRows["Service"] { return { upsert: () => Effect.void, remove: () => Effect.void, @@ -108,8 +110,8 @@ function makeEnvironmentLinks( } function makeDeliveryAttempts( - overrides: Partial = {}, -): DeliveryAttempts.DeliveryAttemptsShape { + overrides: Partial = {}, +): DeliveryAttempts.DeliveryAttempts["Service"] { return { record: () => Effect.void, claimSourceJob: () => Effect.succeed("claimed"), @@ -138,8 +140,8 @@ const config = RelayConfiguration.RelayConfiguration.of({ }); function makeRegistrationReplayLayer(input: { - readonly devices: Devices.DevicesShape; - readonly liveActivities: LiveActivities.LiveActivitiesShape; + readonly devices: Devices.Devices["Service"]; + readonly liveActivities: LiveActivities.LiveActivities["Service"]; readonly queuedJobs: Array; }) { return MobileRegistrations.layer.pipe( @@ -167,8 +169,8 @@ function makeRegistrationReplayLayer(input: { } function makeAgentActivityPublisher( - overrides: Partial = {}, -): AgentActivityPublisher.AgentActivityPublisherShape { + overrides: Partial = {}, +): AgentActivityPublisher.AgentActivityPublisher["Service"] { return { publish: () => Effect.succeed({ ok: true, deliveries: [] }), replayForLiveActivityRegistration: () => Effect.succeed(null), @@ -178,10 +180,10 @@ function makeAgentActivityPublisher( describe("MobileRegistrations", () => { it.effect("registers devices through the device persistence service", () => { - let registered: Parameters[0] | null = null; + let registered: Parameters[0] | null = null; let replayed: | Parameters< - AgentActivityPublisher.AgentActivityPublisherShape["replayForLiveActivityRegistration"] + AgentActivityPublisher.AgentActivityPublisher["Service"]["replayForLiveActivityRegistration"] >[0] | null = null; @@ -263,7 +265,7 @@ describe("MobileRegistrations", () => { }); it.effect("unregisters the current user's device", () => { - let unregistered: Parameters[0] | null = null; + let unregistered: Parameters[0] | null = null; return Effect.gen(function* () { const result = yield* Effect.gen(function* () { @@ -310,10 +312,11 @@ describe("MobileRegistrations", () => { deviceId: "device-1" as const, activityPushToken: "activity-token" as const, }; - let registered: Parameters[0] | null = null; + let registered: Parameters[0] | null = + null; let replayed: | Parameters< - AgentActivityPublisher.AgentActivityPublisherShape["replayForLiveActivityRegistration"] + AgentActivityPublisher.AgentActivityPublisher["Service"]["replayForLiveActivityRegistration"] >[0] | null = null; @@ -372,9 +375,9 @@ describe("MobileRegistrations", () => { () => { const queuedJobs: Array = []; const queuedStarts: Array< - Parameters[0] + Parameters[0] > = []; - const registeredDevices: Array[0]> = []; + const registeredDevices: Array[0]> = []; const devices = makeDevices({ register: (input) => Effect.sync(() => { diff --git a/infra/relay/src/agentActivity/MobileRegistrations.ts b/infra/relay/src/agentActivity/MobileRegistrations.ts index d9c013232a3..b44d24dfa5d 100644 --- a/infra/relay/src/agentActivity/MobileRegistrations.ts +++ b/infra/relay/src/agentActivity/MobileRegistrations.ts @@ -15,24 +15,22 @@ export type MobileRegistrationError = | Devices.DeviceUnregistrationPersistenceError | LiveActivities.LiveActivityRegistrationPersistenceError; -export interface MobileRegistrationsShape { - readonly registerDevice: (input: { - readonly userId: string; - readonly payload: RelayDeviceRegistrationRequest; - }) => Effect.Effect<{ readonly ok: true }, MobileRegistrationError>; - readonly registerLiveActivity: (input: { - readonly userId: string; - readonly payload: RelayLiveActivityRegistrationRequest; - }) => Effect.Effect<{ readonly ok: true }, MobileRegistrationError>; - readonly unregisterDevice: (input: { - readonly userId: string; - readonly deviceId: string; - }) => Effect.Effect<{ readonly ok: true }, MobileRegistrationError>; -} - export class MobileRegistrations extends Context.Service< MobileRegistrations, - MobileRegistrationsShape + { + readonly registerDevice: (input: { + readonly userId: string; + readonly payload: RelayDeviceRegistrationRequest; + }) => Effect.Effect<{ readonly ok: true }, MobileRegistrationError>; + readonly registerLiveActivity: (input: { + readonly userId: string; + readonly payload: RelayLiveActivityRegistrationRequest; + }) => Effect.Effect<{ readonly ok: true }, MobileRegistrationError>; + readonly unregisterDevice: (input: { + readonly userId: string; + readonly deviceId: string; + }) => Effect.Effect<{ readonly ok: true }, MobileRegistrationError>; + } >()("t3code-relay/agentActivity/MobileRegistrations") {} const make = Effect.gen(function* () { diff --git a/infra/relay/src/agentActivity/apnsDeliveryJobs.ts b/infra/relay/src/agentActivity/apnsDeliveryJobs.ts index d509baa9168..8de33752a9a 100644 --- a/infra/relay/src/agentActivity/apnsDeliveryJobs.ts +++ b/infra/relay/src/agentActivity/apnsDeliveryJobs.ts @@ -52,9 +52,45 @@ export type SignedApnsDeliveryJob = typeof SignedApnsDeliveryJob.Type; export class ApnsDeliveryJobInvalid extends Schema.TaggedErrorClass()( "ApnsDeliveryJobInvalid", { - message: Schema.String, + reason: Schema.Literals([ + "invalid-queue-payload", + "missing-live-activity-aggregate", + "unexpected-live-activity-notification", + "missing-push-notification", + "unexpected-push-notification-aggregate", + "invalid-created-at", + "invalid-expires-at", + "invalid-time-window", + "time-window-too-long", + "invalid-signature", + ]), }, -) {} +) { + override get message(): string { + switch (this.reason) { + case "invalid-queue-payload": + return "Invalid APNs delivery queue job."; + case "missing-live-activity-aggregate": + return "Live Activity start/update jobs require an aggregate."; + case "unexpected-live-activity-notification": + return "Live Activity jobs must not carry push notification payloads."; + case "missing-push-notification": + return "Push notification jobs require a notification payload."; + case "unexpected-push-notification-aggregate": + return "Push notification jobs must not carry aggregate state."; + case "invalid-created-at": + return "Invalid APNs delivery job creation time."; + case "invalid-expires-at": + return "Invalid APNs delivery job expiry."; + case "invalid-time-window": + return "Invalid APNs delivery job time window."; + case "time-window-too-long": + return "APNs delivery job time window is too long."; + case "invalid-signature": + return "Invalid APNs delivery job signature."; + } + } +} export class ApnsDeliveryJobExpired extends Schema.TaggedErrorClass()( "ApnsDeliveryJobExpired", @@ -106,31 +142,31 @@ function validatePayloadShape(payload: ApnsDeliveryJobPayload): ApnsDeliveryJobI case "live_activity_update": if (payload.aggregate === null) { return new ApnsDeliveryJobInvalid({ - message: "Live Activity start/update jobs require an aggregate.", + reason: "missing-live-activity-aggregate", }); } if (payload.notification !== null) { return new ApnsDeliveryJobInvalid({ - message: "Live Activity jobs must not carry push notification payloads.", + reason: "unexpected-live-activity-notification", }); } return null; case "live_activity_end": if (payload.notification !== null) { return new ApnsDeliveryJobInvalid({ - message: "Live Activity jobs must not carry push notification payloads.", + reason: "unexpected-live-activity-notification", }); } return null; case "push_notification": if (payload.notification === null) { return new ApnsDeliveryJobInvalid({ - message: "Push notification jobs require a notification payload.", + reason: "missing-push-notification", }); } if (payload.aggregate !== null) { return new ApnsDeliveryJobInvalid({ - message: "Push notification jobs must not carry aggregate state.", + reason: "unexpected-push-notification-aggregate", }); } return null; @@ -177,19 +213,19 @@ export function verifySignedApnsDeliveryJob(input: { } const createdAt = DateTime.make(input.job.payload.createdAt); if (Option.isNone(createdAt)) { - return new ApnsDeliveryJobInvalid({ message: "Invalid APNs delivery job creation time." }); + return new ApnsDeliveryJobInvalid({ reason: "invalid-created-at" }); } const expiresAt = DateTime.make(input.job.payload.expiresAt); if (Option.isNone(expiresAt)) { - return new ApnsDeliveryJobInvalid({ message: "Invalid APNs delivery job expiry." }); + return new ApnsDeliveryJobInvalid({ reason: "invalid-expires-at" }); } const createdAtMs = createdAt.value.epochMilliseconds; const expiresAtMs = expiresAt.value.epochMilliseconds; if (expiresAtMs <= createdAtMs) { - return new ApnsDeliveryJobInvalid({ message: "Invalid APNs delivery job time window." }); + return new ApnsDeliveryJobInvalid({ reason: "invalid-time-window" }); } if (expiresAtMs - createdAtMs > MAX_JOB_AGE_MS) { - return new ApnsDeliveryJobInvalid({ message: "APNs delivery job time window is too long." }); + return new ApnsDeliveryJobInvalid({ reason: "time-window-too-long" }); } if (expiresAtMs <= input.nowMs) { return new ApnsDeliveryJobExpired({ expiresAt: input.job.payload.expiresAt }); @@ -199,7 +235,7 @@ export function verifySignedApnsDeliveryJob(input: { payload: input.job.payload, }); if (!timingSafeEqualBase64Url(input.job.signature, expected)) { - return new ApnsDeliveryJobInvalid({ message: "Invalid APNs delivery job signature." }); + return new ApnsDeliveryJobInvalid({ reason: "invalid-signature" }); } return input.job.payload; }