Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/pipelines-failure-reason.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"wrangler": minor
---

Surface pipeline status and failure reasons in `wrangler pipelines list` and `wrangler pipelines get`

`wrangler pipelines list` now includes a `Status` column, and when any pipelines are in a `failed` state it prints a summary of each failing pipeline along with the reason reported by the API.

`wrangler pipelines get` now shows the pipeline `Status` in the general details and, for failed pipelines, highlights the failure with the reason returned by the server so it is clear why a pipeline is not running.
115 changes: 101 additions & 14 deletions packages/wrangler/src/__tests__/pipelines.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -581,13 +581,57 @@ describe("wrangler pipelines", () => {
"
⛅️ wrangler x.x.x
──────────────────
┌─┬─┬─┬─┐
│ Name │ ID │ Created │ Modified │
├─┼─┼─┼─┤
│ pipeline_one │ pipeline_1 │ 1/1/2024 │ 1/1/2024 │
├─┼─┼─┼─┤
│ pipeline_two │ pipeline_2 │ 1/2/2024 │ 1/2/2024 │
└─┴─┴─┴─┘"
┌─┬─┬─┬─┬─┐
│ Name │ ID │ Created │ Modified │ Status │
├─┼─┼─┼─┼─┤
│ pipeline_one │ pipeline_1 │ 1/1/2024 │ 1/1/2024 │ active │
├─┼─┼─┼─┼─┤
│ pipeline_two │ pipeline_2 │ 1/2/2024 │ 1/2/2024 │ active │
└─┴─┴─┴─┴─┘"
`);
});

it("should surface failed pipelines in the list", async ({ expect }) => {
const mockPipelines: Pipeline[] = [
{
id: "pipeline_1",
name: "healthy_pipeline",
sql: "INSERT INTO sink1 SELECT * FROM stream1;",
status: "running",
created_at: "2024-01-01T00:00:00Z",
modified_at: "2024-01-01T00:00:00Z",
},
{
id: "pipeline_2",
name: "broken_pipeline",
sql: "INSERT INTO sink2 SELECT * FROM stream2;",
status: "failed",
failure_reason: "Sink bucket 'my-bucket' does not exist",
created_at: "2024-01-02T00:00:00Z",
modified_at: "2024-01-02T00:00:00Z",
},
];

mockListPipelinesRequest(expect, mockPipelines);

await runWrangler("pipelines list");

expect(std.err).toMatchInlineSnapshot(`""`);
expect(std.out).toMatchInlineSnapshot(`
"
⛅️ wrangler x.x.x
──────────────────
┌─┬─┬─┬─┬─┐
│ Name │ ID │ Created │ Modified │ Status │
├─┼─┼─┼─┼─┤
│ healthy_pipeline │ pipeline_1 │ 1/1/2024 │ 1/1/2024 │ running │
├─┼─┼─┼─┼─┤
│ broken_pipeline │ pipeline_2 │ 1/2/2024 │ 1/2/2024 │ failed │
└─┴─┴─┴─┴─┘

1 pipeline is in a failed state. Run 'wrangler pipelines get <pipeline>' for details:
X broken_pipeline: Sink bucket 'my-bucket' does not exist
"
`);
});

Expand Down Expand Up @@ -680,13 +724,13 @@ describe("wrangler pipelines", () => {
"
⛅️ wrangler x.x.x
──────────────────
┌─┬─┬─┬─┬─┐
│ Name │ ID │ Created │ Modified │ Type │
├─┼─┼─┼─┼─┤
│ new_pipeline │ pipeline_1 │ 1/1/2024 │ 1/1/2024 │ │
├─┼─┼─┼─┼─┤
│ legacy_pipeline │ legacy_123 │ N/A │ N/A │ Legacy │
└─┴─┴─┴─┴─┘"
┌─┬─┬─┬─┬─┬─
│ Name │ ID │ Created │ Modified │ Status │ Type │
├─┼─┼─┼─┼─┼─
│ new_pipeline │ pipeline_1 │ 1/1/2024 │ 1/1/2024 │ active │
├─┼─┼─┼─┼─┼─
│ legacy_pipeline │ legacy_123 │ N/A │ N/A │ N/A │ Legacy │
└─┴─┴─┴─┴─┴─┘"
`);
});

