diff --git a/CHANGELOG.md b/CHANGELOG.md
index b53ae188..8aaebaff 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,14 @@ Pre-1.0 note: while `pg_durable` is in major version `0`, minor releases may inc
 
 ## [0.2.4] - Unreleased
 
+### Added
+
+- **Skipped node terminal status:** downstream nodes that are not executed after an upstream node failure are now marked `skipped` instead of remaining indefinitely `pending` (#240).
+
+### Changed
+
+- **Failure observability:** on node-level workflow failure, pg_durable now performs a terminal reconciliation pass to mark remaining `pending` nodes as `skipped` when supported by the installed schema.
+
 ## [0.2.3] - 2026-06-17
 
 Provider-line note: v0.2.3 stays in the `duroxide-pg` provider compatibility line started in v0.2.2, so the upgrade source is v0.2.2 (`sql/pg_durable--0.2.2--0.2.3.sql`).
diff --git a/USER_GUIDE.md b/USER_GUIDE.md
index 5a86fade..de6a697c 100644
--- a/USER_GUIDE.md
+++ b/USER_GUIDE.md
@@ -1300,6 +1300,7 @@ SQL |=> 'a': SELECT 1
 | `✗ Failed` | Node encountered an error |
 | `⏳ Running` | Node currently executing |
 | `○ Pending` | Node waiting to execute |
+| `⊘ Skipped` | Node was not executed because an upstream node failed |
 
 ### Visualizing Complex Structures
 
diff --git a/docs/issue-240-skipped-status-plan.md b/docs/issue-240-skipped-status-plan.md
new file mode 100644
index 00000000..e09a5b69
--- /dev/null
+++ b/docs/issue-240-skipped-status-plan.md
@@ -0,0 +1,168 @@
+# Issue 240 Design and Implementation Plan
+
+## Summary
+
+Issue 240 requests a clear way to distinguish nodes that were never executed because the workflow already failed. Today those nodes remain in `pending`, which is ambiguous.
+
+Proposed enhancement:
+- Add a terminal node status: `skipped`.
+- On workflow failure, convert remaining `pending` nodes to `skipped` when the failure came from node execution (that is, at least one node is `failed`).
+
+This keeps the existing instance-level status model unchanged (`df.instances.status` still ends as `failed`) while making node-level outcomes explicit.
+
+## Current Behavior (Observed)
+
+Live repro on local pg instance:
+- Step 1 SQL node: `completed`
+- Step 2 SQL node (intentional error): `failed`
+- Step 3 SQL node (never executed): `pending`
+- Instance: `failed`
+
+This is the ambiguity reported in issue 240.
+
+## Goals
+
+- Make unexecuted downstream nodes observable as `skipped` after terminal workflow failure.
+- Preserve backward compatibility of the new binary against old schemas.
+- Keep implementation minimal and low-risk (no new tables or public function signatures).
+
+## Non-Goals
+
+- No change to `df.instances.status` vocabulary.
+- No new monitoring projection table in this iteration.
+- No attempt to classify every failure mode as producing `skipped` (for example, pre-execution policy rejection may remain as-is).
+
+## Proposed Design
+
+### 1. Schema: add `skipped` to allowed node statuses
+
+Update node status check constraints so `skipped` is valid:
+- Install DDL in `src/lib.rs`:
+  - `nodes_status_chk`: include `skipped`.
+- Upgrade DDL in next upgrade script:
+  - drop and recreate `nodes_status_chk` (or equivalent alteration) to include `skipped`.
+
+`nodes_result_status_chk` can remain unchanged because `skipped` nodes should not carry `result`.
+
+### 2. Runtime: mark pending nodes as skipped at terminal failure
+
+Add an activity that performs one set-based update for a single instance:
+
+- New activity (suggested): `mark_pending_nodes_skipped`.
+- SQL behavior:
+  - `UPDATE df.nodes`
+  - `SET status = 'skipped', updated_at = now()`
+  - `WHERE instance_id = $1 AND status = 'pending'`
+  - guarded by `EXISTS (SELECT 1 FROM df.nodes WHERE instance_id = $1 AND status = 'failed')`
+
+Guard rationale:
+- Avoid changing semantics for failures that occur before any node execution (for example, instance-level rejection paths that currently do not mark node failures).
+- Keep behavior aligned with issue wording: downstream steps skipped due to an earlier step failure.
+
+### 3. Orchestration integration point
+
+In `execute_function_graph` top-level failure path (when instance is being moved to `failed`):
+- After node failure is recorded and before/after instance status update, schedule the new activity once for that instance.
+- Make the update idempotent and best-effort (safe if retried).
+
+Why this placement:
+- Central place where terminal failure is decided.
+- Avoids needing per-node graph traversal logic.
+- Handles linear and composite graphs (`THEN`, `IF`, `JOIN`, `RACE`, `LOOP`) uniformly.
+
+### 4. Optional hardening in `update_node_status`
+
+No required behavior change, but add a small guard in plan review:
+- Keep allowing transitions to `completed` / `failed` as today.
+- Ensure no code path writes result for `skipped`.
+
+## Backward Compatibility and Upgrade Strategy
+
+### Binary backward compatibility (B1)
+
+New `.so` may run against an older schema where `nodes_status_chk` does not include `skipped`.
+If runtime writes `skipped` in that state, updates would fail.
+
+Plan:
+- Runtime schema detection for `skipped` support before attempting the bulk update.
+- If unsupported, no-op and keep legacy behavior (`pending`).
+
+Implementation options:
+- Option A (preferred): activity checks `pg_constraint` definition for `nodes_status_chk` containing `skipped`.
+- Option B: attempt update inside savepoint-like handling and ignore check-constraint violation.
+
+Option A is clearer and avoids noisy errors.
+
+### Schema upgrade (A/B2)
+
+- Create next upgrade script `sql/pg_durable--0.2.2--0.2.3.sql` (version number illustrative; use actual next version).
+- Add DDL to update `nodes_status_chk` to include `skipped`.
+- Ensure fresh-install schema (from current `src/lib.rs` extension SQL) matches upgraded schema.
+
+## Test Plan
+
+### Unit / Rust-level
+
+- Activity test: when instance has a failed node plus pending nodes, only pending nodes become `skipped`.
+- Activity test: when no failed node exists, no rows are changed.
+- Compatibility test hook: when schema does not support `skipped`, activity no-ops without error.
+
+### E2E SQL
+
+Add a new E2E SQL test (for example `tests/e2e/sql/49_failed_downstream_nodes_skipped.sql`):
+- Build a 3-step sequence where step 2 fails.
+- Wait for terminal instance status `failed`.
+- Assert:
+  - step 1 node is `completed`
+  - step 2 node is `failed`
+  - step 3 node is `skipped` (not `pending`)
+- Include clear failure messages.
+
+Also verify an instance-level failure path with no node failure (if represented in existing tests) does not force all nodes to `skipped`.
+
+### Upgrade tests
+
+Run:
+- `./scripts/test-upgrade.sh`
+
+Focus expectations:
+- Scenario A: fresh install vs upgraded schema parity for `nodes_status_chk`.
+- Scenario B1: new `.so` still works against old schema; `skipped` behavior degrades safely to legacy (`pending`) until upgrade.
+- Scenario B2: data remains accessible post-upgrade.
+
+## Docs Plan
+
+Update user-facing status vocabulary references:
+- `USER_GUIDE.md` (node status semantics)
+- `docs/api-reference.md` if status values are documented there
+- Optional release note entry in `CHANGELOG.md`
+
+## Rollout and Risk
+
+Risks:
+- Writing `skipped` against non-upgraded schema (mitigated by runtime check).
+- Unexpected interactions with in-flight parallel constructs (mitigated by set-based terminal update and idempotence).
+
+Rollout:
+1. Land schema + runtime + tests in one PR.
+2. Validate full local matrix: fmt, clippy, unit, e2e, upgrade.
+3. Document behavior change as node-level observability enhancement.
+
+## Acceptance Criteria
+
+- Failed workflow with downstream unexecuted nodes shows `skipped` for those nodes (post-upgrade schema).
+- No regression to existing instance terminal statuses.
+- New binary remains functional against pre-upgrade schema.
+- E2E and upgrade tests pass.
+
+## Implementation Checklist
+
+- [ ] Add `skipped` to node status check constraint in install DDL (`src/lib.rs`).
+- [ ] Add upgrade script change for `nodes_status_chk`.
+- [ ] Add new activity to mark pending nodes as skipped for failed instances.
+- [ ] Register activity in `src/registry.rs`.
+- [ ] Call activity from orchestration failure path.
+- [ ] Add schema-compatibility guard for pre-upgrade schemas.
+- [ ] Add E2E SQL coverage.
+- [ ] Update docs and changelog notes.
+- [ ] Run fmt, clippy, unit, e2e, upgrade tests.
diff --git a/docs/upgrade-testing.md b/docs/upgrade-testing.md
index 51059edd..825e9134 100644
--- a/docs/upgrade-testing.md
+++ b/docs/upgrade-testing.md
@@ -203,6 +203,14 @@ gate, so they never need to be added to the exclude list.
 Each schema-changing PR should add a section here documenting what changed, what the upgrade script handles, and any backward compatibility considerations.
 
+### v0.2.3 → v0.2.4
+
+#### #240 node-level `skipped` status for downstream unexecuted steps
+- **DDL change:** `df.nodes.status` constraint (`nodes_status_chk`) now allows `skipped` in addition to `pending`, `running`, `completed`, and `failed`. Upgrade script: `sql/pg_durable--0.2.3--0.2.4.sql`.
+- **Scenario A considerations:** fresh-install and upgraded schemas must agree on the `nodes_status_chk` status vocabulary including `skipped`.
+- **Scenario B1 considerations:** the new `.so` must still run against pre-0.2.4 schemas where `nodes_status_chk` does not allow `skipped`. Runtime logic therefore detects schema support first and no-ops (retains legacy `pending` behavior) when unsupported.
+- **Scenario B2 considerations:** no data migration required. Existing rows are preserved; only new terminal reconciliation on failed runs can mark unexecuted nodes as `skipped` post-upgrade.
+
 ### v0.2.2 → v0.2.3
 
 #### Rename duroxide provider schema to `_duroxide` for fresh installs
diff --git a/src/activities/mark_pending_nodes_skipped.rs b/src/activities/mark_pending_nodes_skipped.rs
new file mode 100644
index 00000000..7bc10749
--- /dev/null
+++ b/src/activities/mark_pending_nodes_skipped.rs
@@ -0,0 +1,77 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the PostgreSQL License.
+
+//! MarkPendingNodesSkipped activity - marks unexecuted nodes as skipped
+//! after a node-level failure.
+
+use duroxide::ActivityContext;
+use sqlx::PgPool;
+use std::sync::Arc;
+
+/// Activity name for registration and scheduling
+pub const NAME: &str = "pg_durable::activity::mark-pending-nodes-skipped";
+
+/// Mark pending nodes as skipped for a failed instance.
+///
+/// Behavior:
+/// - No-op on schemas that do not support 'skipped' in nodes_status_chk.
+/// - No-op unless the instance has at least one failed node.
+/// - Updates only nodes still in 'pending'.
+pub async fn execute(
+    ctx: ActivityContext,
+    pool: Arc<PgPool>,
+    input_json: String,
+) -> Result<String, String> {
+    let input: serde_json::Value = serde_json::from_str(&input_json)
+        .map_err(|e| format!("Failed to parse skipped-status input: {e}"))?;
+
+    let instance_id = input["instance_id"].as_str().ok_or("Missing instance_id")?;
+
+    let skipped_supported: bool = sqlx::query_scalar(
+        "SELECT COALESCE(
+            (
+                SELECT pg_catalog.pg_get_constraintdef(c.oid)
+                FROM pg_catalog.pg_constraint c
+                JOIN pg_catalog.pg_class t ON t.oid = c.conrelid
+                JOIN pg_catalog.pg_namespace n ON n.oid = t.relnamespace
+                WHERE n.nspname = 'df'
+                  AND t.relname = 'nodes'
+                  AND c.conname = 'nodes_status_chk'
+                LIMIT 1
+            ) LIKE '%''skipped''%',
+            false
+        )",
+    )
+    .fetch_one(pool.as_ref())
+    .await
+    .map_err(|e| format!("Failed to detect skipped status support: {e}"))?;
+
+    if !skipped_supported {
+        ctx.trace_info(format!(
+            "Schema does not support node status 'skipped'; leaving pending nodes unchanged for instance {instance_id}"
+        ));
+        return Ok("Skipped status unsupported on schema; no-op".to_string());
+    }
+
+    let rows_affected = sqlx::query(
+        "UPDATE df.nodes n
+         SET status = 'skipped', updated_at = now()
+         WHERE n.instance_id = $1
+           AND n.status = 'pending'
+           AND EXISTS (
+               SELECT 1
+               FROM df.nodes f
+               WHERE f.instance_id = $1
+                 AND f.status = 'failed'
+           )",
+    )
+    .bind(instance_id)
+    .execute(pool.as_ref())
+    .await
+    .map_err(|e| format!("Failed to mark pending nodes as skipped: {e}"))?
+    .rows_affected();
+
+    let msg = format!("Marked {rows_affected} pending nodes as skipped for instance {instance_id}");
+    ctx.trace_info(&msg);
+    Ok(msg)
+}
diff --git a/src/activities/mod.rs b/src/activities/mod.rs
index 8d619bad..f9f8a4d6 100644
--- a/src/activities/mod.rs
+++ b/src/activities/mod.rs
@@ -9,5 +9,6 @@
 pub mod execute_http;
 pub mod execute_sql;
 pub mod load_function_graph;
