From 8cf8c61efb9fd99892bcb250db12d7052b5fef08 Mon Sep 17 00:00:00 2001 From: Oliver Yu <387030+oliy@users.noreply.github.com> Date: Thu, 4 Jun 2026 15:51:01 -0500 Subject: [PATCH] PIPE-550 Add failure reporting to wrangler pipelines (#14174) Co-authored-by: Dario Piotrowicz Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> --- .changeset/pipelines-failure-reason.md | 9 ++ .../wrangler/src/__tests__/pipelines.test.ts | 115 +++++++++++++++--- packages/wrangler/src/pipelines/cli/get.ts | 9 ++ packages/wrangler/src/pipelines/cli/list.ts | 20 +++ packages/wrangler/src/pipelines/types.ts | 2 + 5 files changed, 141 insertions(+), 14 deletions(-) create mode 100644 .changeset/pipelines-failure-reason.md diff --git a/.changeset/pipelines-failure-reason.md b/.changeset/pipelines-failure-reason.md new file mode 100644 index 0000000000..53a5148587 --- /dev/null +++ b/.changeset/pipelines-failure-reason.md @@ -0,0 +1,9 @@ +--- +"wrangler": minor +--- + +Surface pipeline status and failure reasons in `wrangler pipelines list` and `wrangler pipelines get` + +`wrangler pipelines list` now includes a `Status` column, and when any pipelines are in a `failed` state it prints a summary of each failing pipeline along with the reason reported by the API. + +`wrangler pipelines get` now shows the pipeline `Status` in the general details and, for failed pipelines, highlights the failure with the reason returned by the server so it is clear why a pipeline is not running. diff --git a/packages/wrangler/src/__tests__/pipelines.test.ts b/packages/wrangler/src/__tests__/pipelines.test.ts index a66c3b5249..782ebd3083 100644 --- a/packages/wrangler/src/__tests__/pipelines.test.ts +++ b/packages/wrangler/src/__tests__/pipelines.test.ts @@ -581,13 +581,57 @@ describe("wrangler pipelines", () => { " ⛅️ wrangler x.x.x ────────────────── - ┌─┬─┬─┬─┐ - │ Name │ ID │ Created │ Modified │ - ├─┼─┼─┼─┤ - │ pipeline_one │ pipeline_1 │ 1/1/2024 │ 1/1/2024 │ - ├─┼─┼─┼─┤ - │ pipeline_two │ pipeline_2 │ 1/2/2024 │ 1/2/2024 │ - └─┴─┴─┴─┘" + ┌─┬─┬─┬─┬─┐ + │ Name │ ID │ Created │ Modified │ Status │ + ├─┼─┼─┼─┼─┤ + │ pipeline_one │ pipeline_1 │ 1/1/2024 │ 1/1/2024 │ active │ + ├─┼─┼─┼─┼─┤ + │ pipeline_two │ pipeline_2 │ 1/2/2024 │ 1/2/2024 │ active │ + └─┴─┴─┴─┴─┘" + `); + }); + + it("should surface failed pipelines in the list", async ({ expect }) => { + const mockPipelines: Pipeline[] = [ + { + id: "pipeline_1", + name: "healthy_pipeline", + sql: "INSERT INTO sink1 SELECT * FROM stream1;", + status: "running", + created_at: "2024-01-01T00:00:00Z", + modified_at: "2024-01-01T00:00:00Z", + }, + { + id: "pipeline_2", + name: "broken_pipeline", + sql: "INSERT INTO sink2 SELECT * FROM stream2;", + status: "failed", + failure_reason: "Sink bucket 'my-bucket' does not exist", + created_at: "2024-01-02T00:00:00Z", + modified_at: "2024-01-02T00:00:00Z", + }, + ]; + + mockListPipelinesRequest(expect, mockPipelines); + + await runWrangler("pipelines list"); + + expect(std.err).toMatchInlineSnapshot(`""`); + expect(std.out).toMatchInlineSnapshot(` + " + ⛅️ wrangler x.x.x + ────────────────── + ┌─┬─┬─┬─┬─┐ + │ Name │ ID │ Created │ Modified │ Status │ + ├─┼─┼─┼─┼─┤ + │ healthy_pipeline │ pipeline_1 │ 1/1/2024 │ 1/1/2024 │ running │ + ├─┼─┼─┼─┼─┤ + │ broken_pipeline │ pipeline_2 │ 1/2/2024 │ 1/2/2024 │ failed │ + └─┴─┴─┴─┴─┘ + + 1 pipeline is in a failed state. Run 'wrangler pipelines get ' for details: + X broken_pipeline: Sink bucket 'my-bucket' does not exist + " `); }); @@ -680,13 +724,13 @@ describe("wrangler pipelines", () => { " ⛅️ wrangler x.x.x ────────────────── - ┌─┬─┬─┬─┬─┐ - │ Name │ ID │ Created │ Modified │ Type │ - ├─┼─┼─┼─┼─┤ - │ new_pipeline │ pipeline_1 │ 1/1/2024 │ 1/1/2024 │ │ - ├─┼─┼─┼─┼─┤ - │ legacy_pipeline │ legacy_123 │ N/A │ N/A │ Legacy │ - └─┴─┴─┴─┴─┘" + ┌─┬─┬─┬─┬─┬─┐ + │ Name │ ID │ Created │ Modified │ Status │ Type │ + ├─┼─┼─┼─┼─┼─┤ + │ new_pipeline │ pipeline_1 │ 1/1/2024 │ 1/1/2024 │ active │ │ + ├─┼─┼─┼─┼─┼─┤ + │ legacy_pipeline │ legacy_123 │ N/A │ N/A │ N/A │ Legacy │ + └─┴─┴─┴─┴─┴─┘" `); }); @@ -788,6 +832,7 @@ describe("wrangler pipelines", () => { General: ID: pipeline_123 Name: my_pipeline + Status: active Created At: 1/1/2024, 12:00:00 AM Modified At: 1/1/2024, 12:00:00 AM @@ -810,6 +855,48 @@ describe("wrangler pipelines", () => { `); }); + it("highlights failure reason for a failed pipeline", async ({ + expect, + }) => { + const mockPipeline: Pipeline = { + id: "pipeline_123", + name: "my_pipeline", + sql: "INSERT INTO test_sink SELECT * FROM test_stream;", + status: "failed", + failure_reason: + "Schema validation failed: column 'id' expected int64 but got string", + created_at: "2024-01-01T00:00:00Z", + modified_at: "2024-01-01T00:00:00Z", + }; + + const getRequest = mockGetPipelineRequest("pipeline_123", mockPipeline); + + await runWrangler("pipelines get pipeline_123"); + + expect(getRequest.count).toBe(1); + expect(std.err).toMatchInlineSnapshot(`""`); + expect(std.out).toMatchInlineSnapshot(` + " + ⛅️ wrangler x.x.x + ────────────────── + General: + ID: pipeline_123 + Name: my_pipeline + Status: failed + Created At: 1/1/2024, 12:00:00 AM + Modified At: 1/1/2024, 12:00:00 AM + + X This pipeline is in a failed state. + Reason: Schema validation failed: column 'id' expected int64 but got string + + Pipeline SQL: + INSERT INTO test_sink SELECT * FROM test_stream; + + Connected Streams: None + Connected Sinks: None" + `); + }); + it("resolves pipeline by name", async ({ expect }) => { const mockPipeline: Pipeline = { id: "pipeline_123", diff --git a/packages/wrangler/src/pipelines/cli/get.ts b/packages/wrangler/src/pipelines/cli/get.ts index 48fd89d680..72e53ae3b9 100644 --- a/packages/wrangler/src/pipelines/cli/get.ts +++ b/packages/wrangler/src/pipelines/cli/get.ts @@ -1,3 +1,4 @@ +import { bold, red } from "@cloudflare/cli-shared-helpers/colors"; import { UserError } from "@cloudflare/workers-utils"; import { createCommand } from "../../core/create-command"; import { logger } from "../../logger"; @@ -59,6 +60,7 @@ export const pipelinesGetCommand = createCommand({ const general: Record = { ID: pipeline.id, Name: pipeline.name, + Status: pipeline.status, "Created At": new Date(pipeline.created_at).toLocaleString(), "Modified At": new Date(pipeline.modified_at).toLocaleString(), }; @@ -66,6 +68,13 @@ export const pipelinesGetCommand = createCommand({ logger.log("General:"); logger.log(formatLabelledValues(general, { indentationCount: 2 })); + if (pipeline.status === "failed") { + logger.log(`\n${bold(red("✘ This pipeline is in a failed state."))}`); + logger.log( + red(` Reason: ${pipeline.failure_reason ?? "Unknown failure"}`) + ); + } + logger.log("\nPipeline SQL:"); logger.log(pipeline.sql); diff --git a/packages/wrangler/src/pipelines/cli/list.ts b/packages/wrangler/src/pipelines/cli/list.ts index 68467522b8..5a0180c9ff 100644 --- a/packages/wrangler/src/pipelines/cli/list.ts +++ b/packages/wrangler/src/pipelines/cli/list.ts @@ -1,3 +1,4 @@ +import { red } from "@cloudflare/cli-shared-helpers/colors"; import { createCommand } from "../../core/create-command"; import { logger } from "../../logger"; import { requireAuth } from "../../user"; @@ -68,6 +69,7 @@ export const pipelinesListCommand = createCommand({ ID: pipeline.id, Created: new Date(pipeline.created_at).toLocaleDateString(), Modified: new Date(pipeline.modified_at).toLocaleDateString(), + Status: pipeline.status, Type: "", })), ...legacyPipelines.map((pipeline) => ({ @@ -75,6 +77,7 @@ export const pipelinesListCommand = createCommand({ ID: pipeline.id, Created: "N/A", Modified: "N/A", + Status: "N/A", Type: "Legacy", })), ]; @@ -85,10 +88,27 @@ export const pipelinesListCommand = createCommand({ ID: pipeline.id, Created: new Date(pipeline.created_at).toLocaleDateString(), Modified: new Date(pipeline.modified_at).toLocaleDateString(), + Status: pipeline.status, })); logger.table(tableData); } + const failedPipelines = (newPipelines || []).filter( + (pipeline) => pipeline.status === "failed" + ); + if (failedPipelines.length > 0) { + logger.log( + `\n${failedPipelines.length} pipeline${failedPipelines.length === 1 ? " is" : "s are"} in a failed state. Run 'wrangler pipelines get ' for details:` + ); + for (const pipeline of failedPipelines) { + logger.log( + red( + ` ✘ ${pipeline.name}: ${pipeline.failure_reason ?? "Unknown failure"}\n` + ) + ); + } + } + if (hasLegacyPipelines) { logger.warn( "⚠️ You have legacy pipelines. Consider creating new pipelines by running 'wrangler pipelines setup'." diff --git a/packages/wrangler/src/pipelines/types.ts b/packages/wrangler/src/pipelines/types.ts index 8f525706f8..e4c2ec1ca3 100644 --- a/packages/wrangler/src/pipelines/types.ts +++ b/packages/wrangler/src/pipelines/types.ts @@ -14,6 +14,8 @@ export interface Pipeline { modified_at: string; sql: string; status: string; + /** Free-form server message describing why the pipeline failed. Only set when `status` is "failed". */ + failure_reason?: string; tables?: PipelineTable[]; }