Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions infra/relay/src/agentActivity/AgentActivityPublisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ function target(deviceId: string): LiveActivities.TargetRow {
}

function makeLiveActivities(
overrides: Partial<LiveActivities.LiveActivitiesShape> = {},
): LiveActivities.LiveActivitiesShape {
overrides: Partial<LiveActivities.LiveActivities["Service"]> = {},
): LiveActivities.LiveActivities["Service"] {
return {
register: () => Effect.void,
listTargets: () => Effect.succeed([]),
Expand All @@ -55,8 +55,8 @@ function makeLiveActivities(
}

function makeAgentActivityRows(
overrides: Partial<AgentActivityRows.AgentActivityRowsShape> = {},
): AgentActivityRows.AgentActivityRowsShape {
overrides: Partial<AgentActivityRows.AgentActivityRows["Service"]> = {},
): AgentActivityRows.AgentActivityRows["Service"] {
return {
upsert: () => Effect.void,
remove: () => Effect.void,
Expand Down Expand Up @@ -88,8 +88,8 @@ function makeEnvironmentLinks(
}

function makeApnsDeliveries(
overrides: Partial<ApnsDeliveries.ApnsDeliveriesShape> = {},
): ApnsDeliveries.ApnsDeliveriesShape {
overrides: Partial<ApnsDeliveries.ApnsDeliveries["Service"]> = {},
): ApnsDeliveries.ApnsDeliveries["Service"] {
return {
sendForTarget: () => Effect.succeed(null),
sendPushNotificationForTarget: () => Effect.succeed(null),
Expand Down Expand Up @@ -133,7 +133,8 @@ describe("AgentActivityPublisher", () => {
remote_start_queued_at: null,
remote_started_at: "1970-01-01T00:00:01.000Z",
};
const sent: Array<Parameters<ApnsDeliveries.ApnsDeliveriesShape["sendForTarget"]>[0]> = [];
const sent: Array<Parameters<ApnsDeliveries.ApnsDeliveries["Service"]["sendForTarget"]>[0]> =
[];
const deliveryResult: RelayDeliveryResult = {
deviceId: "device-1",
kind: "live_activity_update",
Expand Down Expand Up @@ -211,7 +212,8 @@ describe("AgentActivityPublisher", () => {
readonly environmentId: string;
readonly environmentPublicKey: string;
}> = [];
const upserts: Array<Parameters<AgentActivityRows.AgentActivityRowsShape["upsert"]>[0]> = [];
const upserts: Array<Parameters<AgentActivityRows.AgentActivityRows["Service"]["upsert"]>[0]> =
[];

return Effect.gen(function* () {
const result = yield* Effect.gen(function* () {
Expand Down Expand Up @@ -302,9 +304,10 @@ describe("AgentActivityPublisher", () => {
updatedAt: "1970-01-01T00:00:10.000Z",
};
const sentAggregates: Array<
Parameters<ApnsDeliveries.ApnsDeliveriesShape["sendForTarget"]>[0]
Parameters<ApnsDeliveries.ApnsDeliveries["Service"]["sendForTarget"]>[0]
> = [];
const removals: Array<Parameters<AgentActivityRows.AgentActivityRowsShape["remove"]>[0]> = [];
const removals: Array<Parameters<AgentActivityRows.AgentActivityRows["Service"]["remove"]>[0]> =
[];

return Effect.gen(function* () {
const result = yield* Effect.gen(function* () {
Expand Down Expand Up @@ -405,10 +408,10 @@ describe("AgentActivityPublisher", () => {
headline: "Needs input",
};
const liveAggregates: Array<
Parameters<ApnsDeliveries.ApnsDeliveriesShape["sendForTarget"]>[0]
Parameters<ApnsDeliveries.ApnsDeliveries["Service"]["sendForTarget"]>[0]
> = [];
const pushAggregates: Array<
Parameters<ApnsDeliveries.ApnsDeliveriesShape["sendPushNotificationForTarget"]>[0]
Parameters<ApnsDeliveries.ApnsDeliveries["Service"]["sendPushNotificationForTarget"]>[0]
> = [];

return Effect.gen(function* () {
Expand Down Expand Up @@ -517,10 +520,10 @@ describe("AgentActivityPublisher", () => {
headline: "Needs approval",
};
const liveAggregates: Array<
Parameters<ApnsDeliveries.ApnsDeliveriesShape["sendForTarget"]>[0]
Parameters<ApnsDeliveries.ApnsDeliveries["Service"]["sendForTarget"]>[0]
> = [];
const pushAggregates: Array<
Parameters<ApnsDeliveries.ApnsDeliveriesShape["sendPushNotificationForTarget"]>[0]
Parameters<ApnsDeliveries.ApnsDeliveries["Service"]["sendPushNotificationForTarget"]>[0]
> = [];

return Effect.gen(function* () {
Expand Down
26 changes: 12 additions & 14 deletions infra/relay/src/agentActivity/AgentActivityPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,20 @@ export type AgentActivityPublishError =
| LiveActivities.LiveActivityTargetListPersistenceError
| ApnsDeliveries.ApnsDeliveryError;

export interface AgentActivityPublisherShape {
readonly publish: (input: {
readonly environmentId: string;
readonly environmentPublicKey: string;
readonly threadId: string;
readonly state: RelayAgentActivityState | null;
}) => Effect.Effect<RelayPublishResponse, AgentActivityPublishError>;
readonly replayForLiveActivityRegistration: (input: {
readonly userId: string;
readonly deviceId: string;
}) => Effect.Effect<RelayDeliveryResult | null, AgentActivityPublishError>;
}

export class AgentActivityPublisher extends Context.Service<
AgentActivityPublisher,
AgentActivityPublisherShape
{
readonly publish: (input: {
readonly environmentId: string;
readonly environmentPublicKey: string;
readonly threadId: string;
readonly state: RelayAgentActivityState | null;
}) => Effect.Effect<RelayPublishResponse, AgentActivityPublishError>;
readonly replayForLiveActivityRegistration: (input: {
readonly userId: string;
readonly deviceId: string;
}) => Effect.Effect<RelayDeliveryResult | null, AgentActivityPublishError>;
}
>()("t3code-relay/agentActivity/AgentActivityPublisher") {}

const make = Effect.gen(function* () {
Expand Down
42 changes: 22 additions & 20 deletions infra/relay/src/agentActivity/AgentActivityRows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { RelayAgentActivityState as RelayAgentActivityStateSchema } from "@t3too
import * as Context from "effect/Context";
import * as DateTime from "effect/DateTime";
import * as Effect from "effect/Effect";
import { cast } from "effect/Function";
import * as Function from "effect/Function";
import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as Schema from "effect/Schema";
Expand Down Expand Up @@ -39,24 +39,26 @@ export class AgentActivityRowListPersistenceError extends Schema.TaggedErrorClas
}
}

export interface AgentActivityRowsShape {
readonly upsert: (input: {
readonly environmentPublicKey: string;
readonly state: RelayAgentActivityState;
}) => Effect.Effect<void, AgentActivityRowUpsertPersistenceError>;
readonly remove: (input: {
readonly environmentId: string;
readonly environmentPublicKey: string;
readonly threadId: string;
}) => Effect.Effect<void, AgentActivityRowDeletePersistenceError>;
readonly listForUser: (input: {
readonly userId: string;
}) => Effect.Effect<ReadonlyArray<RelayAgentActivityState>, AgentActivityRowListPersistenceError>;
}

export class AgentActivityRows extends Context.Service<AgentActivityRows, AgentActivityRowsShape>()(
"t3code-relay/agentActivity/AgentActivityRows",
) {}
export class AgentActivityRows extends Context.Service<
AgentActivityRows,
{
readonly upsert: (input: {
readonly environmentPublicKey: string;
readonly state: RelayAgentActivityState;
}) => Effect.Effect<void, AgentActivityRowUpsertPersistenceError>;
readonly remove: (input: {
readonly environmentId: string;
readonly environmentPublicKey: string;
readonly threadId: string;
}) => Effect.Effect<void, AgentActivityRowDeletePersistenceError>;
readonly listForUser: (input: {
readonly userId: string;
}) => Effect.Effect<
ReadonlyArray<RelayAgentActivityState>,
AgentActivityRowListPersistenceError
>;
}
>()("t3code-relay/agentActivity/AgentActivityRows") {}

const decodeJsonString = Schema.decodeEffect(Schema.UnknownFromJsonString);
const encodeJsonValue = Schema.encodeEffect(Schema.UnknownFromJsonString);
Expand All @@ -82,7 +84,7 @@ const make = Effect.gen(function* () {
const now = yield* DateTime.now;
const stateJson = yield* encodeRelayAgentActivityStateJson(input.state).pipe(
Effect.flatMap(decodeJsonString),
Effect.map(cast<unknown, RelayAgentActivityState>),
Effect.map(Function.cast<unknown, RelayAgentActivityState>),
);
yield* db
.insert(relayAgentActivityRows)
Expand Down
110 changes: 56 additions & 54 deletions infra/relay/src/agentActivity/ApnsClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import * as Layer from "effect/Layer";
import * as Option from "effect/Option";
import * as Redacted from "effect/Redacted";
import * as Schema from "effect/Schema";
import { Headers, HttpClient, HttpClientRequest } from "effect/unstable/http";
import * as RelayConfiguration from "../Config.ts";
import * as Headers from "effect/unstable/http/Headers";
import * as HttpClient from "effect/unstable/http/HttpClient";
import * as HttpClientRequest from "effect/unstable/http/HttpClientRequest";
import type * as RelayConfiguration from "../Config.ts";
import type { ApnsNotificationPayload } from "./apnsDeliveryJobs.ts";

const LIVE_ACTIVITY_NAME = "AgentActivity";
Expand Down Expand Up @@ -231,29 +233,28 @@ function apnsReasonFromBody(body: string): string | undefined {
});
}

export interface ApnsClientShape {
readonly makeLiveActivityRequest: typeof makeLiveActivityRequest;
readonly makePushNotificationRequest: typeof makePushNotificationRequest;
readonly sendLiveActivityRequest: (input: {
readonly credentials: RelayConfiguration.ApnsCredentials;
readonly request: ApnsLiveActivityRequest;
readonly issuedAtUnixSeconds: number;
}) => Effect.Effect<ApnsDeliveryResult, ApnsError>;
readonly sendPushNotificationRequest: (input: {
readonly credentials: RelayConfiguration.ApnsCredentials;
readonly request: ApnsPushNotificationRequest;
readonly issuedAtUnixSeconds: number;
}) => Effect.Effect<ApnsDeliveryResult, ApnsError>;
}

export class ApnsClient extends Context.Service<ApnsClient, ApnsClientShape>()(
"t3code-relay/agentActivity/ApnsClient",
) {}
export class ApnsClient extends Context.Service<
ApnsClient,
{
readonly makeLiveActivityRequest: typeof makeLiveActivityRequest;
readonly makePushNotificationRequest: typeof makePushNotificationRequest;
readonly sendLiveActivityRequest: (input: {
readonly credentials: RelayConfiguration.ApnsCredentials;
readonly request: ApnsLiveActivityRequest;
readonly issuedAtUnixSeconds: number;
}) => Effect.Effect<ApnsDeliveryResult, ApnsError>;
readonly sendPushNotificationRequest: (input: {
readonly credentials: RelayConfiguration.ApnsCredentials;
readonly request: ApnsPushNotificationRequest;
readonly issuedAtUnixSeconds: number;
}) => Effect.Effect<ApnsDeliveryResult, ApnsError>;
}
>()("t3code-relay/agentActivity/ApnsClient") {}

const make = Effect.gen(function* () {
const httpClient = yield* HttpClient.HttpClient;

const sendLiveActivityRequest: ApnsClientShape["sendLiveActivityRequest"] = Effect.fn(
const sendLiveActivityRequest: ApnsClient["Service"]["sendLiveActivityRequest"] = Effect.fn(
"relay.apns.send_live_activity_request",
)(function* (input) {
yield* Effect.annotateCurrentSpan({ "relay.apns.event": input.request.event });
Expand Down Expand Up @@ -288,40 +289,41 @@ const make = Effect.gen(function* () {
};
});

const sendPushNotificationRequest: ApnsClientShape["sendPushNotificationRequest"] = Effect.fn(
"relay.apns.send_push_notification_request",
)(function* (input) {
yield* Effect.annotateCurrentSpan({ "relay.apns.event": "push_notification" });
const jwt = yield* makeApnsJwt({
...input.credentials,
issuedAtUnixSeconds: input.issuedAtUnixSeconds,
const sendPushNotificationRequest: ApnsClient["Service"]["sendPushNotificationRequest"] =
Effect.fn("relay.apns.send_push_notification_request")(function* (input) {
yield* Effect.annotateCurrentSpan({ "relay.apns.event": "push_notification" });
const jwt = yield* makeApnsJwt({
...input.credentials,
issuedAtUnixSeconds: input.issuedAtUnixSeconds,
});
const host =
input.credentials.environment === "production"
? "https://api.push.apple.com"
: "https://api.sandbox.push.apple.com";
const response = yield* HttpClientRequest.post(
`${host}/3/device/${input.request.token}`,
).pipe(
HttpClientRequest.setHeaders({
authorization: `bearer ${jwt}`,
"apns-priority": input.request.priority,
"apns-push-type": "alert",
"apns-topic": input.credentials.bundleId,
}),
HttpClientRequest.bodyJson(input.request.payload),
Effect.flatMap(httpClient.execute),
Effect.mapError((cause) => new ApnsHttpRequestError({ cause })),
);
const responseText = yield* response.text.pipe(
Effect.mapError((cause) => new ApnsHttpRequestError({ cause })),
);
const reason = apnsReasonFromBody(responseText);
return {
ok: response.status >= 200 && response.status < 300,
status: response.status,
...(reason === undefined ? {} : { reason }),
apnsId: Option.getOrNull(Headers.get(response.headers, "apns-id")),
};
});
const host =
input.credentials.environment === "production"
? "https://api.push.apple.com"
: "https://api.sandbox.push.apple.com";
const response = yield* HttpClientRequest.post(`${host}/3/device/${input.request.token}`).pipe(
HttpClientRequest.setHeaders({
authorization: `bearer ${jwt}`,
"apns-priority": input.request.priority,
"apns-push-type": "alert",
"apns-topic": input.credentials.bundleId,
}),
HttpClientRequest.bodyJson(input.request.payload),
Effect.flatMap(httpClient.execute),
Effect.mapError((cause) => new ApnsHttpRequestError({ cause })),
);
const responseText = yield* response.text.pipe(
Effect.mapError((cause) => new ApnsHttpRequestError({ cause })),
);
const reason = apnsReasonFromBody(responseText);
return {
ok: response.status >= 200 && response.status < 300,
status: response.status,
...(reason === undefined ? {} : { reason }),
apnsId: Option.getOrNull(Headers.get(response.headers, "apns-id")),
};
});

return ApnsClient.of({
makeLiveActivityRequest,
Expand Down
20 changes: 10 additions & 10 deletions infra/relay/src/agentActivity/ApnsDeliveries.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,16 @@ function makeLayer(input: {
readonly sourceJobClaims?: ReadonlyMap<string, DeliveryAttempts.DeliverySourceJobClaimResult>;
readonly queuedJobs?: Array<SignedApnsDeliveryJob>;
readonly queuedStarts?: Array<
Parameters<LiveActivities.LiveActivitiesShape["markStartQueued"]>[0]
Parameters<LiveActivities.LiveActivities["Service"]["markStartQueued"]>[0]
>;
readonly clearedStarts?: Array<
Parameters<LiveActivities.LiveActivitiesShape["clearStartQueued"]>[0]
Parameters<LiveActivities.LiveActivities["Service"]["clearStartQueued"]>[0]
>;
readonly markedDeliveries?: Array<
Parameters<LiveActivities.LiveActivitiesShape["markDelivery"]>[0]
Parameters<LiveActivities.LiveActivities["Service"]["markDelivery"]>[0]
>;
readonly invalidatedTokens?: Array<
Parameters<LiveActivities.LiveActivitiesShape["invalidateDeliveryToken"]>[0]
Parameters<LiveActivities.LiveActivities["Service"]["invalidateDeliveryToken"]>[0]
>;
readonly currentTargets?: ReadonlyArray<LiveActivities.TargetRow>;
readonly config?: RelayConfiguration.RelayConfiguration["Service"];
Expand Down Expand Up @@ -227,10 +227,10 @@ describe("ApnsDeliveries", () => {
const attempts: Array<DeliveryAttempts.DeliveryAttemptInput> = [];
const queuedJobs: Array<SignedApnsDeliveryJob> = [];
const queuedStarts: Array<
Parameters<LiveActivities.LiveActivitiesShape["markStartQueued"]>[0]
Parameters<LiveActivities.LiveActivities["Service"]["markStartQueued"]>[0]
> = [];
const markedDeliveries: Array<
Parameters<LiveActivities.LiveActivitiesShape["markDelivery"]>[0]
Parameters<LiveActivities.LiveActivities["Service"]["markDelivery"]>[0]
> = [];

return Effect.gen(function* () {
Expand Down Expand Up @@ -933,7 +933,7 @@ describe("ApnsDeliveries", () => {
it.effect("invalidates dead device push tokens after permanent APNs alert failures", () => {
const attempts: Array<DeliveryAttempts.DeliveryAttemptInput> = [];
const invalidatedTokens: Array<
Parameters<LiveActivities.LiveActivitiesShape["invalidateDeliveryToken"]>[0]
Parameters<LiveActivities.LiveActivities["Service"]["invalidateDeliveryToken"]>[0]
> = [];
const payload = makeApnsDeliveryJobPayload({
kind: "push_notification",
Expand Down Expand Up @@ -1000,7 +1000,7 @@ describe("ApnsDeliveries", () => {
it.effect("clears queued start state when a start job fails in APNs", () => {
const attempts: Array<DeliveryAttempts.DeliveryAttemptInput> = [];
const clearedStarts: Array<
Parameters<LiveActivities.LiveActivitiesShape["clearStartQueued"]>[0]
Parameters<LiveActivities.LiveActivities["Service"]["clearStartQueued"]>[0]
> = [];
const payload = makeApnsDeliveryJobPayload({
kind: "live_activity_start",
Expand Down Expand Up @@ -1035,7 +1035,7 @@ describe("ApnsDeliveries", () => {
it.effect("invalidates dead push-to-start tokens after permanent APNs start failures", () => {
const attempts: Array<DeliveryAttempts.DeliveryAttemptInput> = [];
const invalidatedTokens: Array<
Parameters<LiveActivities.LiveActivitiesShape["invalidateDeliveryToken"]>[0]
Parameters<LiveActivities.LiveActivities["Service"]["invalidateDeliveryToken"]>[0]
> = [];
const payload = makeApnsDeliveryJobPayload({
kind: "live_activity_start",
Expand Down Expand Up @@ -1082,7 +1082,7 @@ describe("ApnsDeliveries", () => {
it.effect("invalidates dead Live Activity tokens after APNs unregisters them", () => {
const attempts: Array<DeliveryAttempts.DeliveryAttemptInput> = [];
const invalidatedTokens: Array<
Parameters<LiveActivities.LiveActivitiesShape["invalidateDeliveryToken"]>[0]
Parameters<LiveActivities.LiveActivities["Service"]["invalidateDeliveryToken"]>[0]
> = [];
const payload = makeApnsDeliveryJobPayload({
kind: "live_activity_update",
Expand Down
Loading
Loading