+pub mod mark_pending_nodes_skipped;
 pub mod update_instance_status;
 pub mod update_node_status;
diff --git a/src/lib.rs b/src/lib.rs
index 681b88d4..980c5821 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -248,7 +248,7 @@ ALTER TABLE df.nodes
     ADD CONSTRAINT nodes_result_name_chk
         CHECK (result_name IS NULL OR result_name OPERATOR(pg_catalog.~) '^[A-Za-z_][A-Za-z0-9_]*$') NOT VALID,
     ADD CONSTRAINT nodes_status_chk
-        CHECK (status OPERATOR(pg_catalog.=) ANY (ARRAY['pending', 'running', 'completed', 'failed'])) NOT VALID,
+        CHECK (status OPERATOR(pg_catalog.=) ANY (ARRAY['pending', 'running', 'completed', 'failed', 'skipped'])) NOT VALID,
     ADD CONSTRAINT nodes_result_status_chk
         CHECK (result IS NULL OR status OPERATOR(pg_catalog.=) ANY (ARRAY['completed', 'failed'])) NOT VALID,
     ADD CONSTRAINT nodes_structure_chk
diff --git a/src/orchestrations/execute_function_graph.rs b/src/orchestrations/execute_function_graph.rs
index fd177227..ed637c3f 100644
--- a/src/orchestrations/execute_function_graph.rs
+++ b/src/orchestrations/execute_function_graph.rs
@@ -216,8 +216,28 @@ pub async fn execute(ctx: OrchestrationContext, input_json: String) -> Result {
             ctx.trace_info(format!("Function failed with error: {err}"));
+            let instance_id = input.instance_id.clone();
+
+            // If this was a node-level failure, mark any remaining pending nodes
+            // as skipped for clearer terminal observability.
+            let skipped_input = serde_json::json!({
+                "instance_id": instance_id,
+            });
+            if let Err(e) = ctx
+                .schedule_activity(
+                    activities::mark_pending_nodes_skipped::NAME,
+                    skipped_input.to_string(),
+                )
+                .await
+            {
+                ctx.trace_info(format!(
+                    "Failed to mark pending nodes as skipped for instance {}: {}",
+                    input.instance_id, e
+                ));
+            }
+
             let status_input = serde_json::json!({
-                "instance_id": input.instance_id,
+                "instance_id": instance_id,
                 "status": "failed"
             });
             let _ = ctx
diff --git a/src/registry.rs b/src/registry.rs
index 03e5c73d..2925b8b5 100644
--- a/src/registry.rs
+++ b/src/registry.rs
@@ -18,6 +18,7 @@ pub fn create_activity_registry(pool: Arc, semaphore: Arc) ->
     let graph_pool = pool.clone();
     let status_pool = pool.clone();
     let node_status_pool = pool.clone();
+    let skipped_pool = pool.clone();
     let http_pool = pool.clone();
 
     ActivityRegistry::builder()
@@ -37,6 +38,10 @@ pub fn create_activity_registry(pool: Arc, semaphore: Arc) ->
             let pool = node_status_pool.clone();
             async move { activities::update_node_status::execute(ctx, pool, input_json).await }
         })
