From d52a32f08a7dd1f66fa80de1ae2dee23c2efecd7 Mon Sep 17 00:00:00 2001 From: Todd Green Date: Fri, 5 Jun 2026 23:21:39 +0000 Subject: [PATCH 1/5] Fail sub-orchestration fast when its instance id is already terminal A sub-orchestration's child instance id is auto-generated as {parent}::sub::{event_id}. If an instance with that id already exists in a terminal state, the orchestration dispatcher's terminal fast-ack path discarded the incoming StartOrchestration without notifying the scheduling parent, leaving the parent awaiting a completion that never arrived. The dispatcher now enqueues a SubOrchFailed back to the scheduling parent when a discarded terminal batch contains a StartOrchestration whose parent differs from the terminal instance's own recorded parent, so the parent fails fast. Genuine redelivery of a completed child's start (same parent and id) is left untouched. Client::start_orchestration and start_orchestration_versioned also now reject instance ids that use the reserved sub:: marker, so a caller cannot occupy a future child id. Other uses of :: remain valid. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/client/mod.rs | 33 +++++++- src/runtime/dispatchers/orchestration.rs | 65 +++++++++++++++- tests/instance_id_validation_tests.rs | 74 ++++++++++++++++++ tests/scenarios.rs | 3 + tests/scenarios/suborch_id_collision.rs | 95 ++++++++++++++++++++++++ 5 files changed, 267 insertions(+), 3 deletions(-) create mode 100644 tests/instance_id_validation_tests.rs create mode 100644 tests/scenarios/suborch_id_collision.rs diff --git a/src/client/mod.rs b/src/client/mod.rs index d24ebce..57f7a01 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -168,6 +168,24 @@ pub struct Client { store: Arc, } +/// Reject instance ids that collide with the reserved sub-orchestration markers. +/// +/// Child sub-orchestration instance ids are auto-generated as +/// `{parent}::sub::{event_id}` (see [`crate::build_child_instance_id`]). A +/// user-supplied id matching that form could pre-occupy a future child id, so +/// the `sub::` prefix and `::sub::` infix are reserved. Other uses of `::` remain +/// valid. +fn validate_instance_id(instance: &str) -> Result<(), ClientError> { + if instance.starts_with(crate::SUB_ORCH_AUTO_PREFIX) || instance.contains("::sub::") { + return Err(ClientError::InvalidInput { + message: format!( + "instance id '{instance}' uses the reserved sub-orchestration marker 'sub::'" + ), + }); + } + Ok(()) +} + impl Client { /// Create a client bound to a Provider instance. /// @@ -211,6 +229,9 @@ impl Client { /// - Must be unique across all orchestrations /// - Can be any string (alphanumeric + hyphens recommended) /// - Reusing an instance ID that already exists will fail + /// - Must not use the reserved sub-orchestration marker `sub::` (as a prefix + /// or in the `::sub::` form); these are reserved for auto-generated child + /// instance ids. Such ids are rejected with [`ClientError::InvalidInput`]. /// /// # Example /// @@ -230,6 +251,8 @@ impl Client { /// /// # Errors /// + /// Returns `ClientError::InvalidInput` if the instance id uses the reserved + /// `sub::` marker. /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration. pub async fn start_orchestration( &self, @@ -237,8 +260,10 @@ impl Client { orchestration: impl Into, input: impl Into, ) -> Result<(), ClientError> { + let instance = instance.into(); + validate_instance_id(&instance)?; let item = WorkItem::StartOrchestration { - instance: instance.into(), + instance, orchestration: orchestration.into(), input: input.into(), version: None, @@ -256,6 +281,8 @@ impl Client { /// /// # Errors /// + /// Returns `ClientError::InvalidInput` if the instance id uses the reserved + /// `sub::` marker. /// Returns `ClientError::Provider` if the provider fails to enqueue the orchestration. pub async fn start_orchestration_versioned( &self, @@ -264,8 +291,10 @@ impl Client { version: impl Into, input: impl Into, ) -> Result<(), ClientError> { + let instance = instance.into(); + validate_instance_id(&instance)?; let item = WorkItem::StartOrchestration { - instance: instance.into(), + instance, orchestration: orchestration.into(), input: input.into(), version: Some(version.into()), diff --git a/src/runtime/dispatchers/orchestration.rs b/src/runtime/dispatchers/orchestration.rs index 72488ab..9a83eb5 100644 --- a/src/runtime/dispatchers/orchestration.rs +++ b/src/runtime/dispatchers/orchestration.rs @@ -593,13 +593,18 @@ impl Runtime { || (temp_history_mgr.is_continued_as_new && !workitem_reader.is_continue_as_new) { warn!(instance = %instance, "Instance is terminal (completed/failed or CAN without start), acking batch without processing"); + // If a StartOrchestration in this discarded batch is a sub-orchestration whose + // parent differs from this instance's own recorded parent, the instance id was + // reused for an unrelated child. Notify that parent with a SubOrchFailed so it + // fails fast instead of awaiting a completion that will never arrive. + let orchestrator_items = self.terminal_collision_notifications(&item).await; let _ = self .ack_orchestration_with_changes( lock_token, item.execution_id, vec![], vec![], - vec![], + orchestrator_items, ExecutionMetadata::default(), vec![], // cancelled_activities - none for terminal instances ) @@ -1399,4 +1404,62 @@ impl Runtime { // Record metrics for poison detection self.record_orchestration_poison(); } + + /// Build `SubOrchFailed` notifications for a terminal instance that received a + /// `StartOrchestration` belonging to a different parent. + /// + /// Sub-orchestration child instance ids are auto-generated as + /// `{parent}::sub::{event_id}`. If that id already names a terminal instance, the + /// incoming `StartOrchestration` is discarded by the terminal fast-ack path. Without + /// this notification the scheduling parent would await a completion forever. We only + /// notify when the incoming work item's parent differs from the terminal instance's + /// own recorded parent, so genuine redelivery of a completed child's start (parent + /// already notified) does not spuriously fail the parent again. + async fn terminal_collision_notifications(&self, item: &crate::providers::OrchestrationItem) -> Vec { + // The terminal instance's own parent, as recorded in its history. + let own_parent = item.history.iter().find_map(|e| match &e.kind { + EventKind::OrchestrationStarted { + parent_instance: Some(pi), + parent_id: Some(pid), + .. + } => Some((pi.clone(), *pid)), + _ => None, + }); + + let mut notifications = Vec::new(); + for msg in &item.messages { + if let WorkItem::StartOrchestration { + parent_instance: Some(parent_instance), + parent_id: Some(parent_id), + .. + } = msg + { + // Skip genuine redelivery: same parent that already owns this instance. + if own_parent.as_ref() == Some(&(parent_instance.clone(), *parent_id)) { + continue; + } + warn!( + instance = %item.instance, + parent_instance = %parent_instance, + parent_id = %parent_id, + "Sub-orchestration target instance id already exists and is terminal; notifying parent of failure" + ); + let parent_execution_id = self.get_execution_id_for_instance(parent_instance, None).await; + notifications.push(WorkItem::SubOrchFailed { + parent_instance: parent_instance.clone(), + parent_execution_id, + parent_id: *parent_id, + details: crate::ErrorDetails::Application { + kind: crate::AppErrorKind::OrchestrationFailed, + message: format!( + "sub-orchestration instance id '{}' already exists and is terminal", + item.instance + ), + retryable: false, + }, + }); + } + } + notifications + } } diff --git a/tests/instance_id_validation_tests.rs b/tests/instance_id_validation_tests.rs new file mode 100644 index 0000000..5a1e3af --- /dev/null +++ b/tests/instance_id_validation_tests.rs @@ -0,0 +1,74 @@ +//! Instance id validation: reserved sub-orchestration markers are rejected. +//! +//! Child sub-orchestration instance ids are auto-generated as +//! `{parent}::sub::{event_id}`. User-supplied instance ids must not collide with +//! that reserved form, otherwise they can squat a future child id. Other uses of +//! `::` (e.g. `i4::child-1`) remain valid. + +#![allow(clippy::unwrap_used)] +#![allow(clippy::expect_used)] + +use duroxide::Client; +use duroxide::providers::Provider; +use duroxide::providers::sqlite::SqliteProvider; +use std::sync::Arc; + +async fn client() -> Client { + let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + Client::new(store) +} + +#[tokio::test] +async fn start_orchestration_rejects_reserved_infix() { + let err = client() + .await + .start_orchestration("victim::sub::2", "AnyOrch", "") + .await + .expect_err("instance id containing the reserved '::sub::' marker must be rejected"); + + assert!( + matches!(err, duroxide::ClientError::InvalidInput { .. }), + "expected InvalidInput, got {err:?}" + ); +} + +#[tokio::test] +async fn start_orchestration_rejects_reserved_prefix() { + let err = client() + .await + .start_orchestration("sub::pending_1", "AnyOrch", "") + .await + .expect_err("instance id starting with the reserved 'sub::' marker must be rejected"); + + assert!(matches!(err, duroxide::ClientError::InvalidInput { .. }), "got {err:?}"); +} + +#[tokio::test] +async fn start_orchestration_versioned_rejects_reserved_infix() { + let err = client() + .await + .start_orchestration_versioned("a::sub::3", "AnyOrch", "1.0.0", "") + .await + .expect_err("instance id containing the reserved '::sub::' marker must be rejected"); + + assert!(matches!(err, duroxide::ClientError::InvalidInput { .. }), "got {err:?}"); +} + +#[tokio::test] +async fn start_orchestration_accepts_normal_id() { + client() + .await + .start_orchestration("order-123", "AnyOrch", "") + .await + .expect("normal instance id must be accepted"); +} + +#[tokio::test] +async fn start_orchestration_accepts_non_reserved_double_colon() { + // `::` is only reserved in the `sub::` form; other uses remain valid. + client() + .await + .start_orchestration("i4::child-1", "AnyOrch", "") + .await + .expect("non-reserved '::' instance id must be accepted"); +} diff --git a/tests/scenarios.rs b/tests/scenarios.rs index 86437ce..f3da9a9 100644 --- a/tests/scenarios.rs +++ b/tests/scenarios.rs @@ -39,3 +39,6 @@ mod replay_versioning; #[path = "scenarios/copilot_chat.rs"] mod copilot_chat; + +#[path = "scenarios/suborch_id_collision.rs"] +mod suborch_id_collision; diff --git a/tests/scenarios/suborch_id_collision.rs b/tests/scenarios/suborch_id_collision.rs new file mode 100644 index 0000000..ec99d13 --- /dev/null +++ b/tests/scenarios/suborch_id_collision.rs @@ -0,0 +1,95 @@ +//! Sub-orchestration instance-id collision scenario. +//! +//! Child sub-orchestration instance ids are auto-generated as +//! `{parent}::sub::{event_id}`. If an instance with that exact id already exists +//! in a terminal state when the parent schedules its child, the parent must not +//! hang forever waiting for a completion that never arrives — it must observe a +//! sub-orchestration failure and reach a terminal state. + +#![allow(clippy::unwrap_used)] +#![allow(clippy::expect_used)] + +use duroxide::providers::Provider; +use duroxide::providers::WorkItem; +use duroxide::providers::sqlite::SqliteProvider; +use duroxide::runtime::registry::ActivityRegistry; +use duroxide::runtime::{self}; +use duroxide::{Client, OrchestrationContext, OrchestrationRegistry, OrchestrationStatus}; +use std::sync::Arc; +use std::time::Duration; + +/// A pre-existing terminal instance occupying the parent's auto-generated child id +/// must not cause the parent to hang. The parent should reach a terminal state. +/// +/// The colliding instance is enqueued directly through the provider to model a +/// client that does not validate instance ids (e.g. an older node during a rolling +/// upgrade), so this exercises the dispatcher's defense independently of the +/// client-side validation. +#[tokio::test] +async fn parent_does_not_hang_when_child_id_already_terminal() { + let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + + // Parent's first action is a sub-orchestration call. Event 1 = OrchestrationStarted, + // Event 2 = SubOrchestrationScheduled, so the auto-generated child id is + // "{parent}::sub::2". + let parent = |ctx: OrchestrationContext, _input: String| async move { + match ctx.schedule_sub_orchestration("Child", "child-input").await { + Ok(r) => Ok(format!("parent-got:{r}")), + Err(e) => Err(format!("child-failed:{e}")), + } + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + // Unrelated orchestration that completes immediately, used to occupy the child id. + let squatter = |_ctx: OrchestrationContext, _input: String| async move { Ok("squatted".to_string()) }; + + let orchs = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .register("Squatter", squatter) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client = Client::new(store.clone()); + + // Occupy the predicted child id with an unrelated, already-completed instance. + // Enqueued directly (bypassing client-side validation) to model a non-validating client. + let squat_id = "job-1::sub::2"; + store + .enqueue_for_orchestrator( + WorkItem::StartOrchestration { + instance: squat_id.to_string(), + orchestration: "Squatter".to_string(), + input: String::new(), + version: None, + parent_instance: None, + parent_id: None, + execution_id: 1, + }, + None, + ) + .await + .unwrap(); + let squat_status = client + .wait_for_orchestration(squat_id, Duration::from_secs(5)) + .await + .unwrap(); + assert!( + matches!(squat_status, OrchestrationStatus::Completed { .. }), + "squatter must complete first; got {squat_status:?}" + ); + + // Start the parent. Its child id collides with the terminal squatter instance. + client.start_orchestration("job-1", "Parent", "").await.unwrap(); + + let status = client + .wait_for_orchestration("job-1", Duration::from_secs(10)) + .await + .expect("parent must reach a terminal state, not hang"); + + assert!( + matches!(status, OrchestrationStatus::Failed { .. }), + "parent should fail fast due to the child-id collision; got {status:?}" + ); + + rt.shutdown(None).await; +} From b0990dbfc3e1c8be92170b16e6bc12c53d7ca75c Mon Sep 17 00:00:00 2001 From: Todd Green Date: Mon, 8 Jun 2026 02:51:18 +0000 Subject: [PATCH 2/5] Keep continue-as-new sub-orchestration ids unique per execution Event ids reset on continue-as-new, so a parent that schedules a sub-orchestration at the same position on every iteration regenerated an identical child id (`{parent}::sub::{event_id}`) and collided with the now-terminal child from the previous iteration, hanging the parent. Auto-generated child ids now include the parent execution after the first (`{parent}::sub::{exec}_{event_id}`), and the runtime records the parent's current execution at turn start so the child's completion routes to the right iteration. The first execution's id format is unchanged and child ids are persisted in history, so in-flight instances and mixed-version clusters are unaffected. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/lib.rs | 18 ++++ src/runtime/execution.rs | 6 ++ src/runtime/replay_engine.rs | 6 +- tests/scenarios/suborch_id_collision.rs | 117 +++++++++++++++++++++++- 4 files changed, 142 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 6faa754..a23477b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -873,6 +873,24 @@ pub fn is_auto_generated_sub_orch_id(instance: &str) -> bool { instance.starts_with(SUB_ORCH_AUTO_PREFIX) } +/// Build the auto-generated sub-orchestration suffix for a given parent execution +/// and scheduling event id. +/// +/// The first execution uses `sub::{event_id}` for backward compatibility. Later +/// executions (after `continue_as_new`) include the execution id as +/// `sub::{execution_id}_{event_id}`: event ids reset on continue-as-new, so a parent +/// that schedules a sub-orchestration at the same position on each iteration would +/// otherwise regenerate an identical child id and collide with the now-terminal +/// child from the previous iteration. +#[inline] +pub(crate) fn auto_sub_orch_suffix(execution_id: u64, event_id: u64) -> String { + if execution_id == INITIAL_EXECUTION_ID { + format!("{SUB_ORCH_AUTO_PREFIX}{event_id}") + } else { + format!("{SUB_ORCH_AUTO_PREFIX}{execution_id}_{event_id}") + } +} + /// Build the full child instance ID, adding parent prefix only for auto-generated IDs. /// /// - Auto-generated IDs (starting with "sub::"): `{parent}::{child}` (e.g., `parent-1::sub::5`) diff --git a/src/runtime/execution.rs b/src/runtime/execution.rs index 3d70804..d9aa5c2 100644 --- a/src/runtime/execution.rs +++ b/src/runtime/execution.rs @@ -85,6 +85,12 @@ impl Runtime { // Execute orchestration turn let messages = &workitem_reader.completion_messages; + // Record this instance's current execution id so sub-orchestrations scheduled in + // this turn route their completions back to the right execution. A turn that only + // schedules a sub-orchestration would otherwise leave a post-continue-as-new + // execution unrecorded, dropping the child's completion. + self.get_execution_id_for_instance(instance, Some(execution_id)).await; + debug!( instance = %instance, message_count = messages.len(), diff --git a/src/runtime/replay_engine.rs b/src/runtime/replay_engine.rs index 7dc67e9..2e35546 100644 --- a/src/runtime/replay_engine.rs +++ b/src/runtime/replay_engine.rs @@ -1079,7 +1079,7 @@ impl ReplayEngine { ctx.bind_token(token, event_id); - let updated_action = update_action_event_id(action, event_id); + let updated_action = update_action_event_id(action, self.execution_id, event_id); if let crate::Action::StartSubOrchestration { instance, .. } = &updated_action { ctx.bind_sub_orchestration_instance(token, instance.clone()); @@ -1747,7 +1747,7 @@ fn action_to_event(action: &Action, instance: &str, execution_id: u64, event_id: /// Update an action's scheduling_event_id to the correct event_id. /// Also generates the actual sub-orchestration instance ID from the event_id /// (unless an explicit instance ID was provided, indicated by not starting with SUB_ORCH_PENDING_PREFIX). -fn update_action_event_id(action: Action, event_id: u64) -> Action { +fn update_action_event_id(action: Action, execution_id: u64, event_id: u64) -> Action { match action { Action::CallActivity { name, @@ -1790,7 +1790,7 @@ fn update_action_event_id(action: Action, event_id: u64) -> Action { // If instance starts with the pending prefix, it's a placeholder that needs to be replaced. // Otherwise, it's an explicit instance ID provided by the user. let final_instance = if instance.starts_with(crate::SUB_ORCH_PENDING_PREFIX) { - format!("{}{event_id}", crate::SUB_ORCH_AUTO_PREFIX) + crate::auto_sub_orch_suffix(execution_id, event_id) } else { instance }; diff --git a/tests/scenarios/suborch_id_collision.rs b/tests/scenarios/suborch_id_collision.rs index ec99d13..ca2691f 100644 --- a/tests/scenarios/suborch_id_collision.rs +++ b/tests/scenarios/suborch_id_collision.rs @@ -86,9 +86,122 @@ async fn parent_does_not_hang_when_child_id_already_terminal() { .await .expect("parent must reach a terminal state, not hang"); + match status { + OrchestrationStatus::Failed { details, .. } => { + let msg = details.display_message(); + assert!( + msg.contains("already exists"), + "failure should reflect the child-id collision; got {msg:?}" + ); + } + other => panic!("parent should fail fast due to the child-id collision; got {other:?}"), + } + + rt.shutdown(None).await; +} + +/// Genuine at-least-once redelivery of a completed child's own `StartOrchestration` +/// must not spuriously fail the parent. The child id already names a terminal instance, +/// but the incoming work item's parent matches that instance's recorded parent, so the +/// dispatcher must skip the collision notification and leave the parent completed. +#[tokio::test] +async fn redelivered_child_start_does_not_fail_parent() { + let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + + let parent = |ctx: OrchestrationContext, _input: String| async move { + let r = ctx.schedule_sub_orchestration("Child", "child-input").await?; + Ok(format!("parent-got:{r}")) + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + + let orchs = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client = Client::new(store.clone()); + + client.start_orchestration("job-2", "Parent", "").await.unwrap(); + let status = client + .wait_for_orchestration("job-2", Duration::from_secs(10)) + .await + .unwrap(); + assert!( + matches!(&status, OrchestrationStatus::Completed { output, .. } if output == "parent-got:child-done:child-input"), + "parent should complete normally first; got {status:?}" + ); + + // Redeliver the completed child's own StartOrchestration (same parent linkage). + // The child id "job-2::sub::2" is now terminal; the dispatcher must treat this as + // redelivery and not fail the parent. + store + .enqueue_for_orchestrator( + WorkItem::StartOrchestration { + instance: "job-2::sub::2".to_string(), + orchestration: "Child".to_string(), + input: "child-input".to_string(), + version: None, + parent_instance: Some("job-2".to_string()), + parent_id: Some(2), + execution_id: 1, + }, + None, + ) + .await + .unwrap(); + + // Give the dispatcher time to process the redelivered item, then confirm the parent + // is still Completed (not spuriously failed). + tokio::time::sleep(Duration::from_secs(1)).await; + let after = client.get_orchestration_status("job-2").await.unwrap(); + assert!( + matches!(after, OrchestrationStatus::Completed { .. }), + "redelivery must not fail the parent; got {after:?}" + ); + + rt.shutdown(None).await; +} + +/// A parent that schedules a sub-orchestration on every continue-as-new iteration +/// must not collide with itself. Event ids reset on continue-as-new, so without +/// execution-scoped child ids the second iteration would regenerate the same child +/// id as the first (now terminal) and hang. The auto-generated id includes the +/// parent execution after the first execution, keeping each iteration's child unique. +#[tokio::test] +async fn parent_with_suborch_survives_continue_as_new() { + let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + + // Each execution's first action is a sub-orchestration call, then it continues as + // new with an incremented counter until it reaches the limit. + let parent = |ctx: OrchestrationContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + let r = ctx.schedule_sub_orchestration("Child", n.to_string()).await?; + if n < 3 { + return ctx.continue_as_new((n + 1).to_string()).await; + } + Ok(format!("done:{n}:{r}")) + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + + let orchs = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client = Client::new(store.clone()); + + client.start_orchestration("can-job", "Parent", "0").await.unwrap(); + + let status = client + .wait_for_orchestration("can-job", Duration::from_secs(10)) + .await + .expect("parent must run through all continue-as-new iterations, not hang"); + assert!( - matches!(status, OrchestrationStatus::Failed { .. }), - "parent should fail fast due to the child-id collision; got {status:?}" + matches!(&status, OrchestrationStatus::Completed { output, .. } if output == "done:3:child-done:3"), + "parent should complete after looping with sub-orchestrations; got {status:?}" ); rt.shutdown(None).await; From eeee0a2e649fa1d01ae93813a394c530bc0b963a Mon Sep 17 00:00:00 2001 From: Todd Green Date: Tue, 9 Jun 2026 20:35:48 +0000 Subject: [PATCH 3/5] Address PR review: durable routing for sub-orch collision failures Route sub-orchestration completion/failure notifications using the parent's current execution id read from durable provider state (Provider::read) instead of a process-local cache. A restarted or different runtime previously defaulted to execution 1 on a cache miss, so a parent on execution 2+ filtered the notification during replay and hung forever. Removes the current_execution_ids cache entirely (also eliminating its unbounded per-turn growth), updates comments to cover the post-continue-as-new child-id form, documents the explicit-id escape hatch, adds CHANGELOG/migration notes for the reserved sub:: marker, makes the redelivery test deterministic, and adds a cross-runtime regression test where a fresh runtime that never ran the parent must still route the failure to execution 2. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- CHANGELOG.md | 25 ++ docs/migration-guide.md | 30 ++ src/client/mod.rs | 11 +- src/lib.rs | 21 +- src/runtime/dispatchers/orchestration.rs | 20 +- src/runtime/execution.rs | 15 +- src/runtime/mod.rs | 48 +-- tests/scenarios/suborch_id_collision.rs | 428 ++++++++++++++++++++++- 8 files changed, 539 insertions(+), 59 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 71a396e..d70b48d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,31 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Changed + +- Reserved the `sub::` marker for runtime-generated sub-orchestration instance ids. + `Client::start_orchestration` and `Client::start_orchestration_versioned` now + return `ClientError::InvalidInput` for root instance ids that start with `sub::` + or contain `::sub::`; other uses of `::` remain supported. Applications that used + the reserved marker in root instance ids must rename those ids before upgrading. + See [docs/migration-guide.md](docs/migration-guide.md) for guidance. + +### Fixed + +- **Parent hang on sub-orchestration instance-id collision** — When an auto-generated + child instance id already named a terminal instance, the scheduling parent could await + a completion that never arrived. The runtime now notifies the parent with a + sub-orchestration failure so it fails fast. The failure (and all sub-orchestration + completion/failure notifications) is routed to the parent's current execution using + durable provider state instead of process-local memory, so routing stays correct across + runtime restarts and multiple dispatcher nodes. +- **Sub-orchestration id reuse across continue-as-new** — Child instance ids generated + after a parent `continue_as_new` now include the parent execution id + (`{parent}::sub::{execution_id}_{event_id}`), preventing collisions with the terminal + child of a previous iteration that schedules at the same position. + ## [0.1.29] - 2026-05-08 **Release:** diff --git a/docs/migration-guide.md b/docs/migration-guide.md index c73e826..aae80bd 100644 --- a/docs/migration-guide.md +++ b/docs/migration-guide.md @@ -2,6 +2,36 @@ This guide helps you migrate between Duroxide versions and handle orchestration versioning. +## Reserved `sub::` instance-id marker (Unreleased) + +The `sub::` marker is now reserved for runtime-generated sub-orchestration instance ids. +`Client::start_orchestration` and `Client::start_orchestration_versioned` reject root +instance ids that: + +- start with `sub::`, or +- contain the `::sub::` infix. + +Such ids return `ClientError::InvalidInput`. Ordinary uses of `::` in instance ids remain +valid (e.g. `tenant-7::order-42`); only the `sub::` marker is reserved. + +This prevents a root instance id from pre-occupying an auto-generated child id. Child +sub-orchestration ids take the form `{parent}::sub::{event_id}` on the first parent +execution and `{parent}::sub::{execution_id}_{event_id}` after `continue_as_new`. + +Before upgrading client code, audit your root instance-id scheme for the reserved marker: + +```text +# Reject — start with `sub::` or contain `::sub::` +sub::job-1 +tenant-7::sub::order-42 + +# Accept — ordinary `::` is fine +tenant-7::order-42 +order-2026-06-09 +``` + +Rename any root instance ids that use the reserved marker before upgrading. + ## Orchestration Versioning Duroxide supports versioning to handle code evolution while maintaining compatibility with running instances. diff --git a/src/client/mod.rs b/src/client/mod.rs index 57f7a01..66e135c 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -170,11 +170,12 @@ pub struct Client { /// Reject instance ids that collide with the reserved sub-orchestration markers. /// -/// Child sub-orchestration instance ids are auto-generated as -/// `{parent}::sub::{event_id}` (see [`crate::build_child_instance_id`]). A -/// user-supplied id matching that form could pre-occupy a future child id, so -/// the `sub::` prefix and `::sub::` infix are reserved. Other uses of `::` remain -/// valid. +/// Child sub-orchestration instance ids reserve the `sub::` marker (see +/// [`crate::auto_sub_orch_suffix`], the canonical formatter). The first parent +/// execution uses `{parent}::sub::{event_id}`; executions after continue-as-new use +/// `{parent}::sub::{execution_id}_{event_id}`. A user-supplied id matching either form +/// could pre-occupy a future child id, so the `sub::` prefix and `::sub::` infix are +/// reserved. Other uses of `::` remain valid. fn validate_instance_id(instance: &str) -> Result<(), ClientError> { if instance.starts_with(crate::SUB_ORCH_AUTO_PREFIX) || instance.contains("::sub::") { return Err(ClientError::InvalidInput { diff --git a/src/lib.rs b/src/lib.rs index a23477b..126f66e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3774,7 +3774,22 @@ impl OrchestrationContext { /// without any parent prefix. Use this when you need to control the exact /// instance ID for the sub-orchestration. /// - /// For auto-generated instance IDs, use [`schedule_sub_orchestration`] instead. + /// For auto-generated instance IDs, use [`schedule_sub_orchestration`](Self::schedule_sub_orchestration) + /// instead. + /// + /// # Reserved marker (advanced escape hatch) + /// + /// Unlike [`crate::Client::start_orchestration`], explicit child ids are **not** + /// validated against the reserved `sub::` marker — they are an advanced escape hatch + /// where the caller owns the full id space. Two consequences to be aware of: + /// + /// - An explicit id of the runtime-generated shape (e.g. `parent::sub::2`) is allowed + /// and may therefore collide with an auto-generated child id. The runtime defends + /// against the resulting collision: if the id already names a terminal instance the + /// scheduling parent receives a sub-orchestration failure instead of hanging. + /// - An explicit id that *starts with* `sub::` is treated as auto-generated by + /// [`crate::build_child_instance_id`] and therefore gets the parent prefix added, + /// so it is **not** used verbatim. Avoid leading `sub::` in explicit ids. pub fn schedule_sub_orchestration_with_id( &self, name: impl Into, @@ -3802,6 +3817,10 @@ impl OrchestrationContext { /// The provided `instance` value is used exactly as the child instance ID, /// without any parent prefix. /// + /// Like [`schedule_sub_orchestration_with_id`](Self::schedule_sub_orchestration_with_id), + /// explicit child ids are an advanced escape hatch and are **not** validated against the + /// reserved `sub::` marker; see that method for the collision and leading-`sub::` caveats. + /// /// Returns a [`DurableFuture`] that supports cancellation on drop. If the future /// is dropped without completing, a `CancelInstance` work item will be enqueued /// for the child orchestration. diff --git a/src/runtime/dispatchers/orchestration.rs b/src/runtime/dispatchers/orchestration.rs index 9a83eb5..09b3861 100644 --- a/src/runtime/dispatchers/orchestration.rs +++ b/src/runtime/dispatchers/orchestration.rs @@ -1378,7 +1378,7 @@ impl Runtime { parent_id = %parent_id, "Enqueue SubOrchFailed to parent (poison)" ); - let parent_execution_id = self.get_execution_id_for_instance(&parent_instance, None).await; + let parent_execution_id = self.parent_execution_id_for_routing(&parent_instance).await; vec![WorkItem::SubOrchFailed { parent_instance, parent_execution_id, @@ -1408,13 +1408,15 @@ impl Runtime { /// Build `SubOrchFailed` notifications for a terminal instance that received a /// `StartOrchestration` belonging to a different parent. /// - /// Sub-orchestration child instance ids are auto-generated as - /// `{parent}::sub::{event_id}`. If that id already names a terminal instance, the - /// incoming `StartOrchestration` is discarded by the terminal fast-ack path. Without - /// this notification the scheduling parent would await a completion forever. We only - /// notify when the incoming work item's parent differs from the terminal instance's - /// own recorded parent, so genuine redelivery of a completed child's start (parent - /// already notified) does not spuriously fail the parent again. + /// Sub-orchestration child instance ids reserve the `sub::` marker (see + /// [`crate::auto_sub_orch_suffix`]): the first parent execution uses + /// `{parent}::sub::{event_id}` and executions after continue-as-new use + /// `{parent}::sub::{execution_id}_{event_id}`. If such an id already names a terminal + /// instance, the incoming `StartOrchestration` is discarded by the terminal fast-ack + /// path. Without this notification the scheduling parent would await a completion + /// forever. We only notify when the incoming work item's parent differs from the + /// terminal instance's own recorded parent, so genuine redelivery of a completed + /// child's start (parent already notified) does not spuriously fail the parent again. async fn terminal_collision_notifications(&self, item: &crate::providers::OrchestrationItem) -> Vec { // The terminal instance's own parent, as recorded in its history. let own_parent = item.history.iter().find_map(|e| match &e.kind { @@ -1444,7 +1446,7 @@ impl Runtime { parent_id = %parent_id, "Sub-orchestration target instance id already exists and is terminal; notifying parent of failure" ); - let parent_execution_id = self.get_execution_id_for_instance(parent_instance, None).await; + let parent_execution_id = self.parent_execution_id_for_routing(parent_instance).await; notifications.push(WorkItem::SubOrchFailed { parent_instance: parent_instance.clone(), parent_execution_id, diff --git a/src/runtime/execution.rs b/src/runtime/execution.rs index d9aa5c2..927f607 100644 --- a/src/runtime/execution.rs +++ b/src/runtime/execution.rs @@ -85,12 +85,6 @@ impl Runtime { // Execute orchestration turn let messages = &workitem_reader.completion_messages; - // Record this instance's current execution id so sub-orchestrations scheduled in - // this turn route their completions back to the right execution. A turn that only - // schedules a sub-orchestration would otherwise leave a post-continue-as-new - // execution unrecorded, dropping the child's completion. - self.get_execution_id_for_instance(instance, Some(execution_id)).await; - debug!( instance = %instance, message_count = messages.len(), @@ -157,7 +151,6 @@ impl Runtime { session_id, tag, } => { - let execution_id = self.get_execution_id_for_instance(instance, Some(execution_id)).await; worker_items.push(WorkItem::ActivityExecute { instance: instance.to_string(), execution_id, @@ -172,8 +165,6 @@ impl Runtime { scheduling_event_id, fire_at_ms, } => { - let execution_id = self.get_execution_id_for_instance(instance, Some(execution_id)).await; - // Enqueue TimerFired to orchestrator queue with delayed visibility // Provider will use fire_at_ms for the visible_at timestamp // Note: fire_at_ms is computed at scheduling time (wall-clock), @@ -250,7 +241,7 @@ impl Runtime { tracing::debug!(target = "duroxide::runtime::execution", instance=%instance, parent_instance=%parent_instance, parent_id=%parent_id, "Enqueue SubOrchCompleted to parent"); orchestrator_items.push(WorkItem::SubOrchCompleted { parent_instance: parent_instance.clone(), - parent_execution_id: self.get_execution_id_for_instance(&parent_instance, None).await, + parent_execution_id: self.parent_execution_id_for_routing(&parent_instance).await, parent_id, result: output.clone(), }); @@ -283,7 +274,7 @@ impl Runtime { tracing::debug!(target = "duroxide::runtime::execution", instance=%instance, parent_instance=%parent_instance, parent_id=%parent_id, "Enqueue SubOrchFailed to parent"); orchestrator_items.push(WorkItem::SubOrchFailed { parent_instance: parent_instance.clone(), - parent_execution_id: self.get_execution_id_for_instance(&parent_instance, None).await, + parent_execution_id: self.parent_execution_id_for_routing(&parent_instance).await, parent_id, details: details.clone(), }); @@ -370,7 +361,7 @@ impl Runtime { if let Some((parent_instance, parent_id)) = parent_link { orchestrator_items.push(WorkItem::SubOrchFailed { parent_instance: parent_instance.clone(), - parent_execution_id: self.get_execution_id_for_instance(&parent_instance, None).await, + parent_execution_id: self.parent_execution_id_for_routing(&parent_instance).await, parent_id, details: details.clone(), }); diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index fef0b09..1310874 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -6,7 +6,6 @@ // use crate::providers::{ExecutionMetadata, Provider, WorkItem}; use crate::{Event, EventKind, OrchestrationContext}; -use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; @@ -492,8 +491,6 @@ pub struct Runtime { joins: Mutex>>, history_store: Arc, orchestration_registry: OrchestrationRegistry, - /// Track the current execution ID for each active instance - current_execution_ids: Mutex>, /// Shutdown flag checked by dispatchers shutdown_flag: Arc, /// Runtime configuration options @@ -842,28 +839,32 @@ impl Runtime { None } - /// Get the current execution ID for an instance, or fetch from store if not tracked + /// Resolve a parent instance's current execution id for routing a sub-orchestration + /// completion or failure back to it. /// - /// If `current_execution_id` is provided and the instance matches, use it directly. - /// Otherwise, check in-memory tracking, then fall back to INITIAL_EXECUTION_ID. - async fn get_execution_id_for_instance(&self, instance: &str, current_execution_id: Option) -> u64 { - // If this is the current instance being processed, use the provided execution_id - if let Some(exec_id) = current_execution_id { - // Update in-memory tracking for future calls - self.current_execution_ids - .lock() - .await - .insert(instance.to_string(), exec_id); - return exec_id; - } - - // First check in-memory tracking - if let Some(&exec_id) = self.current_execution_ids.lock().await.get(instance) { - return exec_id; + /// Reads the durable execution id from the provider rather than process-local memory, + /// so routing is correct when a restarted runtime or a different dispatcher node emits + /// the notification. A misrouted notification (e.g. defaulting to execution 1 while the + /// parent is on execution 2+) is filtered out during replay and would leave the parent + /// awaiting forever. `Provider::read` returns the parent's current-execution history, so + /// any event's `execution_id` is the current one. On a read error (or no history yet) we + /// fall back to `INITIAL_EXECUTION_ID`. + async fn parent_execution_id_for_routing(&self, parent_instance: &str) -> u64 { + match self.history_store.read(parent_instance).await { + Ok(events) => events + .iter() + .map(|e| e.execution_id) + .max() + .unwrap_or(crate::INITIAL_EXECUTION_ID), + Err(e) => { + tracing::warn!( + parent_instance = %parent_instance, + error = %e, + "failed to read parent history for sub-orchestration routing; defaulting to INITIAL_EXECUTION_ID" + ); + crate::INITIAL_EXECUTION_ID + } } - - // Fall back to INITIAL_EXECUTION_ID (no longer querying Provider::latest_execution_id) - crate::INITIAL_EXECUTION_ID } /// Start a new runtime using the in-memory SQLite provider. @@ -971,7 +972,6 @@ impl Runtime { joins: Mutex::new(joins), history_store, orchestration_registry, - current_execution_ids: Mutex::new(HashMap::new()), shutdown_flag: Arc::new(AtomicBool::new(false)), options, diff --git a/tests/scenarios/suborch_id_collision.rs b/tests/scenarios/suborch_id_collision.rs index ca2691f..c2e0949 100644 --- a/tests/scenarios/suborch_id_collision.rs +++ b/tests/scenarios/suborch_id_collision.rs @@ -1,8 +1,9 @@ //! Sub-orchestration instance-id collision scenario. //! -//! Child sub-orchestration instance ids are auto-generated as -//! `{parent}::sub::{event_id}`. If an instance with that exact id already exists -//! in a terminal state when the parent schedules its child, the parent must not +//! Child sub-orchestration instance ids reserve the `sub::` marker: the first parent +//! execution uses `{parent}::sub::{event_id}` and executions after continue-as-new use +//! `{parent}::sub::{execution_id}_{event_id}`. If an instance with that exact id already +//! exists in a terminal state when the parent schedules its child, the parent must not //! hang forever waiting for a completion that never arrives — it must observe a //! sub-orchestration failure and reach a terminal state. @@ -12,12 +13,16 @@ use duroxide::providers::Provider; use duroxide::providers::WorkItem; use duroxide::providers::sqlite::SqliteProvider; +use duroxide::providers::ExecutionMetadata; use duroxide::runtime::registry::ActivityRegistry; use duroxide::runtime::{self}; -use duroxide::{Client, OrchestrationContext, OrchestrationRegistry, OrchestrationStatus}; +use duroxide::{Client, Event, EventKind, OrchestrationContext, OrchestrationRegistry, OrchestrationStatus}; use std::sync::Arc; use std::time::Duration; +#[path = "../common/mod.rs"] +mod common; + /// A pre-existing terminal instance occupying the parent's auto-generated child id /// must not cause the parent to hang. The parent should reach a terminal state. /// @@ -132,9 +137,12 @@ async fn redelivered_child_start_does_not_fail_parent() { "parent should complete normally first; got {status:?}" ); + // Snapshot the parent's history before redelivery so we can prove nothing was appended. + let parent_history_before = store.read("job-2").await.unwrap(); + // Redeliver the completed child's own StartOrchestration (same parent linkage). // The child id "job-2::sub::2" is now terminal; the dispatcher must treat this as - // redelivery and not fail the parent. + // redelivery and not enqueue a SubOrchFailed for the parent. store .enqueue_for_orchestrator( WorkItem::StartOrchestration { @@ -151,9 +159,38 @@ async fn redelivered_child_start_does_not_fail_parent() { .await .unwrap(); - // Give the dispatcher time to process the redelivered item, then confirm the parent - // is still Completed (not spuriously failed). - tokio::time::sleep(Duration::from_secs(1)).await; + // Wait deterministically until the redelivered child start has drained from the + // orchestrator queue (and the queue has settled), rather than sleeping a fixed time. + wait_for_orchestrator_queue_drained(&store, Duration::from_secs(10)).await; + + // A spurious notification would be a parent-targeted SubOrchFailed. Assert none is + // queued and none was appended to the parent's history. (Checking only that the parent + // still reports Completed is insufficient: a SubOrchFailed delivered to an already + // terminal parent is discarded by the terminal fast-ack path without a trace.) + let depths = store + .as_management_capability() + .unwrap() + .get_queue_depths() + .await + .unwrap(); + assert_eq!( + depths.orchestrator_queue, 0, + "no parent-targeted SubOrchFailed should remain queued after redelivery" + ); + + let parent_history_after = store.read("job-2").await.unwrap(); + assert_eq!( + parent_history_after.len(), + parent_history_before.len(), + "redelivery must not append any event (e.g. SubOrchestrationFailed) to the parent" + ); + assert!( + !parent_history_after + .iter() + .any(|e| matches!(e.kind, duroxide::EventKind::OrchestrationFailed { .. })), + "parent history must not contain an OrchestrationFailed event after redelivery" + ); + let after = client.get_orchestration_status("job-2").await.unwrap(); assert!( matches!(after, OrchestrationStatus::Completed { .. }), @@ -206,3 +243,378 @@ async fn parent_with_suborch_survives_continue_as_new() { rt.shutdown(None).await; } + +/// Regression for execution-scoped routing of the terminal-collision failure within a +/// single end-to-end run. +/// +/// A parent continues as new and, on execution 2, schedules a sub-orchestration whose +/// auto-generated child id (`{parent}::sub::{execution_id}_{event_id}`) already names a +/// terminal instance. The collision failure must be recorded in execution 2, not +/// misrouted to execution 1. This drives the full flow through one runtime; the +/// `terminal_collision_routes_to_parent_current_execution_on_fresh_runtime` test below +/// is the stronger cross-runtime guard. +#[tokio::test] +async fn parent_on_execution_two_fails_fast_on_child_id_collision() { + let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + + // On execution 1 the parent immediately continues as new; on execution 2 its first + // action is a sub-orchestration call. Event 1 = OrchestrationStarted, event 2 = + // SubOrchestrationScheduled, so the execution-2 child id is "coll::sub::2_2". + let parent = |ctx: OrchestrationContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + if n == 0 { + return ctx.continue_as_new("1").await; + } + match ctx.schedule_sub_orchestration("Child", "x").await { + Ok(r) => Ok(format!("parent-got:{r}")), + Err(e) => Err(format!("child-failed:{e}")), + } + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + let squatter = |_ctx: OrchestrationContext, _input: String| async move { Ok("squatted".to_string()) }; + + let squat_id = "coll::sub::2_2"; + + // Runtime A occupies the predicted execution-2 child id with an unrelated terminal + // instance, then shuts down so it holds no in-memory routing state for the parent. + { + let orchs = OrchestrationRegistry::builder().register("Squatter", squatter).build(); + let acts = ActivityRegistry::builder().build(); + let rt_a = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client_a = Client::new(store.clone()); + store + .enqueue_for_orchestrator( + WorkItem::StartOrchestration { + instance: squat_id.to_string(), + orchestration: "Squatter".to_string(), + input: String::new(), + version: None, + parent_instance: None, + parent_id: None, + execution_id: 1, + }, + None, + ) + .await + .unwrap(); + let squat_status = client_a + .wait_for_orchestration(squat_id, Duration::from_secs(5)) + .await + .unwrap(); + assert!( + matches!(squat_status, OrchestrationStatus::Completed { .. }), + "squatter must complete first; got {squat_status:?}" + ); + rt_a.shutdown(None).await; + } + + // Runtime B is a fresh runtime (no cached execution ids). It drives the parent to + // execution 2, where the child id collides with the terminal squatter. + let orchs = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt_b = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client_b = Client::new(store.clone()); + + client_b.start_orchestration("coll", "Parent", "0").await.unwrap(); + + let status = client_b + .wait_for_orchestration("coll", Duration::from_secs(10)) + .await + .expect("parent on execution 2 must fail fast, not hang"); + + match status { + OrchestrationStatus::Failed { details, .. } => { + let msg = details.display_message(); + assert!( + msg.contains("already exists"), + "failure should reflect the child-id collision; got {msg:?}" + ); + } + other => panic!("parent should fail fast due to the child-id collision; got {other:?}"), + } + + // The failure must be recorded in execution 2 (proves the notification routed to the + // parent's current execution, not execution 1). + let exec2 = store.read_with_execution("coll", 2).await.unwrap(); + assert!( + exec2 + .iter() + .any(|e| matches!(e.kind, duroxide::EventKind::OrchestrationFailed { .. })), + "execution 2 history must contain the OrchestrationFailed event" + ); + + rt_b.shutdown(None).await; +} + +/// Stronger cross-runtime regression for terminal-collision routing. +/// +/// Here the runtime that processes the colliding child start has *never* run the parent, +/// so it holds no in-memory association between the parent and its current execution. The +/// parent's execution-2 state (parked awaiting a sub-orchestration whose id collides with a +/// foreign terminal instance) is seeded directly into the provider. A fresh runtime must +/// read the parent's current execution (2) from durable provider state when routing the +/// `SubOrchFailed`, so the failure lands in execution 2 and the parent fails fast. If the +/// failure were routed to execution 1 (as a process-local cache miss would default to), the +/// parent's replay would filter it out and the parent would hang. +#[tokio::test] +async fn terminal_collision_routes_to_parent_current_execution_on_fresh_runtime() { + let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + + let parent_id = "seeded-parent"; + let child_id = "seeded-parent::sub::2_2"; + + // 1. Seed a foreign terminal instance occupying the parent's execution-2 child id. + common::seed_history_turn( + store.as_ref(), + WorkItem::StartOrchestration { + instance: child_id.to_string(), + orchestration: "Squatter".to_string(), + input: String::new(), + version: Some("1.0.0".to_string()), + parent_instance: None, + parent_id: None, + execution_id: 1, + }, + 1, + vec![ + Event::with_event_id( + 1, + child_id, + 1, + None, + EventKind::OrchestrationStarted { + name: "Squatter".to_string(), + version: "1.0.0".to_string(), + input: String::new(), + parent_instance: None, + parent_id: None, + carry_forward_events: None, + initial_custom_status: None, + }, + ), + Event::with_event_id( + 2, + child_id, + 1, + None, + EventKind::OrchestrationCompleted { + output: "squatted".to_string(), + }, + ), + ], + vec![], + ExecutionMetadata { + orchestration_name: Some("Squatter".to_string()), + orchestration_version: Some("1.0.0".to_string()), + ..Default::default() + }, + ) + .await; + + // 2. Seed the parent directly on execution 2, parked awaiting the colliding child. + // On execution 2 its first action is the sub-orchestration call: event 1 = + // OrchestrationStarted, event 2 = SubOrchestrationScheduled, id "...::sub::2_2". + common::seed_history_turn( + store.as_ref(), + WorkItem::StartOrchestration { + instance: parent_id.to_string(), + orchestration: "Parent".to_string(), + input: "1".to_string(), + version: Some("1.0.0".to_string()), + parent_instance: None, + parent_id: None, + execution_id: 2, + }, + 2, + vec![ + Event::with_event_id( + 1, + parent_id, + 2, + None, + EventKind::OrchestrationStarted { + name: "Parent".to_string(), + version: "1.0.0".to_string(), + input: "1".to_string(), + parent_instance: None, + parent_id: None, + carry_forward_events: None, + initial_custom_status: None, + }, + ), + Event::with_event_id( + 2, + parent_id, + 2, + None, + EventKind::SubOrchestrationScheduled { + name: "Child".to_string(), + instance: child_id.to_string(), + input: "x".to_string(), + }, + ), + ], + vec![], + ExecutionMetadata { + orchestration_name: Some("Parent".to_string()), + orchestration_version: Some("1.0.0".to_string()), + ..Default::default() + }, + ) + .await; + + // Sanity: durable state reports the parent on execution 2. + assert_eq!( + store.read(parent_id).await.unwrap().iter().map(|e| e.execution_id).max(), + Some(2), + "seeded parent must be on execution 2" + ); + + // 3. Enqueue the colliding child start, as a runtime would have when the parent + // scheduled the sub-orchestration. Its target id is already terminal (the foreign + // squatter), and its parent differs, so this is a genuine collision. + store + .enqueue_for_orchestrator( + WorkItem::StartOrchestration { + instance: child_id.to_string(), + orchestration: "Child".to_string(), + input: "x".to_string(), + version: Some("1.0.0".to_string()), + parent_instance: Some(parent_id.to_string()), + parent_id: Some(2), + execution_id: 1, + }, + None, + ) + .await + .unwrap(); + + // 4. A fresh runtime that never ran the parent processes the collision and must route + // the failure to the parent's current execution (2), read from durable state. + let parent = |ctx: OrchestrationContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + if n == 0 { + return ctx.continue_as_new("1").await; + } + match ctx.schedule_sub_orchestration("Child", "x").await { + Ok(r) => Ok(format!("parent-got:{r}")), + Err(e) => Err(format!("child-failed:{e}")), + } + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + + let orchs = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client = Client::new(store.clone()); + + let status = client + .wait_for_orchestration(parent_id, Duration::from_secs(10)) + .await + .expect("fresh runtime must route the failure to execution 2, not hang"); + + match status { + OrchestrationStatus::Failed { details, .. } => { + let msg = details.display_message(); + assert!( + msg.contains("already exists"), + "failure should reflect the child-id collision; got {msg:?}" + ); + } + other => panic!("parent should fail fast due to the child-id collision; got {other:?}"), + } + + let exec2 = store.read_with_execution(parent_id, 2).await.unwrap(); + assert!( + exec2 + .iter() + .any(|e| matches!(e.kind, EventKind::OrchestrationFailed { .. })), + "execution 2 history must contain the OrchestrationFailed event" + ); + + rt.shutdown(None).await; +} +/// `sub::` marker validation that `Client::start_orchestration` enforces. An orchestration +/// may therefore use an id that a top-level client start would reject, and it is used as +/// the exact child instance id. +#[tokio::test] +async fn explicit_sub_orchestration_id_bypasses_reserved_marker_validation() { + let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + + // An id a top-level client start would reject (contains the reserved `::sub::` infix), + // used verbatim as an explicit child id. + let explicit_id = "tenant::sub::99"; + + let parent = |ctx: OrchestrationContext, _input: String| async move { + let r = ctx + .schedule_sub_orchestration_with_id("Child", "tenant::sub::99", "child-input") + .await?; + Ok(format!("parent-got:{r}")) + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + + let orchs = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let acts = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), acts, orchs).await; + let client = Client::new(store.clone()); + + // The same id is rejected for a top-level start. + let rejected = client.start_orchestration(explicit_id, "Parent", "").await; + assert!( + matches!(rejected, Err(duroxide::ClientError::InvalidInput { .. })), + "top-level start with the reserved marker must be rejected; got {rejected:?}" + ); + + // But the explicit sub-orchestration escape hatch allows it. + client.start_orchestration("tenant-parent", "Parent", "").await.unwrap(); + let status = client + .wait_for_orchestration("tenant-parent", Duration::from_secs(10)) + .await + .expect("parent using an explicit reserved-shaped child id should complete"); + assert!( + matches!(&status, OrchestrationStatus::Completed { output, .. } if output == "parent-got:child-done:child-input"), + "parent should complete with the explicit child id; got {status:?}" + ); + + // The child ran under the exact explicit id (no parent prefix). + let child_status = client.get_orchestration_status(explicit_id).await.unwrap(); + assert!( + matches!(child_status, OrchestrationStatus::Completed { .. }), + "explicit child id must be used verbatim; got {child_status:?}" + ); + + rt.shutdown(None).await; +} + +/// Poll until the orchestrator queue has drained and stayed empty across several reads, +/// so a transient parent-targeted `SubOrchFailed` (if one were wrongly enqueued) is given +/// a chance to appear before we assert its absence. +async fn wait_for_orchestrator_queue_drained(store: &Arc, timeout: Duration) { + let mgmt = store.as_management_capability().expect("management capability"); + let deadline = std::time::Instant::now() + timeout; + let mut consecutive_empty = 0; + loop { + let depth = mgmt.get_queue_depths().await.unwrap().orchestrator_queue; + if depth == 0 { + consecutive_empty += 1; + if consecutive_empty >= 5 { + return; + } + } else { + consecutive_empty = 0; + } + if std::time::Instant::now() >= deadline { + panic!("orchestrator queue did not drain within {timeout:?}"); + } + tokio::time::sleep(Duration::from_millis(50)).await; + } +} From 45849ec01ed87add815dad1257a12dee13662737 Mon Sep 17 00:00:00 2001 From: Todd Green Date: Thu, 18 Jun 2026 03:08:03 +0000 Subject: [PATCH 4/5] Stamp parent_execution_id for durable sub-orchestration routing Route sub-orchestration completion/failure notifications to the exact parent execution that scheduled the child by stamping parent_execution_id onto the child StartOrchestration work item at schedule time and persisting it in the child's OrchestrationStarted event. All notification sites now use the stamped value, falling back to a durable provider read only when it is absent (children/work items from older runtimes). This removes the prior TOCTOU window where the parent's current execution at completion time could differ from the execution that scheduled the child, and avoids loading parent history on every child completion. Both new fields are Option with serde default + skip_serializing_if, so the wire and history formats stay backward compatible and mixed-version clusters route correctly during rolling upgrades. Restructure the collision scenario tests: make the same-parent continue-as-new self-collision the primary regression, asserting the exact per-execution child suffixes (sub::2, sub::2_2, sub::3_2). Relabel and rename the foreign-terminal "squatter" tests to legacy_provider_bypass_* and document them as legacy/provider-bypass defenses rather than the normal auto-generated collision case. Update CHANGELOG and the migration guide. --- CHANGELOG.md | 13 +- docs/migration-guide.md | 22 ++ src/client/mod.rs | 2 + src/lib.rs | 7 + src/provider_validation/atomicity.rs | 6 + src/provider_validation/bulk_deletion.rs | 2 + src/provider_validation/cancellation.rs | 11 + .../capability_filtering.rs | 2 + src/provider_validation/deletion.rs | 11 + src/provider_validation/error_handling.rs | 2 + src/provider_validation/instance_creation.rs | 8 + src/provider_validation/kv_store.rs | 3 + src/provider_validation/lock_expiration.rs | 2 + src/provider_validation/management.rs | 6 + src/provider_validation/mod.rs | 3 + src/provider_validation/multi_execution.rs | 7 + src/provider_validation/poison_message.rs | 4 + src/provider_validation/prune.rs | 2 + src/provider_validation/queue_semantics.rs | 2 + src/provider_validation/tag_filtering.rs | 1 + src/providers/mod.rs | 7 + src/providers/sqlite.rs | 15 ++ src/runtime/dispatchers/orchestration.rs | 89 ++++---- src/runtime/execution.rs | 26 ++- src/runtime/mod.rs | 30 ++- src/runtime/replay_engine.rs | 1 + src/runtime/replay_engine_tests.rs | 6 + src/runtime/state_helpers.rs | 48 ++++- tests/cancellation_tests.rs | 1 + tests/capability_filtering_tests.rs | 1 + tests/common/mod.rs | 4 + tests/observability_tests.rs | 3 + tests/poison_message_tests.rs | 1 + tests/provider_atomic_tests.rs | 7 + tests/replay_engine/helpers.rs | 1 + tests/scenarios/replay_versioning.rs | 2 + tests/scenarios/suborch_id_collision.rs | 200 ++++++++++++++---- tests/scenarios/version_replay_bug.rs | 6 + tests/sqlite_tests.rs | 16 ++ 39 files changed, 477 insertions(+), 103 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fa6f25..56af077 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,10 +30,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Parent hang on sub-orchestration instance-id collision** — When an auto-generated child instance id already named a terminal instance, the scheduling parent could await a completion that never arrived. The runtime now notifies the parent with a - sub-orchestration failure so it fails fast. The failure (and all sub-orchestration - completion/failure notifications) is routed to the parent's current execution using - durable provider state instead of process-local memory, so routing stays correct across - runtime restarts and multiple dispatcher nodes. + sub-orchestration failure so it fails fast. The parent execution that scheduled the child + is stamped onto the child start at schedule time and persisted in the child's + `OrchestrationStarted` event, so the failure (and all sub-orchestration completion/failure + notifications) is routed to exactly that parent execution. This is correct across runtime + restarts and multiple dispatcher nodes, and avoids a TOCTOU window where the parent's + *current* execution at completion time could differ from the execution that scheduled the + child. When the stamp is absent (children started by an older runtime, or work items from + before this change), routing falls back to a durable provider read, keeping mixed-version + clusters correct during rolling upgrades. - **Sub-orchestration id reuse across continue-as-new** — Child instance ids generated after a parent `continue_as_new` now include the parent execution id (`{parent}::sub::{execution_id}_{event_id}`), preventing collisions with the terminal diff --git a/docs/migration-guide.md b/docs/migration-guide.md index aae80bd..94dff6b 100644 --- a/docs/migration-guide.md +++ b/docs/migration-guide.md @@ -32,6 +32,28 @@ order-2026-06-09 Rename any root instance ids that use the reserved marker before upgrading. +## Durable sub-orchestration routing (`parent_execution_id`) + +Sub-orchestration completion and failure notifications are now routed to the exact parent +execution that scheduled the child. To do this, the scheduling parent's execution id is +stamped onto the child's start and persisted in the child's history: + +- `WorkItem::StartOrchestration` gains an optional `parent_execution_id` field. +- `EventKind::OrchestrationStarted` gains an optional `parent_execution_id` field. + +Both fields are `Option`, serialized with `#[serde(default, skip_serializing_if = "Option::is_none")]`, +so the wire and history formats remain backward compatible: + +- **Old → new:** A new runtime reading an old child history (or an old work item) sees + `parent_execution_id = None` and falls back to a durable provider read of the parent's + current execution — the previous behavior. +- **New → old:** An old runtime ignores the extra field (it is skipped when absent and not + required when deserializing). + +No action is required to upgrade. Mixed-version clusters route correctly during a rolling +upgrade. The provider-read fallback is retained only for histories/work items created before +this change. + ## Orchestration Versioning Duroxide supports versioning to handle code evolution while maintaining compatibility with running instances. diff --git a/src/client/mod.rs b/src/client/mod.rs index 66e135c..0edcb87 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -270,6 +270,7 @@ impl Client { version: None, parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; self.store @@ -301,6 +302,7 @@ impl Client { version: Some(version.into()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; self.store diff --git a/src/lib.rs b/src/lib.rs index 139f027..20ffbb6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1117,6 +1117,13 @@ pub enum EventKind { input: String, parent_instance: Option, parent_id: Option, + /// Execution id of the parent that scheduled this sub-orchestration, persisted at + /// child-start time. Used to route this child's completion/failure back to the + /// exact parent execution that awaited it. `None` for root orchestrations and for + /// children started by older runtimes (routing then falls back to a provider read). + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + parent_execution_id: Option, /// Persistent events carried forward from the previous execution during continue-as-new. /// Present only on CAN-initiated executions for audit trail. Each tuple is (event_name, data). #[serde(skip_serializing_if = "Option::is_none")] diff --git a/src/provider_validation/atomicity.rs b/src/provider_validation/atomicity.rs index 794e0ad..da335ee 100644 --- a/src/provider_validation/atomicity.rs +++ b/src/provider_validation/atomicity.rs @@ -37,6 +37,7 @@ pub async fn test_atomicity_failure_rollback(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -80,6 +81,7 @@ pub async fn test_atomicity_failure_rollback(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -142,6 +144,7 @@ pub async fn test_multi_operation_atomic_ack(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -327,6 +330,7 @@ pub async fn test_lock_released_only_on_successful_ack(facto input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -402,6 +406,7 @@ pub async fn test_concurrent_ack_prevention(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -430,6 +435,7 @@ pub async fn test_concurrent_ack_prevention(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/bulk_deletion.rs b/src/provider_validation/bulk_deletion.rs index e092ee2..d36789d 100644 --- a/src/provider_validation/bulk_deletion.rs +++ b/src/provider_validation/bulk_deletion.rs @@ -351,6 +351,7 @@ async fn create_completed_instance_with_parent( version: Some("1.0.0".to_string()), parent_instance: Some(parent_id.to_string()), parent_id: Some(1), + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, Some(parent_id.to_string()), @@ -378,6 +379,7 @@ async fn create_completed_instance_with_parent( input: "{}".to_string(), parent_instance: parent_instance_id.clone(), parent_id: if parent_instance_id.is_some() { Some(1) } else { None }, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/cancellation.rs b/src/provider_validation/cancellation.rs index 86a9866..ad502b3 100644 --- a/src/provider_validation/cancellation.rs +++ b/src/provider_validation/cancellation.rs @@ -54,6 +54,7 @@ pub async fn test_fetch_returns_running_state_for_active_orchestration WorkItem { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, } } @@ -39,6 +40,7 @@ fn orchestration_started_event(instance: &str, duroxide_version: &str) -> Event input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/deletion.rs b/src/provider_validation/deletion.rs index 312edc1..9ee5bc5 100644 --- a/src/provider_validation/deletion.rs +++ b/src/provider_validation/deletion.rs @@ -194,6 +194,7 @@ pub async fn test_delete_cleans_queues_and_locks(factory: &F input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -228,6 +229,7 @@ pub async fn test_delete_cleans_queues_and_locks(factory: &F input: new_input.to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }, None, @@ -262,6 +264,7 @@ pub async fn test_delete_cleans_queues_and_locks(factory: &F input: new_input.to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -421,6 +424,7 @@ pub async fn test_force_delete_prevents_ack_recreation(facto input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -519,6 +523,7 @@ async fn create_completed_instance_with_parent( version: Some("1.0.0".to_string()), parent_instance: Some(parent.to_string()), parent_id: Some(1), + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, } } else { @@ -546,6 +551,7 @@ async fn create_completed_instance_with_parent( input: "{}".to_string(), parent_instance: parent_id.map(|s| s.to_string()), parent_id: parent_id.map(|_| 1), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -592,6 +598,7 @@ async fn create_failed_instance(provider: &dyn crate::providers::Provider, insta input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -636,6 +643,7 @@ async fn create_cancelled_instance(provider: &dyn crate::providers::Provider, in input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -1002,6 +1010,7 @@ pub async fn test_stale_activity_after_delete_recreate(facto input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -1084,6 +1093,7 @@ pub async fn test_stale_activity_after_delete_recreate(facto input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -1219,6 +1229,7 @@ async fn create_child_instance(provider: &dyn crate::providers::Provider, instan input: "{}".to_string(), parent_instance: Some(parent_id.to_string()), parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/error_handling.rs b/src/provider_validation/error_handling.rs index c14623f..00b7aa1 100644 --- a/src/provider_validation/error_handling.rs +++ b/src/provider_validation/error_handling.rs @@ -61,6 +61,7 @@ pub async fn test_duplicate_event_id_rejection(factory: &F) input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -183,6 +184,7 @@ pub async fn test_lock_expiration_during_ack(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/instance_creation.rs b/src/provider_validation/instance_creation.rs index 367b447..b9a6e8f 100644 --- a/src/provider_validation/instance_creation.rs +++ b/src/provider_validation/instance_creation.rs @@ -49,6 +49,7 @@ pub async fn test_instance_creation_via_metadata(factory: &F input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -139,6 +140,7 @@ pub async fn test_no_instance_creation_on_enqueue(factory: & input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -172,6 +174,7 @@ pub async fn test_null_version_handling(factory: &F) { version: None, // No version provided parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }; @@ -207,6 +210,7 @@ pub async fn test_null_version_handling(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -279,6 +283,7 @@ pub async fn test_sub_orchestration_instance_creation(factor input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -291,6 +296,7 @@ pub async fn test_sub_orchestration_instance_creation(factor version: None, parent_instance: Some("parent-instance".to_string()), parent_id: Some(1), + parent_execution_id: None, execution_id: 1, }], ExecutionMetadata { @@ -329,6 +335,7 @@ pub async fn test_sub_orchestration_instance_creation(factor input: "{}".to_string(), parent_instance: Some("parent-instance".to_string()), parent_id: Some(1), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -355,6 +362,7 @@ pub async fn test_sub_orchestration_instance_creation(factor version: None, parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }, None, diff --git a/src/provider_validation/kv_store.rs b/src/provider_validation/kv_store.rs index 7fec4cc..21497fc 100644 --- a/src/provider_validation/kv_store.rs +++ b/src/provider_validation/kv_store.rs @@ -1468,6 +1468,7 @@ pub async fn test_kv_delete_instance_with_children(factory: input: "{}".to_string(), parent_instance: Some("kv-parent".to_string()), parent_id: Some(1), + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; provider.enqueue_for_orchestrator(child_start, None).await.unwrap(); @@ -1493,6 +1494,7 @@ pub async fn test_kv_delete_instance_with_children(factory: input: "{}".to_string(), parent_instance: Some("kv-parent".to_string()), parent_id: Some(1), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -2405,6 +2407,7 @@ async fn continue_as_new(provider: &dyn crate::providers::Provider, instance: &s input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/lock_expiration.rs b/src/provider_validation/lock_expiration.rs index f3c6ec3..0c7d37e 100644 --- a/src/provider_validation/lock_expiration.rs +++ b/src/provider_validation/lock_expiration.rs @@ -164,6 +164,7 @@ pub async fn test_lock_renewal_on_ack(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -404,6 +405,7 @@ pub async fn test_worker_lock_renewal_extends_timeout(factor input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/management.rs b/src/provider_validation/management.rs index 9a42b62..9c7abf4 100644 --- a/src/provider_validation/management.rs +++ b/src/provider_validation/management.rs @@ -63,6 +63,7 @@ pub async fn test_list_instances_by_status(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -121,6 +122,7 @@ pub async fn test_list_executions(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -170,6 +172,7 @@ pub async fn test_get_instance_info(factory: &F) { input: "test".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -226,6 +229,7 @@ pub async fn test_get_execution_info(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -285,6 +289,7 @@ pub async fn test_get_system_metrics(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -497,6 +502,7 @@ pub async fn test_get_instance_stats_carry_forward(factory: input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: Some(vec![ ("raised-1".to_string(), r#"{"data":"a"}"#.to_string()), ("raised-2".to_string(), r#"{"data":"b"}"#.to_string()), diff --git a/src/provider_validation/mod.rs b/src/provider_validation/mod.rs index 72a1182..5cee692 100644 --- a/src/provider_validation/mod.rs +++ b/src/provider_validation/mod.rs @@ -65,6 +65,7 @@ pub(crate) fn start_item(instance: &str) -> WorkItem { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, } } @@ -98,6 +99,7 @@ pub(crate) async fn create_instance(provider: &dyn crate::providers::Provider, i input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -136,6 +138,7 @@ pub(crate) async fn create_instance_with_parent( input: "{}".to_string(), parent_instance: parent_instance_id, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; diff --git a/src/provider_validation/multi_execution.rs b/src/provider_validation/multi_execution.rs index b3bdc4e..db82c71 100644 --- a/src/provider_validation/multi_execution.rs +++ b/src/provider_validation/multi_execution.rs @@ -12,6 +12,7 @@ fn start_item_with_execution(instance: &str, execution_id: u64) -> WorkItem { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id, } } @@ -48,6 +49,7 @@ pub async fn test_execution_isolation(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -108,6 +110,7 @@ pub async fn test_execution_isolation(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -279,6 +282,7 @@ pub async fn test_execution_id_sequencing(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -322,6 +326,7 @@ pub async fn test_execution_id_sequencing(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -383,6 +388,7 @@ pub async fn test_continue_as_new_creates_new_execution(fact input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -433,6 +439,7 @@ pub async fn test_continue_as_new_creates_new_execution(fact input: "new-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/poison_message.rs b/src/provider_validation/poison_message.rs index 1a1d0e7..a85232a 100644 --- a/src/provider_validation/poison_message.rs +++ b/src/provider_validation/poison_message.rs @@ -26,6 +26,7 @@ pub async fn orchestration_attempt_count_starts_at_one(factory: &dyn ProviderFac version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, None, @@ -75,6 +76,7 @@ pub async fn orchestration_attempt_count_increments_on_refetch(factory: &dyn Pro version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, None, @@ -420,6 +422,7 @@ pub async fn abandon_orchestration_item_ignore_attempt_decrements(factory: &dyn version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, None, @@ -579,6 +582,7 @@ pub async fn max_attempt_count_across_message_batch(factory: &dyn ProviderFactor version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, None, diff --git a/src/provider_validation/prune.rs b/src/provider_validation/prune.rs index af4e8bd..cf8349f 100644 --- a/src/provider_validation/prune.rs +++ b/src/provider_validation/prune.rs @@ -328,6 +328,7 @@ async fn create_running_multi_execution_instance( input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -393,6 +394,7 @@ async fn create_multi_execution_instance( input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/queue_semantics.rs b/src/provider_validation/queue_semantics.rs index c69acbc..e6aaef8 100644 --- a/src/provider_validation/queue_semantics.rs +++ b/src/provider_validation/queue_semantics.rs @@ -145,6 +145,7 @@ pub async fn test_worker_ack_atomicity(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -248,6 +249,7 @@ pub async fn test_timer_delayed_visibility(factory: &F) { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/provider_validation/tag_filtering.rs b/src/provider_validation/tag_filtering.rs index a72d695..14696b1 100644 --- a/src/provider_validation/tag_filtering.rs +++ b/src/provider_validation/tag_filtering.rs @@ -581,6 +581,7 @@ pub async fn test_tag_preserved_through_ack_orchestration_item(factory: &dyn Pro input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/providers/mod.rs b/src/providers/mod.rs index f1315cd..c7f5bd4 100644 --- a/src/providers/mod.rs +++ b/src/providers/mod.rs @@ -440,6 +440,13 @@ pub enum WorkItem { version: Option, parent_instance: Option, parent_id: Option, + /// Execution id of the parent that scheduled this sub-orchestration, stamped at + /// schedule time so completion/failure notifications route back to the exact + /// parent execution that awaited the child (rather than the parent's current + /// execution at completion time). `None` for top-level starts and for work items + /// produced by older runtimes; routing then falls back to a durable provider read. + #[serde(default, skip_serializing_if = "Option::is_none")] + parent_execution_id: Option, execution_id: u64, }, diff --git a/src/providers/sqlite.rs b/src/providers/sqlite.rs index eda3c3a..fbaa2d8 100644 --- a/src/providers/sqlite.rs +++ b/src/providers/sqlite.rs @@ -3437,6 +3437,7 @@ mod tests { input: input.to_string(), parent_instance: parent_instance.map(|s| s.to_string()), parent_id, + parent_execution_id: None, execution_id: next_execution_id, }, None, @@ -3463,6 +3464,7 @@ mod tests { input: input.to_string(), parent_instance: parent_instance.map(|s| s.to_string()), parent_id, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -3499,6 +3501,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; @@ -3529,6 +3532,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -3574,6 +3578,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; @@ -3598,6 +3603,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -3731,6 +3737,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; @@ -3887,6 +3894,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; store.enqueue_for_orchestrator(item, None).await.unwrap(); @@ -4017,6 +4025,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; @@ -4073,6 +4082,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; @@ -4152,6 +4162,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; @@ -4205,6 +4216,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }; store.enqueue_for_orchestrator(start_item, None).await.unwrap(); @@ -4228,6 +4240,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -4414,6 +4427,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; store.enqueue_for_orchestrator(item, None).await.unwrap(); @@ -4516,6 +4530,7 @@ mod tests { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }; store.enqueue_for_orchestrator(item, None).await.unwrap(); diff --git a/src/runtime/dispatchers/orchestration.rs b/src/runtime/dispatchers/orchestration.rs index 09b3861..e0ef360 100644 --- a/src/runtime/dispatchers/orchestration.rs +++ b/src/runtime/dispatchers/orchestration.rs @@ -1091,6 +1091,7 @@ impl Runtime { input: workitem_reader.input.clone(), parent_instance: workitem_reader.parent_instance.clone(), parent_id: workitem_reader.parent_id, + parent_execution_id: workitem_reader.parent_execution_id, carry_forward_events, initial_custom_status, }, @@ -1285,47 +1286,50 @@ impl Runtime { let mut history_mgr = HistoryManager::from_history(&item.history); // Track parent info for sub-orchestration failure notification - let mut parent_link: Option<(String, u64)> = None; + let mut parent_link: Option<(String, Option, u64)> = None; // If history is empty, we need to create an OrchestrationStarted event first if history_mgr.is_empty() { // Try to extract orchestration name from work items - let (orchestration_name, input, parent_instance, parent_id, carry_forward_events) = item - .messages - .iter() - .find_map(|msg| match msg { - WorkItem::StartOrchestration { - orchestration, - input, - parent_instance, - parent_id, - .. - } => Some(( - orchestration.clone(), - input.clone(), - parent_instance.clone(), - *parent_id, - None, - )), - WorkItem::ContinueAsNew { - orchestration, - input, - carry_forward_events, - .. - } => Some(( - orchestration.clone(), - input.clone(), - None, - None, - Some(carry_forward_events.clone()), - )), - _ => None, - }) - .unwrap_or_else(|| (item.orchestration_name.clone(), String::new(), None, None, None)); + let (orchestration_name, input, parent_instance, parent_id, parent_execution_id, carry_forward_events) = + item.messages + .iter() + .find_map(|msg| match msg { + WorkItem::StartOrchestration { + orchestration, + input, + parent_instance, + parent_id, + parent_execution_id, + .. + } => Some(( + orchestration.clone(), + input.clone(), + parent_instance.clone(), + *parent_id, + *parent_execution_id, + None, + )), + WorkItem::ContinueAsNew { + orchestration, + input, + carry_forward_events, + .. + } => Some(( + orchestration.clone(), + input.clone(), + None, + None, + None, + Some(carry_forward_events.clone()), + )), + _ => None, + }) + .unwrap_or_else(|| (item.orchestration_name.clone(), String::new(), None, None, None, None)); // Save parent link for notification if let (Some(pi), Some(pid)) = (&parent_instance, parent_id) { - parent_link = Some((pi.clone(), pid)); + parent_link = Some((pi.clone(), parent_execution_id, pid)); } history_mgr.append(Event::with_event_id( @@ -1339,6 +1343,7 @@ impl Runtime { input, parent_instance, parent_id, + parent_execution_id, carry_forward_events, initial_custom_status: None, }, @@ -1349,10 +1354,11 @@ impl Runtime { if let EventKind::OrchestrationStarted { parent_instance: Some(pi), parent_id: Some(pid), + parent_execution_id, .. } = &event.kind { - parent_link = Some((pi.clone(), *pid)); + parent_link = Some((pi.clone(), *parent_execution_id, *pid)); break; } } @@ -1365,12 +1371,12 @@ impl Runtime { output: Some(error.display_message()), orchestration_name: Some(item.orchestration_name.clone()), orchestration_version: Some(item.version.clone()), - parent_instance_id: parent_link.as_ref().map(|(pi, _)| pi.clone()), + parent_instance_id: parent_link.as_ref().map(|(pi, _, _)| pi.clone()), pinned_duroxide_version: None, // Poison path — version already set at creation }; // If this is a sub-orchestration, notify parent of failure - let orchestrator_items = if let Some((parent_instance, parent_id)) = parent_link { + let orchestrator_items = if let Some((parent_instance, parent_execution_id, parent_id)) = parent_link { tracing::debug!( target = "duroxide::runtime::execution", instance = %item.instance, @@ -1378,7 +1384,7 @@ impl Runtime { parent_id = %parent_id, "Enqueue SubOrchFailed to parent (poison)" ); - let parent_execution_id = self.parent_execution_id_for_routing(&parent_instance).await; + let parent_execution_id = self.resolve_parent_execution_id(&parent_instance, parent_execution_id).await; vec![WorkItem::SubOrchFailed { parent_instance, parent_execution_id, @@ -1433,6 +1439,7 @@ impl Runtime { if let WorkItem::StartOrchestration { parent_instance: Some(parent_instance), parent_id: Some(parent_id), + parent_execution_id, .. } = msg { @@ -1446,7 +1453,11 @@ impl Runtime { parent_id = %parent_id, "Sub-orchestration target instance id already exists and is terminal; notifying parent of failure" ); - let parent_execution_id = self.parent_execution_id_for_routing(parent_instance).await; + // Prefer the execution id stamped on the colliding start; fall back to a + // durable provider read for work items produced by older runtimes. + let parent_execution_id = self + .resolve_parent_execution_id(parent_instance, *parent_execution_id) + .await; notifications.push(WorkItem::SubOrchFailed { parent_instance: parent_instance.clone(), parent_execution_id, diff --git a/src/runtime/execution.rs b/src/runtime/execution.rs index 927f607..38f0abf 100644 --- a/src/runtime/execution.rs +++ b/src/runtime/execution.rs @@ -63,6 +63,7 @@ impl Runtime { version: version.clone(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }); } @@ -76,7 +77,7 @@ impl Runtime { // (works for both existing history and newly appended OrchestrationStarted in delta) let (input, parent_link) = history_mgr.extract_context(); - if let Some((ref pinst, pid)) = parent_link { + if let Some((ref pinst, _pexec, pid)) = parent_link { tracing::debug!(target = "duroxide::runtime::execution", instance=%instance, parent_instance=%pinst, parent_id=%pid, "Detected parent link for orchestration"); } else { tracing::debug!(target = "duroxide::runtime::execution", instance=%instance, "No parent link for orchestration"); @@ -191,6 +192,10 @@ impl Runtime { version: version.clone(), parent_instance: Some(instance.to_string()), parent_id: Some(*scheduling_event_id), + // Stamp the scheduling parent execution so the child's + // completion/failure routes back to this exact execution, + // even across continue-as-new, restarts, or other nodes. + parent_execution_id: Some(execution_id), execution_id: crate::INITIAL_EXECUTION_ID, }); } @@ -208,6 +213,7 @@ impl Runtime { version: version.clone(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }); } @@ -237,11 +243,13 @@ impl Runtime { )); // Notify parent if this is a sub-orchestration - if let Some((parent_instance, parent_id)) = parent_link { + if let Some((parent_instance, parent_execution_id, parent_id)) = parent_link { tracing::debug!(target = "duroxide::runtime::execution", instance=%instance, parent_instance=%parent_instance, parent_id=%parent_id, "Enqueue SubOrchCompleted to parent"); + let parent_execution_id = + self.resolve_parent_execution_id(&parent_instance, parent_execution_id).await; orchestrator_items.push(WorkItem::SubOrchCompleted { parent_instance: parent_instance.clone(), - parent_execution_id: self.parent_execution_id_for_routing(&parent_instance).await, + parent_execution_id, parent_id, result: output.clone(), }); @@ -270,11 +278,13 @@ impl Runtime { history_mgr.append_failed(details.clone()); // Notify parent if this is a sub-orchestration - if let Some((parent_instance, parent_id)) = parent_link { + if let Some((parent_instance, parent_execution_id, parent_id)) = parent_link { tracing::debug!(target = "duroxide::runtime::execution", instance=%instance, parent_instance=%parent_instance, parent_id=%parent_id, "Enqueue SubOrchFailed to parent"); + let parent_execution_id = + self.resolve_parent_execution_id(&parent_instance, parent_execution_id).await; orchestrator_items.push(WorkItem::SubOrchFailed { parent_instance: parent_instance.clone(), - parent_execution_id: self.parent_execution_id_for_routing(&parent_instance).await, + parent_execution_id, parent_id, details: details.clone(), }); @@ -358,10 +368,12 @@ impl Runtime { } // Notify parent if this is a sub-orchestration - if let Some((parent_instance, parent_id)) = parent_link { + if let Some((parent_instance, parent_execution_id, parent_id)) = parent_link { + let parent_execution_id = + self.resolve_parent_execution_id(&parent_instance, parent_execution_id).await; orchestrator_items.push(WorkItem::SubOrchFailed { parent_instance: parent_instance.clone(), - parent_execution_id: self.parent_execution_id_for_routing(&parent_instance).await, + parent_execution_id, parent_id, details: details.clone(), }); diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 1310874..9f4a2d6 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -839,16 +839,32 @@ impl Runtime { None } + /// Resolve the parent execution id for routing a sub-orchestration completion or + /// failure back to its parent. + /// + /// Prefers the `stamped` value carried durably from schedule time (the exact parent + /// execution that scheduled this child). When absent — children started by an older + /// runtime, or work items produced before this field existed — falls back to a durable + /// provider read via [`Self::parent_execution_id_for_routing`]. This keeps mixed-version + /// clusters correct during rolling upgrades. + async fn resolve_parent_execution_id(&self, parent_instance: &str, stamped: Option) -> u64 { + match stamped { + Some(execution_id) => execution_id, + None => self.parent_execution_id_for_routing(parent_instance).await, + } + } + /// Resolve a parent instance's current execution id for routing a sub-orchestration /// completion or failure back to it. /// - /// Reads the durable execution id from the provider rather than process-local memory, - /// so routing is correct when a restarted runtime or a different dispatcher node emits - /// the notification. A misrouted notification (e.g. defaulting to execution 1 while the - /// parent is on execution 2+) is filtered out during replay and would leave the parent - /// awaiting forever. `Provider::read` returns the parent's current-execution history, so - /// any event's `execution_id` is the current one. On a read error (or no history yet) we - /// fall back to `INITIAL_EXECUTION_ID`. + /// This is the legacy fallback used only when the durable `parent_execution_id` stamp + /// is missing (old child histories / old work items). It reads the execution id from the + /// provider rather than process-local memory, so routing is correct when a restarted + /// runtime or a different dispatcher node emits the notification. A misrouted notification + /// (e.g. defaulting to execution 1 while the parent is on execution 2+) is filtered out + /// during replay and would leave the parent awaiting forever. `Provider::read` returns the + /// parent's current-execution history, so any event's `execution_id` is the current one. + /// On a read error (or no history yet) we fall back to `INITIAL_EXECUTION_ID`. async fn parent_execution_id_for_routing(&self, parent_instance: &str) -> u64 { match self.history_store.read(parent_instance).await { Ok(events) => events diff --git a/src/runtime/replay_engine.rs b/src/runtime/replay_engine.rs index 2e35546..aad01d7 100644 --- a/src/runtime/replay_engine.rs +++ b/src/runtime/replay_engine.rs @@ -1921,6 +1921,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/runtime/replay_engine_tests.rs b/src/runtime/replay_engine_tests.rs index dec034d..139c099 100644 --- a/src/runtime/replay_engine_tests.rs +++ b/src/runtime/replay_engine_tests.rs @@ -31,6 +31,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -117,6 +118,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -213,6 +215,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -257,6 +260,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -308,6 +312,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -376,6 +381,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/src/runtime/state_helpers.rs b/src/runtime/state_helpers.rs index 0d2e210..2675f88 100644 --- a/src/runtime/state_helpers.rs +++ b/src/runtime/state_helpers.rs @@ -25,6 +25,10 @@ pub struct HistoryManager { /// Parent event ID if this is a sub-orchestration pub parent_id: Option, + /// Parent execution id if this is a sub-orchestration (from OrchestrationStarted). + /// Used to route this child's completion/failure back to the exact parent execution. + pub parent_execution_id: Option, + /// Whether the orchestration has completed successfully pub is_completed: bool, @@ -56,6 +60,7 @@ impl HistoryManager { orchestration_input: None, parent_instance: None, parent_id: None, + parent_execution_id: None, is_completed: false, is_failed: false, is_continued_as_new: false, @@ -76,6 +81,7 @@ impl HistoryManager { input, parent_instance, parent_id, + parent_execution_id, .. } = &event.kind { @@ -85,6 +91,7 @@ impl HistoryManager { metadata.orchestration_input = Some(input.clone()); metadata.parent_instance = parent_instance.clone(); metadata.parent_id = *parent_id; + metadata.parent_execution_id = *parent_execution_id; metadata.current_execution_id = Some(execution_id_counter); last_started_index = Some(idx); // Don't break - we want the LAST (most recent) OrchestrationStarted @@ -250,12 +257,16 @@ impl HistoryManager { } /// Extract input and parent linkage from history for orchestration context - /// This looks at the full history including any newly appended events in the delta - pub fn extract_context(&self) -> (String, Option<(String, u64)>) { + /// This looks at the full history including any newly appended events in the delta. + /// + /// The returned parent link is `(parent_instance, parent_execution_id, parent_id)`, + /// where `parent_execution_id` is the durable execution id stamped at child-start time + /// (`None` for children started by older runtimes; callers fall back to a provider read). + pub fn extract_context(&self) -> (String, Option<(String, Option, u64)>) { // First check if we have metadata from the initial scan if let Some(ref input) = self.orchestration_input { let parent_link = if let (Some(parent_inst), Some(parent_id)) = (&self.parent_instance, self.parent_id) { - Some((parent_inst.clone(), parent_id)) + Some((parent_inst.clone(), self.parent_execution_id, parent_id)) } else { None }; @@ -268,11 +279,12 @@ impl HistoryManager { input, parent_instance, parent_id, + parent_execution_id, .. } = &e.kind { let parent_link = if let (Some(pinst), Some(pid)) = (parent_instance.clone(), *parent_id) { - Some((pinst, pid)) + Some((pinst, *parent_execution_id, pid)) } else { None }; @@ -384,6 +396,10 @@ pub struct WorkItemReader { /// Parent event ID (from start item or None) pub parent_id: Option, + /// Parent execution id (from start item or None) — used to route this child's + /// completion/failure back to the exact parent execution that scheduled it. + pub parent_execution_id: Option, + /// Whether this is a ContinueAsNew pub is_continue_as_new: bool, } @@ -428,7 +444,7 @@ impl WorkItemReader { } // Extract parameters from start item or use defaults - let (orchestration_name, input, version, parent_instance, parent_id, is_continue_as_new) = + let (orchestration_name, input, version, parent_instance, parent_id, parent_execution_id, is_continue_as_new) = if let Some(ref item) = start_item { match item { WorkItem::StartOrchestration { @@ -437,6 +453,7 @@ impl WorkItemReader { version, parent_instance, parent_id, + parent_execution_id, .. } => ( orchestration.clone(), @@ -444,6 +461,7 @@ impl WorkItemReader { version.clone(), parent_instance.clone(), *parent_id, + *parent_execution_id, false, ), WorkItem::ContinueAsNew { @@ -467,7 +485,7 @@ impl WorkItemReader { carried.append(&mut completion_messages); completion_messages = carried; - (orchestration.clone(), input.clone(), version.clone(), None, None, true) + (orchestration.clone(), input.clone(), version.clone(), None, None, None, true) } _ => unreachable!(), } @@ -485,7 +503,8 @@ impl WorkItemReader { let version = history_mgr.version(); let parent_instance = history_mgr.parent_instance.clone(); let parent_id = history_mgr.parent_id; - (orchestration_name, input, version, parent_instance, parent_id, false) + let parent_execution_id = history_mgr.parent_execution_id; + (orchestration_name, input, version, parent_instance, parent_id, parent_execution_id, false) }; Self { @@ -496,6 +515,7 @@ impl WorkItemReader { version, parent_instance, parent_id, + parent_execution_id, is_continue_as_new, } } @@ -536,6 +556,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -564,6 +585,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -599,6 +621,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -638,6 +661,7 @@ mod tests { input: "input1".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -662,6 +686,7 @@ mod tests { input: "input2".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -688,6 +713,7 @@ mod tests { input: "test".to_string(), parent_instance: Some("parent-instance".to_string()), parent_id: Some(42), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -708,6 +734,7 @@ mod tests { version: Some("1.0.0".to_string()), parent_instance: Some("parent".to_string()), parent_id: Some(42), + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }, WorkItem::ActivityCompleted { @@ -781,6 +808,7 @@ mod tests { input: "test-input".to_string(), parent_instance: Some("parent-inst".to_string()), parent_id: Some(42), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -822,6 +850,7 @@ mod tests { version: None, parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }, WorkItem::StartOrchestration { @@ -831,6 +860,7 @@ mod tests { version: None, parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: crate::INITIAL_EXECUTION_ID, }, ]; @@ -861,6 +891,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -901,6 +932,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -921,6 +953,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -941,6 +974,7 @@ mod tests { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/tests/cancellation_tests.rs b/tests/cancellation_tests.rs index 6753924..1012c74 100644 --- a/tests/cancellation_tests.rs +++ b/tests/cancellation_tests.rs @@ -1236,6 +1236,7 @@ async fn cancel_before_orchestration_starts() { input: "input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }, None, diff --git a/tests/capability_filtering_tests.rs b/tests/capability_filtering_tests.rs index 4b5a4ae..2d64525 100644 --- a/tests/capability_filtering_tests.rs +++ b/tests/capability_filtering_tests.rs @@ -314,6 +314,7 @@ async fn provider_seed_without_pinned_version(provider: &dyn Provider, instance: version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, None, diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 112c381..393ff15 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -119,6 +119,7 @@ pub async fn test_create_execution( input: input.to_string(), parent_instance: parent_instance.map(|s| s.to_string()), parent_id, + parent_execution_id: None, execution_id: next_execution_id, }, None, @@ -152,6 +153,7 @@ pub async fn test_create_execution( input: input.to_string(), parent_instance: parent_instance.map(|s| s.to_string()), parent_id, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -255,6 +257,7 @@ pub async fn seed_instance_with_pinned_version( input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -270,6 +273,7 @@ pub async fn seed_instance_with_pinned_version( version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, INITIAL_EXECUTION_ID, diff --git a/tests/observability_tests.rs b/tests/observability_tests.rs index 6b55279..ef4f23e 100644 --- a/tests/observability_tests.rs +++ b/tests/observability_tests.rs @@ -366,6 +366,7 @@ async fn test_fetch_orchestration_item_fault_injection() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -1198,6 +1199,7 @@ async fn test_queue_depth_gauges_initialization() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -1214,6 +1216,7 @@ async fn test_queue_depth_gauges_initialization() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, diff --git a/tests/poison_message_tests.rs b/tests/poison_message_tests.rs index bbde348..03645db 100644 --- a/tests/poison_message_tests.rs +++ b/tests/poison_message_tests.rs @@ -40,6 +40,7 @@ async fn orchestration_attempt_count_increments_on_abandon() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: INITIAL_EXECUTION_ID, }, None, diff --git a/tests/provider_atomic_tests.rs b/tests/provider_atomic_tests.rs index 5073d4c..fb25dee 100644 --- a/tests/provider_atomic_tests.rs +++ b/tests/provider_atomic_tests.rs @@ -116,6 +116,7 @@ async fn test_fetch_orchestration_item_new_instance() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -249,6 +250,7 @@ async fn test_ack_orchestration_item_atomic() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -276,6 +278,7 @@ async fn test_ack_orchestration_item_atomic() { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -385,6 +388,7 @@ async fn test_abandon_orchestration_item() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -436,6 +440,7 @@ async fn test_abandon_orchestration_item_with_delay() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -503,6 +508,7 @@ async fn test_in_memory_provider_atomic_operations() { version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -531,6 +537,7 @@ async fn test_in_memory_provider_atomic_operations() { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/tests/replay_engine/helpers.rs b/tests/replay_engine/helpers.rs index 7d64148..dad3ca8 100644 --- a/tests/replay_engine/helpers.rs +++ b/tests/replay_engine/helpers.rs @@ -38,6 +38,7 @@ pub fn started_event(event_id: u64) -> Event { input: "test-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/tests/scenarios/replay_versioning.rs b/tests/scenarios/replay_versioning.rs index ca9c67e..74bf339 100644 --- a/tests/scenarios/replay_versioning.rs +++ b/tests/scenarios/replay_versioning.rs @@ -344,6 +344,7 @@ async fn e2e_upgrade_with_preexisting_v1_history() { input: "seed".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id, }, execution_id, @@ -356,6 +357,7 @@ async fn e2e_upgrade_with_preexisting_v1_history() { input: "seed".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/tests/scenarios/suborch_id_collision.rs b/tests/scenarios/suborch_id_collision.rs index c2e0949..89afe97 100644 --- a/tests/scenarios/suborch_id_collision.rs +++ b/tests/scenarios/suborch_id_collision.rs @@ -1,11 +1,29 @@ -//! Sub-orchestration instance-id collision scenario. +//! Sub-orchestration instance-id collision scenarios. //! -//! Child sub-orchestration instance ids reserve the `sub::` marker: the first parent -//! execution uses `{parent}::sub::{event_id}` and executions after continue-as-new use -//! `{parent}::sub::{execution_id}_{event_id}`. If an instance with that exact id already -//! exists in a terminal state when the parent schedules its child, the parent must not -//! hang forever waiting for a completion that never arrives — it must observe a -//! sub-orchestration failure and reach a terminal state. +//! Child sub-orchestration instance ids reserve the `sub::` marker. The suffix is produced +//! by [`duroxide::auto_sub_orch_suffix`]: the first parent execution uses +//! `{parent}::sub::{event_id}`, and executions after `continue_as_new` use +//! `{parent}::sub::{execution_id}_{event_id}`. +//! +//! ## The real collision class: same parent across continue-as-new +//! +//! Because the full child id embeds the parent instance id, children of *different* parents +//! never collide as long as root instance ids are unique across the provider. The genuine +//! collision the execution-scoped suffix defends against is a single parent that schedules a +//! sub-orchestration at the same event position on each `continue_as_new` generation: event +//! ids reset on continue-as-new, so without the execution-scoped suffix execution 2 would +//! regenerate execution 1's (now terminal) child id and the parent would hang forever. The +//! `continue_as_new_*` tests below are the primary regressions for this. +//! +//! ## Legacy / provider-bypass defense +//! +//! If an id with the reserved marker somehow already names a terminal instance when a parent +//! schedules its child — e.g. an older, non-validating client enqueued it directly through +//! the provider during a rolling upgrade — the parent must still not hang: the dispatcher +//! observes the terminal collision and fails the parent fast. The tests that *inject* a +//! foreign terminal "squatter" via the provider exercise that defense and the durable +//! routing of the failure notification; they are not the normal auto-generated collision +//! case and are labeled accordingly. #![allow(clippy::unwrap_used)] #![allow(clippy::expect_used)] @@ -23,15 +41,17 @@ use std::time::Duration; #[path = "../common/mod.rs"] mod common; -/// A pre-existing terminal instance occupying the parent's auto-generated child id -/// must not cause the parent to hang. The parent should reach a terminal state. +/// Legacy / provider-bypass defense (not the normal auto-generated collision case). +/// +/// A pre-existing terminal instance occupying the parent's auto-generated child id must not +/// cause the parent to hang. The parent should reach a terminal state. /// -/// The colliding instance is enqueued directly through the provider to model a -/// client that does not validate instance ids (e.g. an older node during a rolling -/// upgrade), so this exercises the dispatcher's defense independently of the -/// client-side validation. +/// Under unique root ids this collision cannot arise from auto-generated ids alone, so the +/// colliding instance is enqueued directly through the provider to model a client that does +/// not validate instance ids (e.g. an older node during a rolling upgrade). This exercises +/// the dispatcher's terminal-collision defense independently of client-side validation. #[tokio::test] -async fn parent_does_not_hang_when_child_id_already_terminal() { +async fn legacy_provider_bypass_terminal_collision_does_not_hang_parent() { let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); // Parent's first action is a sub-orchestration call. Event 1 = OrchestrationStarted, @@ -68,6 +88,7 @@ async fn parent_does_not_hang_when_child_id_already_terminal() { version: None, parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }, None, @@ -152,6 +173,7 @@ async fn redelivered_child_start_does_not_fail_parent() { version: None, parent_instance: Some("job-2".to_string()), parent_id: Some(2), + parent_execution_id: None, execution_id: 1, }, None, @@ -200,11 +222,14 @@ async fn redelivered_child_start_does_not_fail_parent() { rt.shutdown(None).await; } -/// A parent that schedules a sub-orchestration on every continue-as-new iteration -/// must not collide with itself. Event ids reset on continue-as-new, so without -/// execution-scoped child ids the second iteration would regenerate the same child -/// id as the first (now terminal) and hang. The auto-generated id includes the -/// parent execution after the first execution, keeping each iteration's child unique. +/// PRIMARY regression for the real collision class: a parent that schedules a +/// sub-orchestration on every continue-as-new iteration must not collide with itself. +/// +/// Event ids reset on continue-as-new, so without execution-scoped child ids the second +/// iteration would regenerate the same child id as the first (now terminal) and hang. The +/// auto-generated suffix includes the parent execution after the first execution, keeping +/// each iteration's child unique. This asserts both that the parent completes and that the +/// per-execution child suffixes are exactly `sub::2`, `sub::2_2`, `sub::3_2`, `sub::4_2`. #[tokio::test] async fn parent_with_suborch_survives_continue_as_new() { let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); @@ -241,20 +266,108 @@ async fn parent_with_suborch_survives_continue_as_new() { "parent should complete after looping with sub-orchestrations; got {status:?}" ); + // Each execution must schedule a distinctly-suffixed child: the first keeps the legacy + // `sub::{event_id}` form; later executions include the execution id, so no two iterations + // ever regenerate the same (now terminal) child id. + let mut suffixes = Vec::new(); + for execution_id in 1..=4 { + suffixes.push(scheduled_child_suffix(&store, "can-job", execution_id).await); + } + assert_eq!( + suffixes, + vec![ + "sub::2".to_string(), + "sub::2_2".to_string(), + "sub::3_2".to_string(), + "sub::4_2".to_string(), + ], + "each continue-as-new execution must generate a unique, execution-scoped child id" + ); + + rt.shutdown(None).await; +} + +/// Focused regression (affandar): after the first continue-as-new generation the child id +/// must include the execution id, so a parent that schedules a sub-orchestration at the same +/// event position on each generation never reuses a now-terminal child id. +/// +/// With the old id generation this hangs: execution 2 tries to start `P::sub::2`, finds +/// execution 1's child already terminal, and the parent never receives a completion. The +/// assertion pins the exact generated suffixes (`sub::2`, `sub::2_2`, `sub::3_2`). +#[tokio::test] +async fn continue_as_new_suborch_child_ids_include_execution_after_first() { + let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + let parent_id = "can-child-id-job"; + + let parent = |ctx: OrchestrationContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + let result = ctx.schedule_sub_orchestration("Child", n.to_string()).await?; + if n < 2 { + return ctx.continue_as_new((n + 1).to_string()).await; + } + Ok(format!("done:{n}:{result}")) + }; + let child = |_ctx: OrchestrationContext, input: String| async move { Ok(format!("child-done:{input}")) }; + + let orchestrations = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let activities = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await; + let client = Client::new(store.clone()); + + client.start_orchestration(parent_id, "Parent", "0").await.unwrap(); + + let status = client + .wait_for_orchestration(parent_id, Duration::from_secs(5)) + .await + .expect("parent must not hang from reusing the same child id after continue-as-new"); + + assert!( + matches!(&status, OrchestrationStatus::Completed { output, .. } if output == "done:2:child-done:2"), + "parent should complete after three executions; got {status:?}" + ); + + let mut scheduled_child_suffixes = Vec::new(); + for execution_id in 1..=3 { + scheduled_child_suffixes.push(scheduled_child_suffix(&store, parent_id, execution_id).await); + } + assert_eq!( + scheduled_child_suffixes, + vec!["sub::2".to_string(), "sub::2_2".to_string(), "sub::3_2".to_string()], + ); + rt.shutdown(None).await; } -/// Regression for execution-scoped routing of the terminal-collision failure within a -/// single end-to-end run. +/// Read the auto-generated child suffix recorded by the `SubOrchestrationScheduled` event in +/// the given parent execution. The event stores the suffix (e.g. `sub::2`), not the full +/// `{parent}::sub::...` id. +async fn scheduled_child_suffix(store: &Arc, parent_instance: &str, execution_id: u64) -> String { + let history = store.read_with_execution(parent_instance, execution_id).await.unwrap(); + history + .iter() + .find_map(|event| match &event.kind { + EventKind::SubOrchestrationScheduled { instance, .. } => Some(instance.clone()), + _ => None, + }) + .unwrap_or_else(|| panic!("execution {execution_id} should schedule a sub-orchestration")) +} + +/// Legacy / provider-bypass defense: execution-scoped routing of a terminal-collision +/// failure within a single end-to-end run. /// /// A parent continues as new and, on execution 2, schedules a sub-orchestration whose /// auto-generated child id (`{parent}::sub::{execution_id}_{event_id}`) already names a -/// terminal instance. The collision failure must be recorded in execution 2, not -/// misrouted to execution 1. This drives the full flow through one runtime; the -/// `terminal_collision_routes_to_parent_current_execution_on_fresh_runtime` test below -/// is the stronger cross-runtime guard. +/// terminal instance injected via the provider. The collision failure must be recorded in +/// execution 2, not misrouted to execution 1. Here the failure is produced while the parent's +/// own turn is running, so the `parent_execution_id` is stamped onto the colliding start and +/// used directly. This drives the full flow through one runtime; the +/// `legacy_provider_bypass_terminal_collision_routes_via_fallback_on_fresh_runtime` test below +/// is the stronger cross-runtime guard for the provider-read fallback. #[tokio::test] -async fn parent_on_execution_two_fails_fast_on_child_id_collision() { +async fn legacy_provider_bypass_terminal_collision_fails_fast_on_execution_two() { let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); // On execution 1 the parent immediately continues as new; on execution 2 its first @@ -291,6 +404,7 @@ async fn parent_on_execution_two_fails_fast_on_child_id_collision() { version: None, parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }, None, @@ -349,18 +463,20 @@ async fn parent_on_execution_two_fails_fast_on_child_id_collision() { rt_b.shutdown(None).await; } -/// Stronger cross-runtime regression for terminal-collision routing. +/// Legacy / provider-bypass defense: stronger cross-runtime regression for the +/// terminal-collision routing *fallback*. /// -/// Here the runtime that processes the colliding child start has *never* run the parent, -/// so it holds no in-memory association between the parent and its current execution. The -/// parent's execution-2 state (parked awaiting a sub-orchestration whose id collides with a -/// foreign terminal instance) is seeded directly into the provider. A fresh runtime must -/// read the parent's current execution (2) from durable provider state when routing the -/// `SubOrchFailed`, so the failure lands in execution 2 and the parent fails fast. If the -/// failure were routed to execution 1 (as a process-local cache miss would default to), the -/// parent's replay would filter it out and the parent would hang. +/// Here the runtime that processes the colliding child start has *never* run the parent, and +/// the colliding start carries no stamped `parent_execution_id` (it is seeded directly into +/// the provider, modeling an old work item). The parent's execution-2 state (parked awaiting a +/// sub-orchestration whose id collides with a foreign terminal instance) is seeded directly +/// into the provider. With no stamp to use, the dispatcher must fall back to reading the +/// parent's current execution (2) from durable provider state when routing the `SubOrchFailed`, +/// so the failure lands in execution 2 and the parent fails fast. If the failure were routed to +/// execution 1 (as a process-local cache miss would default to), the parent's replay would +/// filter it out and the parent would hang. #[tokio::test] -async fn terminal_collision_routes_to_parent_current_execution_on_fresh_runtime() { +async fn legacy_provider_bypass_terminal_collision_routes_via_fallback_on_fresh_runtime() { let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); let parent_id = "seeded-parent"; @@ -376,6 +492,7 @@ async fn terminal_collision_routes_to_parent_current_execution_on_fresh_runtime( version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }, 1, @@ -391,6 +508,7 @@ async fn terminal_collision_routes_to_parent_current_execution_on_fresh_runtime( input: String::new(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -426,6 +544,7 @@ async fn terminal_collision_routes_to_parent_current_execution_on_fresh_runtime( version: Some("1.0.0".to_string()), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 2, }, 2, @@ -441,6 +560,7 @@ async fn terminal_collision_routes_to_parent_current_execution_on_fresh_runtime( input: "1".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -473,9 +593,10 @@ async fn terminal_collision_routes_to_parent_current_execution_on_fresh_runtime( "seeded parent must be on execution 2" ); - // 3. Enqueue the colliding child start, as a runtime would have when the parent - // scheduled the sub-orchestration. Its target id is already terminal (the foreign - // squatter), and its parent differs, so this is a genuine collision. + // 3. Enqueue the colliding child start with NO stamped parent_execution_id, modeling a + // work item produced before this field existed (rolling upgrade). Its target id is + // already terminal (the foreign squatter), and its parent differs, so this is a + // genuine collision. With no stamp, routing must fall back to a durable provider read. store .enqueue_for_orchestrator( WorkItem::StartOrchestration { @@ -485,6 +606,7 @@ async fn terminal_collision_routes_to_parent_current_execution_on_fresh_runtime( version: Some("1.0.0".to_string()), parent_instance: Some(parent_id.to_string()), parent_id: Some(2), + parent_execution_id: None, execution_id: 1, }, None, diff --git a/tests/scenarios/version_replay_bug.rs b/tests/scenarios/version_replay_bug.rs index 7236dad..485cdfc 100644 --- a/tests/scenarios/version_replay_bug.rs +++ b/tests/scenarios/version_replay_bug.rs @@ -326,6 +326,7 @@ fn unit_workitem_reader_completion_only_must_preserve_version() { input: "original-input".to_string(), parent_instance: Some("parent-inst".to_string()), parent_id: Some(42), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -393,6 +394,7 @@ fn unit_workitem_reader_nth_execution_must_preserve_version() { input: "can-input".to_string(), parent_instance: Some("parent-for-v2".to_string()), parent_id: Some(99), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -479,6 +481,7 @@ fn unit_workitem_reader_completion_only_tuple_field_analysis() { input: "original-input".to_string(), parent_instance: Some("parent-inst".to_string()), parent_id: Some(42), + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -550,6 +553,7 @@ fn unit_input_comes_from_extract_context_not_workitem_reader() { input: "ORIGINAL-INPUT-FROM-FIRST-TURN".to_string(), // The actual input parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -615,6 +619,7 @@ fn unit_nth_execution_history_starts_with_orchestration_started() { input: "can-input".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -825,6 +830,7 @@ fn unit_completion_only_replay_uses_nth_execution_input() { input: "CAN-INPUT-FOR-EXECUTION-2".to_string(), // Different from execution 1! parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, diff --git a/tests/sqlite_tests.rs b/tests/sqlite_tests.rs index b9ef01a..838141e 100644 --- a/tests/sqlite_tests.rs +++ b/tests/sqlite_tests.rs @@ -53,6 +53,7 @@ async fn test_sqlite_provider_basic() { input: r#"{"test": true}"#.to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }; @@ -88,6 +89,7 @@ async fn test_sqlite_provider_basic() { input: r#"{"test": true}"#.to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -140,6 +142,7 @@ async fn test_execution_status_completed() { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -209,6 +212,7 @@ async fn test_execution_status_failed() { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -420,6 +424,7 @@ async fn test_sqlite_file_concurrent_access() { input: format!("{{\"id\": {i}}}"), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }; @@ -457,6 +462,7 @@ async fn test_sqlite_file_concurrent_access() { input: format!("{{\"id\": {acked_count}}}"), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -586,6 +592,7 @@ async fn test_sqlite_provider_transactional() { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }; @@ -613,6 +620,7 @@ async fn test_sqlite_provider_transactional() { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -748,6 +756,7 @@ async fn test_sqlite_provider_timer_queue() { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }, None, @@ -776,6 +785,7 @@ async fn test_sqlite_provider_timer_queue() { input: "{}".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -806,6 +816,7 @@ async fn test_execution_status_running() { input: "test".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }; @@ -831,6 +842,7 @@ async fn test_execution_status_running() { input: "test".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -883,6 +895,7 @@ async fn test_execution_output_captured_on_continue_as_new() { input: "test".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: duroxide::INITIAL_EXECUTION_ID, }; @@ -908,6 +921,7 @@ async fn test_execution_output_captured_on_continue_as_new() { input: "test".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, @@ -972,6 +986,7 @@ async fn test_instrumented_provider_semantic_equivalence() { version: None, parent_instance: None, parent_id: None, + parent_execution_id: None, execution_id: 1, }; @@ -1004,6 +1019,7 @@ async fn test_instrumented_provider_semantic_equivalence() { input: "test".to_string(), parent_instance: None, parent_id: None, + parent_execution_id: None, carry_forward_events: None, initial_custom_status: None, }, From 091fdd742ac0ddea51342300fc942a444272e99a Mon Sep 17 00:00:00 2001 From: Todd Green Date: Fri, 19 Jun 2026 00:45:15 +0000 Subject: [PATCH 5/5] Add regression: sub-orch completion routes to scheduling execution Adds suborch_completion_routes_to_scheduling_execution_not_current, proving the durable parent_execution_id stamp addresses a child's completion to the execution that scheduled it rather than the parent's current execution at completion time. A child outlives execution 1 via continue-as-new (which does not cancel outstanding children) and completes while the parent is on execution 2. With the stamp, the SubOrchCompleted is addressed to execution 1 (terminal) and the replay execution filter discards it, leaving execution 2 untouched. Without it, current-execution routing addresses the notification to execution 2, where event id 2 is not a sub-orchestration schedule, so it is applied as a nondeterministic completion and poisons the parent. Verified the test fails ("no matching schedule for sub-orchestration id=2") when routing falls back to the parent's current execution. --- tests/scenarios/suborch_id_collision.rs | 113 ++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/tests/scenarios/suborch_id_collision.rs b/tests/scenarios/suborch_id_collision.rs index 89afe97..7d5978c 100644 --- a/tests/scenarios/suborch_id_collision.rs +++ b/tests/scenarios/suborch_id_collision.rs @@ -355,6 +355,119 @@ async fn scheduled_child_suffix(store: &Arc, parent_instance: &str .unwrap_or_else(|| panic!("execution {execution_id} should schedule a sub-orchestration")) } +/// Routing regression (affandar): a sub-orchestration's completion/failure must be addressed +/// to the parent execution that *scheduled* the child, not to whatever execution is current +/// when the child finishes. +/// +/// This is the scenario the durable `parent_execution_id` stamp defends against, distinct from +/// the awaited case (where the parent is blocked and the two coincide). Here the child outlives +/// the execution that scheduled it: +/// +/// 1. Execution 1 schedules a child, lets it start, then continues-as-new **without awaiting** +/// it. The child keeps running (continue-as-new does not cancel outstanding children) and +/// finishes only when the test releases it. +/// 2. Execution 2 schedules **no** sub-orchestration and parks on an external event, so it is +/// still alive when the child's late completion notification arrives. +/// 3. The child's `SubOrchCompleted` is emitted while the parent's current execution is 2. +/// +/// With the stamp, the notification is addressed to execution 1 (terminal) and the replay +/// execution filter discards it, so execution 2 is untouched. Without the stamp — routing via +/// the parent's *current* execution at completion time — the notification is addressed to +/// execution 2, where event id 2 is not a sub-orchestration schedule, so it is applied as a +/// nondeterministic completion and poisons the parent. The assertion that execution 2 is still +/// running (then completes cleanly once released) fails under that buggy routing. +#[tokio::test] +async fn suborch_completion_routes_to_scheduling_execution_not_current() { + let store: Arc = Arc::new(SqliteProvider::new_in_memory().await.unwrap()); + let parent_id = "stale-suborch-route-job"; + + let parent = |ctx: OrchestrationContext, input: String| async move { + let n: u32 = input.parse().unwrap_or(0); + if n == 0 { + // Execution 1: schedule the child (event id 2), give it a turn to start, then + // continue-as-new without ever awaiting it. The child outlives this parent + // execution and finishes only once the test releases it, by which time the + // parent's current execution is already 2. + let _child = ctx.schedule_sub_orchestration("Child", "x"); + ctx.schedule_timer(Duration::from_millis(50)).await; + return ctx.continue_as_new("1").await; + } + // Execution 2: schedule no sub-orchestration. Park on an external event so this + // execution stays alive while the child's stale notification is processed. + ctx.schedule_wait("Release").await; + Ok("done".to_string()) + }; + // Child parks on an external event so the test controls exactly when it completes — + // after execution 2 has been established. + let child = |ctx: OrchestrationContext, _input: String| async move { + ctx.schedule_wait("ChildGo").await; + Ok("child-late".to_string()) + }; + + let orchestrations = OrchestrationRegistry::builder() + .register("Parent", parent) + .register("Child", child) + .build(); + let activities = ActivityRegistry::builder().build(); + let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await; + let client = Client::new(store.clone()); + + client.start_orchestration(parent_id, "Parent", "0").await.unwrap(); + + // Wait until (a) the parent has advanced to execution 2 and (b) the child (scheduled in + // execution 1 as `{parent}::sub::2`) is running and subscribed to its release event. + let child_instance = format!("{parent_id}::sub::2"); + let deadline = std::time::Instant::now() + Duration::from_secs(5); + loop { + let parent_on_exec_2 = store.read_with_execution(parent_id, 2).await.is_ok_and(|h| { + h.iter() + .any(|e| matches!(&e.kind, EventKind::OrchestrationStarted { .. })) + }); + let child_running = matches!( + client.get_orchestration_status(&child_instance).await.unwrap(), + OrchestrationStatus::Running { .. } + ); + if parent_on_exec_2 && child_running { + break; + } + assert!( + std::time::Instant::now() < deadline, + "parent never reached execution 2 with a running child" + ); + tokio::time::sleep(Duration::from_millis(25)).await; + } + + // Release the child *after* execution 2 is established, so its completion notification is + // emitted while the parent's current execution is 2 (the divergence the stamp guards). + client.raise_event(&child_instance, "ChildGo", "go").await.unwrap(); + + // Let the child's stale notification drain through the orchestrator queue and be processed + // by the parent *before* we release execution 2. + wait_for_orchestrator_queue_drained(&store, Duration::from_secs(5)).await; + + // Correct routing addresses the stale notification to execution 1 (terminal), so the + // replay filter discards it and execution 2 is still running, waiting for Release. + // Buggy current-execution routing applies it to execution 2 and poisons the parent. + let status = client.get_orchestration_status(parent_id).await.unwrap(); + assert!( + matches!(status, OrchestrationStatus::Running { .. }), + "execution 2 must still be running (stale child notification must not poison it); got {status:?}" + ); + + // Release execution 2 and confirm it completes cleanly. + client.raise_event(parent_id, "Release", "go").await.unwrap(); + let final_status = client + .wait_for_orchestration(parent_id, Duration::from_secs(5)) + .await + .expect("parent should complete after release"); + assert!( + matches!(&final_status, OrchestrationStatus::Completed { output, .. } if output == "done"), + "parent should complete with \"done\"; got {final_status:?}" + ); + + rt.shutdown(None).await; +} + /// Legacy / provider-bypass defense: execution-scoped routing of a terminal-collision /// failure within a single end-to-end run. ///