Expand Down Expand Up @@ -788,6 +832,7 @@ describe("wrangler pipelines", () => {
General:
ID: pipeline_123
Name: my_pipeline
Status: active
Created At: 1/1/2024, 12:00:00 AM
Modified At: 1/1/2024, 12:00:00 AM

Expand All @@ -810,6 +855,48 @@ describe("wrangler pipelines", () => {
`);
});

it("highlights failure reason for a failed pipeline", async ({
expect,
}) => {
const mockPipeline: Pipeline = {
id: "pipeline_123",
name: "my_pipeline",
sql: "INSERT INTO test_sink SELECT * FROM test_stream;",
status: "failed",
failure_reason:
"Schema validation failed: column 'id' expected int64 but got string",
created_at: "2024-01-01T00:00:00Z",
modified_at: "2024-01-01T00:00:00Z",
};

const getRequest = mockGetPipelineRequest("pipeline_123", mockPipeline);

await runWrangler("pipelines get pipeline_123");

expect(getRequest.count).toBe(1);
expect(std.err).toMatchInlineSnapshot(`""`);
expect(std.out).toMatchInlineSnapshot(`
"
⛅️ wrangler x.x.x
──────────────────
General:
ID: pipeline_123
Name: my_pipeline
Status: failed
Created At: 1/1/2024, 12:00:00 AM
Modified At: 1/1/2024, 12:00:00 AM

X This pipeline is in a failed state.
Reason: Schema validation failed: column 'id' expected int64 but got string

Pipeline SQL:
INSERT INTO test_sink SELECT * FROM test_stream;

Connected Streams: None
Connected Sinks: None"
`);
});

it("resolves pipeline by name", async ({ expect }) => {
const mockPipeline: Pipeline = {
id: "pipeline_123",
Expand Down
9 changes: 9 additions & 0 deletions packages/wrangler/src/pipelines/cli/get.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { bold, red } from "@cloudflare/cli-shared-helpers/colors";
import { UserError } from "@cloudflare/workers-utils";
import { createCommand } from "../../core/create-command";
import { logger } from "../../logger";
Expand Down Expand Up @@ -59,13 +60,21 @@ export const pipelinesGetCommand = createCommand({
const general: Record<string, string> = {
ID: pipeline.id,
Name: pipeline.name,
Status: pipeline.status,
"Created At": new Date(pipeline.created_at).toLocaleString(),
"Modified At": new Date(pipeline.modified_at).toLocaleString(),
};

logger.log("General:");
logger.log(formatLabelledValues(general, { indentationCount: 2 }));

if (pipeline.status === "failed") {
logger.log(`\n${bold(red("✘ This pipeline is in a failed state."))}`);
logger.log(
red(` Reason: ${pipeline.failure_reason ?? "Unknown failure"}`)
);
}

logger.log("\nPipeline SQL:");
logger.log(pipeline.sql);

Expand Down
20 changes: 20 additions & 0 deletions packages/wrangler/src/pipelines/cli/list.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { red } from "@cloudflare/cli-shared-helpers/colors";
import { createCommand } from "../../core/create-command";
import { logger } from "../../logger";
import { requireAuth } from "../../user";
Expand Down Expand Up @@ -68,13 +69,15 @@ export const pipelinesListCommand = createCommand({
ID: pipeline.id,
Created: new Date(pipeline.created_at).toLocaleDateString(),
Modified: new Date(pipeline.modified_at).toLocaleDateString(),
Status: pipeline.status,
Type: "",
})),
...legacyPipelines.map((pipeline) => ({
Name: pipeline.name,
ID: pipeline.id,
Created: "N/A",
Modified: "N/A",
Status: "N/A",
Type: "Legacy",
})),
];
Expand All @@ -85,10 +88,27 @@ export const pipelinesListCommand = createCommand({
ID: pipeline.id,
Created: new Date(pipeline.created_at).toLocaleDateString(),
Modified: new Date(pipeline.modified_at).toLocaleDateString(),
Status: pipeline.status,
}));
logger.table(tableData);
}

const failedPipelines = (newPipelines || []).filter(
(pipeline) => pipeline.status === "failed"
);
if (failedPipelines.length > 0) {
logger.log(
`\n${failedPipelines.length} pipeline${failedPipelines.length === 1 ? " is" : "s are"} in a failed state. Run 'wrangler pipelines get <pipeline>' for details:`
);
for (const pipeline of failedPipelines) {
logger.log(
red(
` ✘ ${pipeline.name}: ${pipeline.failure_reason ?? "Unknown failure"}\n`
)
);
}
}

if (hasLegacyPipelines) {
logger.warn(
"⚠️ You have legacy pipelines. Consider creating new pipelines by running 'wrangler pipelines setup'."
Expand Down
2 changes: 2 additions & 0 deletions packages/wrangler/src/pipelines/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ export interface Pipeline {
modified_at: string;
sql: string;
status: string;
/** Free-form server message describing why the pipeline failed. Only set when `status` is "failed". */
failure_reason?: string;
tables?: PipelineTable[];
}

Expand Down
Loading