+        .register(activities::mark_pending_nodes_skipped::NAME, move |ctx: ActivityContext, input_json: String| {
+            let pool = skipped_pool.clone();
+            async move { activities::mark_pending_nodes_skipped::execute(ctx, pool, input_json).await }
+        })
         .register(activities::execute_http::NAME, move |ctx: ActivityContext, config_json: String| {
             let pool = http_pool.clone();
             async move { activities::execute_http::execute(ctx, pool, config_json).await }
diff --git a/tests/e2e/sql/49_failed_downstream_nodes_skipped.sql b/tests/e2e/sql/49_failed_downstream_nodes_skipped.sql
new file mode 100644
index 00000000..3b0ee899
--- /dev/null
+++ b/tests/e2e/sql/49_failed_downstream_nodes_skipped.sql
@@ -0,0 +1,114 @@
+-- Copyright (c) Microsoft Corporation.
+-- Licensed under the PostgreSQL License.
+
+-- E2E Test: downstream nodes are marked 'skipped' after node-level failure.
+
+SET SESSION AUTHORIZATION df_e2e_user;
+
+-- ============================================================================
+-- Test 1: THEN chain failure marks downstream SQL node as skipped
+-- ============================================================================
+
+CREATE TEMP TABLE _test_skip1 (instance_id TEXT);
+
+INSERT INTO _test_skip1
+SELECT df.start(
+    df.sql($$SELECT 'skip-test-step1'::text$$)
+    ~> df.sql($$SELECT 1/0$$)
+    ~> df.sql($$SELECT 'skip-test-step3'::text$$),
+    'test-failed-downstream-skipped-seq'
+);
+
+DO $$
+DECLARE
+    inst_id TEXT;
+    wf_status TEXT;
+    step1_status TEXT;
+    fail_status TEXT;
+    step3_status TEXT;
+BEGIN
+    SELECT instance_id INTO inst_id FROM _test_skip1;
+    SELECT df.wait_for_completion(inst_id) INTO wf_status;
+
+    IF lower(wf_status) != 'failed' THEN
+        RAISE EXCEPTION 'TEST FAILED [skipped-seq]: expected failed, got %', wf_status;
+    END IF;
+
+    SELECT status INTO step1_status
+    FROM df.nodes
+    WHERE instance_id = inst_id AND node_type = 'SQL' AND query = 'SELECT ''skip-test-step1''::text'
+    LIMIT 1;
+
+    IF step1_status != 'completed' THEN
+        RAISE EXCEPTION 'TEST FAILED [skipped-seq]: step1 expected completed, got %', step1_status;
+    END IF;
+
+    SELECT status INTO fail_status
+    FROM df.nodes
+    WHERE instance_id = inst_id AND node_type = 'SQL' AND query = 'SELECT 1/0'
+    LIMIT 1;
+
+    IF fail_status != 'failed' THEN
+        RAISE EXCEPTION 'TEST FAILED [skipped-seq]: failing step expected failed, got %', fail_status;
+    END IF;
+
+    SELECT status INTO step3_status
+    FROM df.nodes
+    WHERE instance_id = inst_id AND node_type = 'SQL' AND query = 'SELECT ''skip-test-step3''::text'
+    LIMIT 1;
+
+    IF step3_status != 'skipped' THEN
+        RAISE EXCEPTION 'TEST FAILED [skipped-seq]: downstream step expected skipped, got %', step3_status;
+    END IF;
+
+    RAISE NOTICE 'TEST PASSED: skipped status on failed sequence';
+END $$;
+
+DROP TABLE _test_skip1;
+
+-- ============================================================================
+-- Test 2: IF condition failure marks both branches as skipped
+-- ============================================================================
+
+CREATE TEMP TABLE _test_skip2 (instance_id TEXT);
+
+INSERT INTO _test_skip2
+SELECT df.start(
+    df.if(
+        $$SELECT 1/0$$,
+        $$SELECT 'skip-then-branch'::text$$,
+        $$SELECT 'skip-else-branch'::text$$
+    ),
+    'test-failed-downstream-skipped-if'
+);
+
+DO $$
+DECLARE
+    inst_id TEXT;
+    wf_status TEXT;
+    skipped_branch_count INT;
+BEGIN
+    SELECT instance_id INTO inst_id FROM _test_skip2;
+    SELECT df.wait_for_completion(inst_id) INTO wf_status;
+
+    IF lower(wf_status) != 'failed' THEN
+        RAISE EXCEPTION 'TEST FAILED [skipped-if]: expected failed, got %', wf_status;
+    END IF;
+
+    SELECT COUNT(*) INTO skipped_branch_count
+    FROM df.nodes
+    WHERE instance_id = inst_id
+      AND node_type = 'SQL'
+      AND query IN ('SELECT ''skip-then-branch''::text', 'SELECT ''skip-else-branch''::text')
+      AND status = 'skipped';
+
+    IF skipped_branch_count != 2 THEN
+        RAISE EXCEPTION 'TEST FAILED [skipped-if]: expected 2 skipped branches, got %', skipped_branch_count;
+    END IF;
+
+    RAISE NOTICE 'TEST PASSED: skipped status on IF branches after condition failure';
+END $$;
+
+DROP TABLE _test_skip2;
+
+SELECT 'TEST PASSED' AS result;
\ No newline at end of file