diff --git a/packages/effect-acp/src/_internal/shared.ts b/packages/effect-acp/src/_internal/shared.ts index 7e43bbf8831..f54f81e2a08 100644 --- a/packages/effect-acp/src/_internal/shared.ts +++ b/packages/effect-acp/src/_internal/shared.ts @@ -12,7 +12,7 @@ export const callRpc = ( ): Effect.Effect => effect.pipe( Effect.catchIf(isError, (error) => - Effect.fail(AcpError.AcpRequestError.fromProtocolError(error)), + Effect.fail(AcpError.AcpRequestError.fromProtocolError(error, { method })), ), Effect.catchTags({ RpcClientError: (cause) => diff --git a/packages/effect-acp/src/errors.test.ts b/packages/effect-acp/src/errors.test.ts index 5187fabf5d2..a54c5af4843 100644 --- a/packages/effect-acp/src/errors.test.ts +++ b/packages/effect-acp/src/errors.test.ts @@ -51,6 +51,8 @@ describe("effect-acp errors", () => { code: -32602, errorMessage: "Invalid params", data: { field: "sessionId" }, + method: "session/load", + operation: "receive-response", }); }); }); diff --git a/packages/effect-acp/src/errors.ts b/packages/effect-acp/src/errors.ts index b3c0dee6294..3fe0a469001 100644 --- a/packages/effect-acp/src/errors.ts +++ b/packages/effect-acp/src/errors.ts @@ -5,8 +5,11 @@ import * as AcpSchema from "./_generated/schema.gen.ts"; export const AcpRequestOperation = Schema.Literals([ "decode-extension-request-payload", + "encode-extension-response", "handle-request", "handle-extension-request", + "receive-response", + "receive-streaming-response", ]); export type AcpRequestOperation = typeof AcpRequestOperation.Type; @@ -65,6 +68,7 @@ const schemaIssueDiagnostics = (root: SchemaIssue.Issue): AcpSchemaIssueDiagnost export interface AcpRequestDiagnostics { readonly method?: string; + readonly requestId?: string; readonly operation?: AcpRequestOperation; readonly cause?: unknown; readonly issueCount?: number; @@ -109,6 +113,7 @@ export class AcpProtocolParseError extends Schema.TaggedErrorClass()( @@ -167,6 +185,7 @@ export class AcpRequestError extends Schema.TaggedErrorClass()( errorMessage: Schema.String, data: Schema.optional(Schema.Unknown), method: Schema.optionalKey(Schema.String), + requestId: Schema.optionalKey(Schema.String), operation: Schema.optionalKey(AcpRequestOperation), issueCount: Schema.optionalKey(Schema.Number), issueKinds: Schema.optionalKey(Schema.Array(AcpSchemaIssueKind)), @@ -177,14 +196,59 @@ export class AcpRequestError extends Schema.TaggedErrorClass()( return this.errorMessage; } - static fromProtocolError(error: AcpSchema.Error) { + static fromProtocolError( + error: AcpSchema.Error, + context: { + readonly method: string; + readonly requestId?: string; + readonly cause?: unknown; + }, + ) { return new AcpRequestError({ code: error.code, errorMessage: error.message, ...(error.data !== undefined ? { data: error.data } : {}), + method: context.method, + ...(context.requestId === undefined ? {} : { requestId: context.requestId }), + operation: "receive-response", + cause: context.cause ?? error, }); } + static fromExtensionResponseFailure(method: string, requestId: string, cause: unknown) { + return AcpRequestError.internalError("Extension request failed", undefined, { + method, + requestId, + operation: "receive-response", + cause, + }); + } + + static fromExtensionResponseEncodingError( + method: string, + requestId: string, + cause: AcpProtocolParseError, + ) { + return AcpRequestError.internalError("Internal error", undefined, { + method, + requestId, + operation: "encode-extension-response", + cause, + }); + } + + static unsupportedStreamingResponse(method: string, requestId: string) { + return AcpRequestError.internalError( + "Streaming extension responses are not supported", + undefined, + { + method, + requestId, + operation: "receive-streaming-response", + }, + ); + } + static fromCoreHandlerError(error: AcpError, method: string) { if (error._tag === "AcpRequestError") { return error; diff --git a/packages/effect-acp/src/protocol.test.ts b/packages/effect-acp/src/protocol.test.ts index c8e03dd7235..ece068dfc88 100644 --- a/packages/effect-acp/src/protocol.test.ts +++ b/packages/effect-acp/src/protocol.test.ts @@ -260,15 +260,33 @@ it.layer(NodeServices.layer)("effect-acp protocol", (it) => { const bigintError = yield* transport.notify("x/test", 1n).pipe(Effect.flip); assert.instanceOf(bigintError, AcpError.AcpProtocolParseError); assert.equal(bigintError.operation, "encode-message"); + assert.equal(bigintError.method, "x/test"); assert.instanceOf(bigintError.cause, TypeError); - assert.equal(bigintError.message, "ACP protocol operation 'encode-message' failed."); + assert.equal( + bigintError.message, + "ACP protocol operation 'encode-message' failed for method 'x/test'.", + ); const circular: Record = {}; circular.self = circular; const circularError = yield* transport.notify("x/test", circular).pipe(Effect.flip); assert.instanceOf(circularError, AcpError.AcpProtocolParseError); assert.equal(circularError.operation, "encode-message"); + assert.equal(circularError.method, "x/test"); assert.instanceOf(circularError.cause, TypeError); + + const requestError = yield* transport.request("x/request", 1n).pipe( + Effect.match({ + onFailure: (error) => error, + onSuccess: () => assert.fail("Expected request encoding to fail"), + }), + ); + assert.instanceOf(requestError, AcpError.AcpProtocolParseError); + assert.deepInclude(requestError, { + operation: "encode-message", + method: "x/request", + requestId: "1", + }); }), ); @@ -310,6 +328,60 @@ it.layer(NodeServices.layer)("effect-acp protocol", (it) => { }), ); + it.effect("correlates extension response errors with the originating request", () => + Effect.gen(function* () { + const { stdio, input, output } = yield* makeInMemoryStdio(); + const transport = yield* AcpProtocol.makeAcpPatchedProtocol({ + stdio, + serverRequestMethods: new Set(), + }); + + const response = yield* transport + .request("x/private", { hello: "world" }) + .pipe(Effect.forkScoped); + yield* Queue.take(output); + yield* Queue.offer( + input, + encoder.encode( + `${encodeUnknownJsonString({ + jsonrpc: "2.0", + id: 1, + error: { + _tag: "Cause", + code: -32602, + message: "Invalid params", + data: [ + { + _tag: "Fail", + error: { + code: -32602, + message: "Invalid params", + data: { field: "hello" }, + }, + }, + ], + }, + })}\n`, + ), + ); + + const error = yield* Fiber.join(response).pipe( + Effect.match({ + onFailure: (error) => error, + onSuccess: () => assert.fail("Expected extension request to fail"), + }), + ); + assert.instanceOf(error, AcpError.AcpRequestError); + assert.deepInclude(error, { + code: -32602, + errorMessage: "Invalid params", + method: "x/private", + requestId: "1", + operation: "receive-response", + }); + }), + ); + it.effect("preserves zero-valued ids for inbound core client requests", () => Effect.gen(function* () { const { stdio, input, output } = yield* makeInMemoryStdio(); diff --git a/packages/effect-acp/src/protocol.ts b/packages/effect-acp/src/protocol.ts index 6c3bd399028..27c619296c0 100644 --- a/packages/effect-acp/src/protocol.ts +++ b/packages/effect-acp/src/protocol.ts @@ -66,6 +66,11 @@ export interface AcpPatchedProtocol { readonly notify: (method: string, payload: unknown) => Effect.Effect; } +interface AcpPendingRequest { + readonly deferred: Deferred.Deferred; + readonly method: string; +} + const decodeSessionUpdate = Schema.decodeUnknownEffect(AcpSchema.SessionNotification); const decodeElicitationComplete = Schema.decodeUnknownEffect( AcpSchema.ElicitationCompleteNotification, @@ -83,9 +88,7 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi const outgoing = yield* Queue.unbounded>(); const nextRequestId = yield* Ref.make(1n); const terminationHandled = yield* Ref.make(false); - const extPending = yield* Ref.make( - new Map>(), - ); + const extPending = yield* Ref.make(new Map()); const logProtocol = (event: AcpProtocolLogEvent) => { if (event.direction === "incoming" && !options.logIncoming) { @@ -109,13 +112,17 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi payload: message, }); + const method = message._tag === "Request" ? message.tag : undefined; + const encodedRequestId = + message._tag === "Request" + ? message.id + : "requestId" in message + ? message.requestId + : undefined; + const requestId = encodedRequestId === "" ? undefined : encodedRequestId; const encoded = yield* Effect.try({ try: () => parser.encode(message), - catch: (cause) => - new AcpError.AcpProtocolParseError({ - operation: "encode-message", - cause, - }), + catch: (cause) => AcpError.AcpProtocolParseError.fromEncodingError(method, requestId, cause), }); if (encoded) { @@ -131,16 +138,16 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi const resolveExtPending = ( requestId: string, - onFound: (deferred: Deferred.Deferred) => Effect.Effect, + onFound: (pendingRequest: AcpPendingRequest) => Effect.Effect, ) => Ref.modify(extPending, (pending) => { - const deferred = pending.get(requestId); - if (!deferred) { + const pendingRequest = pending.get(requestId); + if (!pendingRequest) { return [Effect.void, pending] as const; } const next = new Map(pending); next.delete(requestId); - return [onFound(deferred), next] as const; + return [onFound(pendingRequest), next] as const; }).pipe(Effect.flatten); const removeExtPending = (requestId: string) => @@ -154,15 +161,15 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi }); const completeExtPendingFailure = (requestId: string, error: AcpError.AcpError) => - resolveExtPending(requestId, (deferred) => Deferred.fail(deferred, error)); + resolveExtPending(requestId, ({ deferred }) => Deferred.fail(deferred, error)); const completeExtPendingSuccess = (requestId: string, value: unknown) => - resolveExtPending(requestId, (deferred) => Deferred.succeed(deferred, value)); + resolveExtPending(requestId, ({ deferred }) => Deferred.succeed(deferred, value)); const failAllExtPending = (error: AcpError.AcpError) => Ref.getAndSet(extPending, new Map()).pipe( Effect.flatMap((pending) => - Effect.forEach([...pending.values()], (deferred) => Deferred.fail(deferred, error), { + Effect.forEach([...pending.values()], ({ deferred }) => Deferred.fail(deferred, error), { discard: true, }), ), @@ -303,7 +310,26 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi if (!options.serverRequestMethods.has(message.tag)) { return handleExtRequest(message).pipe( - Effect.catch(() => respondWithError(message.id, AcpError.AcpRequestError.internalError())), + Effect.catchTags({ + AcpProtocolParseError: (error) => + Effect.logWarning(error).pipe( + Effect.annotateLogs({ + method: message.tag, + requestId: message.id, + operation: error.operation, + }), + Effect.andThen( + respondWithError( + message.id, + AcpError.AcpRequestError.fromExtensionResponseEncodingError( + message.tag, + message.id, + error, + ), + ), + ), + ), + }), Effect.asVoid, ); } @@ -314,7 +340,8 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi const handleExitEncoded = (message: RpcMessage.ResponseExitEncoded) => Ref.get(extPending).pipe( Effect.flatMap((pending) => { - if (!pending.has(message.requestId)) { + const pendingRequest = pending.get(message.requestId); + if (!pendingRequest) { return Queue.offer(clientQueue, message).pipe(Effect.asVoid); } if (message.exit._tag === "Success") { @@ -324,12 +351,20 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi if (failure && isProtocolError(failure.error)) { return completeExtPendingFailure( message.requestId, - AcpError.AcpRequestError.fromProtocolError(failure.error), + AcpError.AcpRequestError.fromProtocolError(failure.error, { + method: pendingRequest.method, + requestId: message.requestId, + cause: message.exit.cause, + }), ); } return completeExtPendingFailure( message.requestId, - AcpError.AcpRequestError.internalError("Extension request failed"), + AcpError.AcpRequestError.fromExtensionResponseFailure( + pendingRequest.method, + message.requestId, + message.exit.cause, + ), ); }), ); @@ -344,16 +379,18 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi return handleExitEncoded(message); case "Chunk": return Ref.get(extPending).pipe( - Effect.flatMap((pending) => - pending.has(message.requestId) + Effect.flatMap((pending) => { + const pendingRequest = pending.get(message.requestId); + return pendingRequest ? completeExtPendingFailure( message.requestId, - AcpError.AcpRequestError.internalError( - "Streaming extension responses are not supported", + AcpError.AcpRequestError.unsupportedStreamingResponse( + pendingRequest.method, + message.requestId, ), ) - : Queue.offer(clientQueue, message).pipe(Effect.asVoid), - ), + : Queue.offer(clientQueue, message).pipe(Effect.asVoid); + }), ); case "Defect": case "ClientProtocolError": @@ -401,6 +438,7 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi payload: { operation: error.operation, ...(error.method === undefined ? {} : { method: error.method }), + ...(error.requestId === undefined ? {} : { requestId: error.requestId }), ...(error.issueCount === undefined ? {} : { issueCount: error.issueCount }), ...(error.issueKinds === undefined ? {} : { issueKinds: error.issueKinds }), ...(error.maximumPathDepth === undefined @@ -494,18 +532,16 @@ export const makeAcpPatchedProtocol = Effect.fn("makeAcpPatchedProtocol")(functi (current) => [current, current + 1n] as const, ); const deferred = yield* Deferred.make(); - yield* Ref.update(extPending, (pending) => new Map(pending).set(String(requestId), deferred)); + yield* Ref.update(extPending, (pending) => + new Map(pending).set(String(requestId), { deferred, method }), + ); yield* offerOutgoing({ _tag: "Request", id: String(requestId), tag: method, payload, headers: [], - }).pipe( - Effect.catch((error) => - removeExtPending(String(requestId)).pipe(Effect.andThen(Effect.fail(error))), - ), - ); + }).pipe(Effect.tapError(() => removeExtPending(String(requestId)))); return yield* Deferred.await(deferred).pipe( Effect.onInterrupt(() => removeExtPending(String(requestId))), ); diff --git a/packages/effect-codex-app-server/src/errors.ts b/packages/effect-codex-app-server/src/errors.ts index 2f769f47de2..2559bba618c 100644 --- a/packages/effect-codex-app-server/src/errors.ts +++ b/packages/effect-codex-app-server/src/errors.ts @@ -5,6 +5,7 @@ export const CodexAppServerRequestOperation = Schema.Literals([ "decode-payload", "encode-payload", "handle-request", + "receive-response", ]); export type CodexAppServerRequestOperation = typeof CodexAppServerRequestOperation.Type; @@ -83,6 +84,7 @@ const payloadKind = (payload: unknown): CodexAppServerPayloadKind => { export interface CodexAppServerRequestDiagnostics { readonly method?: string; + readonly requestId?: string; readonly operation?: CodexAppServerRequestOperation; readonly cause?: unknown; readonly issueCount?: number; @@ -154,6 +156,7 @@ export class CodexAppServerProtocolParseError extends Schema.TaggedErrorClass { const bigintError = yield* transport.notify("x/test", 1n).pipe(Effect.flip); assert.instanceOf(bigintError, CodexError.CodexAppServerProtocolParseError); assert.equal(bigintError.operation, "encode-wire-message"); + assert.equal(bigintError.method, "x/test"); assert.exists(bigintError.cause); assert.equal( bigintError.message, - "Codex App Server protocol operation 'encode-wire-message' failed.", + "Codex App Server protocol operation 'encode-wire-message' failed for method 'x/test'.", ); const circular: Record = {}; @@ -223,7 +224,57 @@ it.layer(NodeServices.layer)("effect-codex-app-server protocol", (it) => { const circularError = yield* transport.notify("x/test", circular).pipe(Effect.flip); assert.instanceOf(circularError, CodexError.CodexAppServerProtocolParseError); assert.equal(circularError.operation, "encode-wire-message"); + assert.equal(circularError.method, "x/test"); assert.exists(circularError.cause); + + const requestError = yield* transport.request("x/request", 1n).pipe( + Effect.match({ + onFailure: (error) => error, + onSuccess: () => assert.fail("Expected request encoding to fail"), + }), + ); + assert.instanceOf(requestError, CodexError.CodexAppServerProtocolParseError); + assert.deepInclude(requestError, { + operation: "encode-wire-message", + method: "x/request", + requestId: "1", + }); + }), + ); + + it.effect("correlates response errors with the originating request", () => + Effect.gen(function* () { + const { stdio, input, output } = yield* makeInMemoryStdio(); + const transport = yield* CodexProtocol.makeCodexAppServerPatchedProtocol({ stdio }); + + const response = yield* transport.request("thread/start", {}).pipe(Effect.forkScoped); + yield* Queue.take(output); + yield* Queue.offer( + input, + encodeJsonl({ + id: 1, + error: { + code: -32602, + message: "Invalid params", + data: { field: "cwd" }, + }, + }), + ); + + const error = yield* Fiber.join(response).pipe( + Effect.match({ + onFailure: (error) => error, + onSuccess: () => assert.fail("Expected Codex App Server request to fail"), + }), + ); + assert.instanceOf(error, CodexError.CodexAppServerRequestError); + assert.deepInclude(error, { + code: -32602, + errorMessage: "Invalid params", + method: "thread/start", + requestId: "1", + operation: "receive-response", + }); }), ); diff --git a/packages/effect-codex-app-server/src/protocol.ts b/packages/effect-codex-app-server/src/protocol.ts index c0f07f95a5a..fbf173cbc5e 100644 --- a/packages/effect-codex-app-server/src/protocol.ts +++ b/packages/effect-codex-app-server/src/protocol.ts @@ -67,6 +67,11 @@ export interface CodexAppServerPatchedProtocol { ) => Effect.Effect; } +interface CodexAppServerPendingRequest { + readonly deferred: Deferred.Deferred; + readonly method: string; +} + function isObject(value: unknown): value is Record { return typeof value === "object" && value !== null; } @@ -94,9 +99,21 @@ const encodeWireMessage = ( ): Effect.Effect => encodeJsonString(message).pipe( Effect.map((encoded) => `${encoded}\n`), - Effect.mapError((cause) => - CodexError.CodexAppServerProtocolParseError.fromSchemaError("encode-wire-message", cause), - ), + Effect.mapError((cause) => { + const method = typeof message.method === "string" ? message.method : undefined; + const requestId = + typeof message.id === "string" || typeof message.id === "number" + ? String(message.id) + : undefined; + return CodexError.CodexAppServerProtocolParseError.fromSchemaError( + "encode-wire-message", + cause, + { + ...(method === undefined ? {} : { method }), + ...(requestId === undefined ? {} : { requestId }), + }, + ); + }), ); const decodeWireMessage = ( @@ -138,9 +155,7 @@ export const makeCodexAppServerPatchedProtocol = Effect.fn("makeCodexAppServerPa const outgoing = yield* Queue.unbounded>(); const incomingNotifications = yield* Queue.unbounded(); const incomingRequests = yield* Queue.unbounded(); - const pending = yield* Ref.make( - new Map>(), - ); + const pending = yield* Ref.make(new Map()); const nextRequestId = yield* Ref.make(1); const remainder = yield* Ref.make(""); const terminationHandled = yield* Ref.make(false); @@ -161,7 +176,7 @@ export const makeCodexAppServerPatchedProtocol = Effect.fn("makeCodexAppServerPa const failAllPending = (error: CodexError.CodexAppServerError) => Ref.get(pending).pipe( Effect.flatMap((current) => - Effect.forEach([...current.values()], (deferred) => Deferred.fail(deferred, error), { + Effect.forEach([...current.values()], ({ deferred }) => Deferred.fail(deferred, error), { discard: true, }), ), @@ -214,18 +229,16 @@ export const makeCodexAppServerPatchedProtocol = Effect.fn("makeCodexAppServerPa const resolvePending = ( requestId: string, - handler: ( - deferred: Deferred.Deferred, - ) => Effect.Effect, + handler: (pendingRequest: CodexAppServerPendingRequest) => Effect.Effect, ) => Ref.modify(pending, (current) => { - const deferred = current.get(requestId); - if (!deferred) { + const pendingRequest = current.get(requestId); + if (!pendingRequest) { return [Effect.void, current] as const; } const next = new Map(current); next.delete(requestId); - return [handler(deferred), next] as const; + return [handler(pendingRequest), next] as const; }).pipe(Effect.flatten); const respond = (requestId: string | number, result: unknown) => @@ -240,14 +253,20 @@ export const makeCodexAppServerPatchedProtocol = Effect.fn("makeCodexAppServerPa const requestId = String(response.id); const protocolError = response.error; if (protocolError !== undefined) { - return resolvePending(requestId, (deferred) => + return resolvePending(requestId, ({ deferred, method }) => Deferred.fail( deferred, - CodexError.CodexAppServerRequestError.fromProtocolError(protocolError), + CodexError.CodexAppServerRequestError.fromProtocolError( + protocolError, + method, + requestId, + ), ), ); } - return resolvePending(requestId, (deferred) => Deferred.succeed(deferred, response.result)); + return resolvePending(requestId, ({ deferred }) => + Deferred.succeed(deferred, response.result), + ); }; const handleRequest = (request: CodexAppServerIncomingRequest) => @@ -322,6 +341,7 @@ export const makeCodexAppServerPatchedProtocol = Effect.fn("makeCodexAppServerPa payload: { operation: error.operation, ...(error.method === undefined ? {} : { method: error.method }), + ...(error.requestId === undefined ? {} : { requestId: error.requestId }), ...(error.issueCount === undefined ? {} : { issueCount: error.issueCount }), ...(error.issueKinds === undefined ? {} : { issueKinds: error.issueKinds }), ...(error.maximumPathDepth === undefined @@ -375,16 +395,14 @@ export const makeCodexAppServerPatchedProtocol = Effect.fn("makeCodexAppServerPa (current) => [current, current + 1] as const, ); const deferred = yield* Deferred.make(); - yield* Ref.update(pending, (current) => new Map(current).set(String(requestId), deferred)); + yield* Ref.update(pending, (current) => + new Map(current).set(String(requestId), { deferred, method }), + ); yield* offerOutgoing({ id: requestId, method, ...(payload !== undefined ? { params: payload } : {}), - }).pipe( - Effect.catch((error) => - removePending(String(requestId)).pipe(Effect.andThen(Effect.fail(error))), - ), - ); + }).pipe(Effect.tapError(() => removePending(String(requestId)))); return yield* Deferred.await(deferred).pipe( Effect.onInterrupt(() => removePending(String(requestId))), );