diff --git a/.autover/changes/110c67f8-15d8-44b5-beae-6f1223933c27.json b/.autover/changes/110c67f8-15d8-44b5-beae-6f1223933c27.json new file mode 100644 index 000000000..292e95fa0 --- /dev/null +++ b/.autover/changes/110c67f8-15d8-44b5-beae-6f1223933c27.json @@ -0,0 +1,11 @@ +{ + "Projects": [ + { + "Name": "Amazon.Lambda.DurableExecution", + "Type": "Patch", + "ChangelogMessages": [ + "Thread CancellationToken into every user Func accepted by IDurableContext (StepAsync, RunInChildContextAsync, WaitForCallbackAsync, WaitForConditionAsync). The token links the caller-supplied cancellation token with an SDK-owned workflow-shutdown signal so user step bodies unwind cleanly when the workflow is being torn down. Cancellation via the linked token is not checkpointed; user-thrown OperationCanceledException unrelated to the linked token continues to be treated as a normal step failure." + ] + } + ] +} \ No newline at end of file diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/CallbackException.cs b/Libraries/src/Amazon.Lambda.DurableExecution/CallbackException.cs index 2d1244b2b..61fb4a4f6 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/CallbackException.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/CallbackException.cs @@ -7,7 +7,7 @@ namespace Amazon.Lambda.DurableExecution; /// Base exception type for callback failures surfaced from /// /// or -/// . +/// . /// Concrete subclasses distinguish failure modes — pattern-match /// , , /// or in catch clauses. @@ -71,7 +71,7 @@ public CallbackTimeoutException(string message, Exception innerException) : base /// /// Thrown only from -/// +/// /// when the user-supplied submitter delegate (the step that hands the callback /// ID to the external system) fails after retries are exhausted. Wraps the /// underlying as . diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/ChildContextConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/ChildContextConfig.cs index c97418a6a..c00adf909 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/ChildContextConfig.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/ChildContextConfig.cs @@ -9,7 +9,7 @@ namespace Amazon.Lambda.DurableExecution; /// /// A child context is a logical sub-workflow with its own deterministic /// operation-ID space, persisted as a CONTEXT operation. Use -/// +/// /// (and overloads) to run code inside one. /// public sealed class ChildContextConfig diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs index e0f36720e..1064e6965 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs @@ -16,6 +16,7 @@ internal sealed class DurableContext : IDurableContext { private readonly ExecutionState _state; private readonly TerminationManager _terminationManager; + private readonly WorkflowCancellation _workflowCancellation; private readonly OperationIdGenerator _idGenerator; private readonly string _durableExecutionArn; private readonly CheckpointBatcher? _batcher; @@ -24,6 +25,7 @@ internal sealed class DurableContext : IDurableContext public DurableContext( ExecutionState state, TerminationManager terminationManager, + WorkflowCancellation workflowCancellation, OperationIdGenerator idGenerator, string durableExecutionArn, ILambdaContext lambdaContext, @@ -31,6 +33,7 @@ public DurableContext( { _state = state; _terminationManager = terminationManager; + _workflowCancellation = workflowCancellation; _idGenerator = idGenerator; _durableExecutionArn = durableExecutionArn; _batcher = batcher; @@ -55,14 +58,14 @@ public void ConfigureLogger(LoggerConfig config) } public Task StepAsync( - Func> func, + Func> func, string? name = null, StepConfig? config = null, CancellationToken cancellationToken = default) => RunStep(func, name, config, cancellationToken); public async Task StepAsync( - Func func, + Func func, string? name = null, StepConfig? config = null, CancellationToken cancellationToken = default) @@ -71,12 +74,12 @@ public async Task StepAsync( // step that always returns null. The serializer isn't actually invoked // with a non-null value, so any registered ILambdaSerializer suffices. await RunStep( - async (ctx) => { await func(ctx); return null; }, + async (ctx, ct) => { await func(ctx, ct); return null; }, name, config, cancellationToken); } private Task RunStep( - Func> func, + Func> func, string? name, StepConfig? config, CancellationToken cancellationToken) @@ -86,7 +89,7 @@ private Task RunStep( var operationId = _idGenerator.NextId(); var op = new StepOperation( operationId, name, _idGenerator.ParentId, func, config, serializer, Logger, - _state, _terminationManager, _durableExecutionArn, _batcher); + _state, _terminationManager, _workflowCancellation, _durableExecutionArn, _batcher); return op.ExecuteAsync(cancellationToken); } @@ -114,14 +117,14 @@ public Task WaitAsync( } public Task RunInChildContextAsync( - Func> func, + Func> func, string? name = null, ChildContextConfig? config = null, CancellationToken cancellationToken = default) => RunChildContext(func, name, config, cancellationToken); public async Task RunInChildContextAsync( - Func func, + Func func, string? name = null, ChildContextConfig? config = null, CancellationToken cancellationToken = default) @@ -130,12 +133,12 @@ public async Task RunInChildContextAsync( // returns null so the registered ILambdaSerializer is never asked to // serialize a real value. await RunChildContext( - async (ctx) => { await func(ctx); return null; }, + async (ctx, ct) => { await func(ctx, ct); return null; }, name, config, cancellationToken); } public Task WaitForConditionAsync( - Func> check, + Func> check, WaitForConditionConfig config, string? name = null, CancellationToken cancellationToken = default) @@ -148,12 +151,12 @@ public Task WaitForConditionAsync( var operationId = _idGenerator.NextId(); var op = new WaitForConditionOperation( operationId, name, _idGenerator.ParentId, check, config, serializer, Logger, - _state, _terminationManager, _durableExecutionArn, _batcher); + _state, _terminationManager, _workflowCancellation, _durableExecutionArn, _batcher); return op.ExecuteAsync(cancellationToken); } private Task RunChildContext( - Func> func, + Func> func, string? name, ChildContextConfig? config, CancellationToken cancellationToken) @@ -163,16 +166,16 @@ private Task RunChildContext( var operationId = _idGenerator.NextId(); // Capture this DurableContext's collaborators; the child shares state, - // termination, batcher, ARN, and Lambda context — but uses a child - // OperationIdGenerator so its operation IDs are deterministically - // namespaced under the parent op ID. + // termination, workflow cancellation, batcher, ARN, and Lambda context — + // but uses a child OperationIdGenerator so its operation IDs are + // deterministically namespaced under the parent op ID. IDurableContext ChildFactory(string parentOpId) => new DurableContext( - _state, _terminationManager, _idGenerator.CreateChild(parentOpId), + _state, _terminationManager, _workflowCancellation, _idGenerator.CreateChild(parentOpId), _durableExecutionArn, LambdaContext, _batcher); var op = new ChildContextOperation( operationId, name, _idGenerator.ParentId, func, config, serializer, ChildFactory, - _state, _terminationManager, _durableExecutionArn, _batcher); + _state, _terminationManager, _workflowCancellation, _durableExecutionArn, _batcher); return op.ExecuteAsync(cancellationToken); } @@ -197,7 +200,7 @@ private Task> RunCallback( } public Task WaitForCallbackAsync( - Func submitter, + Func submitter, string? name = null, WaitForCallbackConfig? config = null, CancellationToken cancellationToken = default) @@ -218,7 +221,7 @@ public Task WaitForCallbackAsync( /// /// private Task RunWaitForCallback( - Func submitter, + Func submitter, string? name, WaitForCallbackConfig? config, CancellationToken cancellationToken) @@ -240,8 +243,18 @@ private Task RunWaitForCallback( // StepAsync calls each pull the registered ILambdaSerializer from // ILambdaContext.Serializer, so AOT and reflection-based scenarios share // the same code path. + // + // Pass the OUTER cancellationToken (not childCtx's linked token) into the + // inner operations. Each inner operation will re-link the caller's token + // with the workflow-shutdown CTS itself when it invokes its user Func, so + // the submitter still observes both signals. Threading the already-linked + // childToken through here would propagate the workflow-shutdown signal + // into the inner operations' checkpoint writes (EnqueueAsync uses the + // cancellationToken parameter directly), which would risk lost START / + // SUCCEED checkpoints when termination fires mid-flush. See §7 of + // docs/design/cancellation-design.md. return RunInChildContextAsync( - async childCtx => + async (childCtx, _) => { var callback = await childCtx.CreateCallbackAsync( name: callbackName, @@ -249,10 +262,10 @@ private Task RunWaitForCallback( cancellationToken: cancellationToken); await childCtx.StepAsync( - async (stepCtx) => + async (stepCtx, stepToken) => { var submitterCtx = new WaitForCallbackContext(stepCtx.Logger); - await submitter(callback.CallbackId, submitterCtx); + await submitter(callback.CallbackId, submitterCtx, stepToken); }, name: submitterName, config: stepConfig, diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs index cb5a7a297..09fe15331 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableFunction.cs @@ -95,6 +95,7 @@ private static async Task WrapAsyncCore(invocationInput, serializer); var terminationManager = new TerminationManager(); + using var workflowCancellation = new WorkflowCancellation(terminationManager); var idGenerator = new OperationIdGenerator(); await using var batcher = new CheckpointBatcher( @@ -108,7 +109,7 @@ private static async Task WrapAsyncCore result; diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs index 960090a99..356a3ffcd 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs @@ -54,7 +54,11 @@ public interface IDurableContext /// The type of the step's result. /// /// The step body to execute. Receives an exposing - /// the step's logger, attempt number, and operation ID. + /// the step's logger, attempt number, and operation ID, and a + /// linking the caller-supplied token with + /// the SDK's workflow-shutdown signal — pass it to cancellation-aware APIs + /// (HttpClient.SendAsync, Task.Delay, AWS SDK calls) so the + /// step body unwinds cleanly when the workflow is being torn down. /// /// /// An optional name for the step, used for observability and to derive the @@ -63,10 +67,13 @@ public interface IDurableContext /// /// Optional step configuration (e.g. retry policy). Defaults are used when null. /// - /// A token to observe for cancellation. + /// + /// A token to observe for cancellation. Linked with an SDK-owned workflow + /// shutdown source; the resulting token is forwarded to . + /// /// The deserialized result of the step. Task StepAsync( - Func> func, + Func> func, string? name = null, StepConfig? config = null, CancellationToken cancellationToken = default); @@ -76,7 +83,9 @@ Task StepAsync( /// /// /// The step body to execute. Receives an exposing - /// the step's logger, attempt number, and operation ID. + /// the step's logger, attempt number, and operation ID, and a + /// linking the caller-supplied token with + /// the SDK's workflow-shutdown signal. /// /// /// An optional name for the step, used for observability and to derive the @@ -85,9 +94,12 @@ Task StepAsync( /// /// Optional step configuration (e.g. retry policy). Defaults are used when null. /// - /// A token to observe for cancellation. + /// + /// A token to observe for cancellation. Linked with an SDK-owned workflow + /// shutdown source; the resulting token is forwarded to . + /// Task StepAsync( - Func func, + Func func, string? name = null, StepConfig? config = null, CancellationToken cancellationToken = default); @@ -129,7 +141,9 @@ Task WaitAsync( /// The type of the child context's result. /// /// The user function to run inside the child context. Receives a nested - /// with its own deterministic operation-ID space. + /// with its own deterministic operation-ID space, + /// and a linking the caller-supplied token with + /// the SDK's workflow-shutdown signal. /// /// /// An optional name for the child context, used for observability and to derive @@ -139,10 +153,13 @@ Task WaitAsync( /// Optional child context configuration (e.g. /// ). Defaults are used when null. /// - /// A token to observe for cancellation. + /// + /// A token to observe for cancellation. Linked with an SDK-owned workflow + /// shutdown source; the resulting token is forwarded to . + /// /// The deserialized result of the child context. Task RunInChildContextAsync( - Func> func, + Func> func, string? name = null, ChildContextConfig? config = null, CancellationToken cancellationToken = default); @@ -162,7 +179,9 @@ Task RunInChildContextAsync( /// /// /// The user function to run inside the child context. Receives a nested - /// with its own deterministic operation-ID space. + /// with its own deterministic operation-ID space, + /// and a linking the caller-supplied token with + /// the SDK's workflow-shutdown signal. /// /// /// An optional name for the child context, used for observability and to derive @@ -172,9 +191,12 @@ Task RunInChildContextAsync( /// Optional child context configuration (e.g. /// ). Defaults are used when null. /// - /// A token to observe for cancellation. + /// + /// A token to observe for cancellation. Linked with an SDK-owned workflow + /// shutdown source; the resulting token is forwarded to . + /// Task RunInChildContextAsync( - Func func, + Func func, string? name = null, ChildContextConfig? config = null, CancellationToken cancellationToken = default); @@ -222,7 +244,7 @@ Task> CreateCallbackAsync( /// (which hands the callbackId to an external system), and suspends /// until the external system delivers a result. Equivalent to manually /// composing - /// + + /// + /// + /// inside a child context. /// @@ -235,7 +257,9 @@ Task> CreateCallbackAsync( /// The type of the result the callback will deliver. /// /// A function that hands the service-allocated callbackId to the external - /// system. Receives the callback ID and an . + /// system. Receives the callback ID, an , + /// and a linking the caller-supplied token with + /// the SDK's workflow-shutdown signal. /// /// /// An optional name for the operation, used for observability and to derive the @@ -245,10 +269,13 @@ Task> CreateCallbackAsync( /// Optional configuration (e.g. submitter retry policy and callback timeout). /// Defaults are used when null. /// - /// A token to observe for cancellation. + /// + /// A token to observe for cancellation. Linked with an SDK-owned workflow + /// shutdown source; the resulting token is forwarded to . + /// /// The deserialized result delivered by the external system. Task WaitForCallbackAsync( - Func submitter, + Func submitter, string? name = null, WaitForCallbackConfig? config = null, CancellationToken cancellationToken = default); @@ -316,8 +343,10 @@ Task InvokeAsync( /// /// The condition check invoked on each poll. Receives the state returned by the /// previous invocation (seeded by - /// on the first call) - /// and an , and returns the next state. + /// on the first call), + /// an , and a + /// linking the caller-supplied token with the SDK's workflow-shutdown signal, + /// and returns the next state. /// /// /// The configuration controlling polling, including the @@ -327,10 +356,13 @@ Task InvokeAsync( /// An optional name for the operation, used for observability and to derive the /// deterministic operation ID. Defaults to a name inferred from the call site. /// - /// A token to observe for cancellation. + /// + /// A token to observe for cancellation. Linked with an SDK-owned workflow + /// shutdown source; the resulting token is forwarded to . + /// /// The final state observed when the strategy decides to stop. Task WaitForConditionAsync( - Func> check, + Func> check, WaitForConditionConfig config, string? name = null, CancellationToken cancellationToken = default); diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IWaitForCallbackContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IWaitForCallbackContext.cs index 866fb3bab..d282e2f72 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/IWaitForCallbackContext.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/IWaitForCallbackContext.cs @@ -7,7 +7,7 @@ namespace Amazon.Lambda.DurableExecution; /// /// Context passed to the submitter delegate of -/// . +/// . /// Provides a replay-safe logger scoped to the submitter step. /// /// diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs index a0abbf99e..4a25990fc 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ChildContextOperation.cs @@ -38,21 +38,23 @@ namespace Amazon.Lambda.DurableExecution.Internal; /// internal sealed class ChildContextOperation : DurableOperation { - private readonly Func> _func; + private readonly Func> _func; private readonly ChildContextConfig? _config; private readonly ILambdaSerializer _serializer; private readonly Func _childContextFactory; + private readonly WorkflowCancellation _workflowCancellation; public ChildContextOperation( string operationId, string? name, string? parentId, - Func> func, + Func> func, ChildContextConfig? config, ILambdaSerializer serializer, Func childContextFactory, ExecutionState state, TerminationManager termination, + WorkflowCancellation workflowCancellation, string durableExecutionArn, CheckpointBatcher? batcher = null) : base(operationId, name, parentId, state, termination, durableExecutionArn, batcher) @@ -61,6 +63,7 @@ public ChildContextOperation( _config = config; _serializer = serializer; _childContextFactory = childContextFactory; + _workflowCancellation = workflowCancellation; } protected override string OperationType => OperationTypes.Context; @@ -116,13 +119,22 @@ private async Task ExecuteFunc(CancellationToken cancellationToken) var childContext = _childContextFactory(OperationId); + // Link the caller's token with the workflow-shutdown token. The user + // func observes both signals; the SDK's checkpoint writes (CONTEXT + // FAIL / SUCCEED below) continue to use the caller's token only. + using var linked = CancellationTokenSource.CreateLinkedTokenSource( + cancellationToken, _workflowCancellation.Token); + T result; try { - result = await _func(childContext); + result = await _func(childContext, linked.Token); } - catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + catch (OperationCanceledException) when (linked.IsCancellationRequested) { + // Cancellation owned by the linked source — caller cancel or workflow + // shutdown. Do NOT checkpoint CONTEXT FAIL: the termination signal + // (or upstream cancel) owns the outcome. throw; } catch (NonDeterministicExecutionException) diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs index 4d04d8a72..76780957a 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/StepOperation.cs @@ -38,21 +38,23 @@ namespace Amazon.Lambda.DurableExecution.Internal; /// internal sealed class StepOperation : DurableOperation { - private readonly Func> _func; + private readonly Func> _func; private readonly StepConfig? _config; private readonly ILambdaSerializer _serializer; private readonly ILogger _logger; + private readonly WorkflowCancellation _workflowCancellation; public StepOperation( string operationId, string? name, string? parentId, - Func> func, + Func> func, StepConfig? config, ILambdaSerializer serializer, ILogger logger, ExecutionState state, TerminationManager termination, + WorkflowCancellation workflowCancellation, string durableExecutionArn, CheckpointBatcher? batcher = null) : base(operationId, name, parentId, state, termination, durableExecutionArn, batcher) @@ -61,6 +63,7 @@ public StepOperation( _config = config; _serializer = serializer; _logger = logger; + _workflowCancellation = workflowCancellation; } protected override string OperationType => OperationTypes.Step; @@ -204,6 +207,14 @@ private async Task ExecuteFunc(int attemptNumber, CancellationToken cancellat } + // Link the caller's token with the workflow-shutdown token so the user + // step body observes both upstream cancel intent and SDK-driven workflow + // teardown. The linked token is passed to the user Func only; checkpoint + // writes still use the caller's token (workflow shutdown must NOT abort + // a successful step's SUCCEED checkpoint — see cancellation-design.md §7). + using var linked = CancellationTokenSource.CreateLinkedTokenSource( + cancellationToken, _workflowCancellation.Token); + try { var stepContext = new StepContext(OperationId, attemptNumber, _logger); @@ -220,7 +231,7 @@ private async Task ExecuteFunc(int attemptNumber, CancellationToken cancellat ["attempt"] = attemptNumber, })) { - result = await _func(stepContext); + result = await _func(stepContext, linked.Token); } await EnqueueAsync(new SdkOperationUpdate @@ -236,14 +247,21 @@ await EnqueueAsync(new SdkOperationUpdate return result; } - catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + catch (OperationCanceledException) when (linked.IsCancellationRequested) { + // Cancellation owned by the linked source (caller-cancel or workflow + // shutdown). Do NOT checkpoint FAIL and do NOT consult the retry + // strategy — the termination signal that fired (if any) owns the + // suspend/abort decision; an upstream caller-cancel propagates up + // as a fault on the workflow user task. throw; } catch (Exception ex) { // Funnel into the retry/fail decision tree. May checkpoint RETRY and - // suspend (Pending), or checkpoint FAIL and rethrow to user. + // suspend (Pending), or checkpoint FAIL and rethrow to user. A user- + // thrown OperationCanceledException unrelated to our linked token + // falls through here and is treated as a normal step failure. return await HandleStepFailureAsync(ex, attemptNumber, cancellationToken); } } diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitForConditionOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitForConditionOperation.cs index 742265782..79e011efd 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitForConditionOperation.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WaitForConditionOperation.cs @@ -51,21 +51,23 @@ namespace Amazon.Lambda.DurableExecution.Internal; /// internal sealed class WaitForConditionOperation : DurableOperation { - private readonly Func> _check; + private readonly Func> _check; private readonly WaitForConditionConfig _config; private readonly ILambdaSerializer _serializer; private readonly ILogger _logger; + private readonly WorkflowCancellation _workflowCancellation; public WaitForConditionOperation( string operationId, string? name, string? parentId, - Func> check, + Func> check, WaitForConditionConfig config, ILambdaSerializer serializer, ILogger logger, ExecutionState state, TerminationManager termination, + WorkflowCancellation workflowCancellation, string durableExecutionArn, CheckpointBatcher? batcher = null) : base(operationId, name, parentId, state, termination, durableExecutionArn, batcher) @@ -74,6 +76,7 @@ public WaitForConditionOperation( _config = config; _serializer = serializer; _logger = logger; + _workflowCancellation = workflowCancellation; } protected override string OperationType => OperationTypes.Step; @@ -167,14 +170,23 @@ await EnqueueAsync(new SdkOperationUpdate }, cancellationToken); } + // Link the caller's token with the workflow-shutdown token. The check + // function observes both signals; the SDK's RETRY/SUCCEED/FAIL + // checkpoint writes still use the caller's token only. + using var linked = CancellationTokenSource.CreateLinkedTokenSource( + cancellationToken, _workflowCancellation.Token); + TState newState; try { var checkContext = new ConditionCheckContext(attemptNumber, _logger); - newState = await _check(currentState, checkContext); + newState = await _check(currentState, checkContext, linked.Token); } - catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + catch (OperationCanceledException) when (linked.IsCancellationRequested) { + // Cancellation owned by the linked source — caller cancel or workflow + // shutdown. Do NOT checkpoint FAIL: the termination signal (or + // upstream cancel) owns the outcome. throw; } catch (Exception ex) diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WorkflowCancellation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WorkflowCancellation.cs new file mode 100644 index 000000000..115dd585e --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/WorkflowCancellation.cs @@ -0,0 +1,46 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +namespace Amazon.Lambda.DurableExecution.Internal; + +/// +/// Workflow-scoped cancellation source. Cancels when the +/// resolves so abandoned user-Func bodies +/// (the WhenAny loser in ) unwind via +/// instead of running to completion on +/// the threadpool while Lambda is mid-response. +/// +/// +/// One instance per durable function invocation, constructed and disposed by +/// . Operation classes that invoke user +/// Funcs build a per-call linked CTS combining the caller's token with +/// and pass the linked token into the user code. +/// +/// Checkpoint writes, batcher flushes, and other SDK-internal work do NOT +/// observe this token: successful work must persist even when the workflow is +/// being torn down. +/// +/// +internal sealed class WorkflowCancellation : IDisposable +{ + private readonly CancellationTokenSource _cts = new(); + + public CancellationToken Token => _cts.Token; + + public WorkflowCancellation(TerminationManager terminationManager) + { + terminationManager.TerminationTask.ContinueWith( + static (_, state) => + { + var cts = (CancellationTokenSource)state!; + try { cts.Cancel(); } + catch (ObjectDisposedException) { } + }, + _cts, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); + } + + public void Dispose() => _cts.Dispose(); +} diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/README.md b/Libraries/src/Amazon.Lambda.DurableExecution/README.md index 482024e0c..264703397 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/README.md +++ b/Libraries/src/Amazon.Lambda.DurableExecution/README.md @@ -22,6 +22,7 @@ Your handler delegates to `DurableFunction.WrapAsync`, which gives your workflow - `ctx.WaitForConditionAsync` — poll a check function until a condition is met, suspending between polls. ([docs](docs/core/wait-for-condition.md)) - `ctx.CreateCallbackAsync` / `ctx.WaitForCallbackAsync` — wait for external events (approvals, webhooks). ([docs](docs/core/callbacks.md)) - `ctx.RunInChildContextAsync` — run an isolated child context with its own checkpoint log. ([docs](docs/core/child-contexts.md)) +- Every user `Func` receives a `CancellationToken` linking the caller's token with the SDK's workflow-shutdown signal. ([docs](docs/core/cancellation.md)) ## Quick Start @@ -64,17 +65,17 @@ public class OrderProcessor private async Task Workflow(Order order, IDurableContext ctx) { var reservation = await ctx.StepAsync( - async _ => await InventoryService.ReserveAsync(order.Items), + async (_, ct) => await InventoryService.ReserveAsync(order.Items, ct), name: "reserve-inventory"); var payment = await ctx.StepAsync( - async _ => await PaymentService.ChargeAsync(order.PaymentMethod, order.Total), + async (_, ct) => await PaymentService.ChargeAsync(order.PaymentMethod, order.Total, ct), name: "process-payment"); await ctx.WaitAsync(TimeSpan.FromHours(2), name: "warehouse-processing"); var shipment = await ctx.StepAsync( - async _ => await ShippingService.ShipAsync(reservation, order.Address), + async (_, ct) => await ShippingService.ShipAsync(reservation, order.Address, ct), name: "confirm-shipment"); return new OrderResult(order.Id, shipment.TrackingNumber); diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/WaitForCallbackConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/WaitForCallbackConfig.cs index 90cf1f420..9aed6da08 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/WaitForCallbackConfig.cs +++ b/Libraries/src/Amazon.Lambda.DurableExecution/WaitForCallbackConfig.cs @@ -5,7 +5,7 @@ namespace Amazon.Lambda.DurableExecution; /// /// Configuration for the composite -/// +/// /// operation. Inherits the callback's and /// ; adds a /// for the submitter step. diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/callbacks.md b/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/callbacks.md index 573ad17e3..00aee3cd4 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/callbacks.md +++ b/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/callbacks.md @@ -11,13 +11,13 @@ Two APIs are available: ```csharp Task WaitForCallbackAsync( - Func submitter, + Func submitter, string? name = null, WaitForCallbackConfig? config = null, CancellationToken cancellationToken = default); ``` -The submitter receives the freshly allocated `callbackId` and an `IWaitForCallbackContext` (logger-only). Submitter failures (after retries are exhausted) surface as `CallbackSubmitterException`; callback failures and timeouts surface as `CallbackFailedException` / `CallbackTimeoutException`. +The submitter receives the freshly allocated `callbackId`, an `IWaitForCallbackContext` (logger-only), and a `CancellationToken` linking the caller-supplied token with the SDK's workflow-shutdown signal. Submitter failures (after retries are exhausted) surface as `CallbackSubmitterException`; callback failures and timeouts surface as `CallbackFailedException` / `CallbackTimeoutException`. ## `CreateCallbackAsync` @@ -78,7 +78,7 @@ public class Function // with this callback ID. The submitter is invoked once with a freshly-allocated // ID; it hands the ID to the approver and returns immediately. var result = await ctx.WaitForCallbackAsync( - submitter: async (callbackId, cbCtx) => + submitter: async (callbackId, cbCtx, ct) => { var payload = $$"""{"callbackId":"{{callbackId}}","orderId":"{{input.OrderId}}"}"""; await LambdaClient.InvokeAsync(new InvokeRequest @@ -86,7 +86,7 @@ public class Function FunctionName = approverFunctionName, InvocationType = InvocationType.Event, // fire-and-forget Payload = payload - }); + }, ct); }, name: "approve"); @@ -154,7 +154,7 @@ private async Task Workflow(OrderInput input, IDurableContext ct { var cb = await ctx.CreateCallbackAsync(name: "approve"); - await ctx.StepAsync(async _ => + await ctx.StepAsync(async (_, ct) => { var payload = $$"""{"callbackId":"{{cb.CallbackId}}","orderId":"{{input.OrderId}}"}"""; await LambdaClient.InvokeAsync(new InvokeRequest @@ -162,7 +162,7 @@ private async Task Workflow(OrderInput input, IDurableContext ct FunctionName = approverFunctionName, InvocationType = InvocationType.Event, Payload = payload - }); + }, ct); }, name: "submit"); return await cb.GetResultAsync(); diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/cancellation.md b/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/cancellation.md new file mode 100644 index 000000000..1abc75858 --- /dev/null +++ b/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/cancellation.md @@ -0,0 +1,34 @@ +# Cancellation + +Every user `Func` accepted by `IDurableContext` (`StepAsync`, `RunInChildContextAsync`, `WaitForCallbackAsync`, `WaitForConditionAsync`) receives a `CancellationToken` parameter. Pass it to cancellation-aware APIs inside the body so the workflow can tear down cleanly. + +## What the token observes + +The token is a linked source combining: + +1. The `CancellationToken` you passed to the `IDurableContext` method (so the caller's cancel intent reaches the body). +2. An SDK-owned workflow-shutdown signal that fires when the workflow is being torn down (a sibling operation suspended, a checkpoint failed, or a future parallel branch aborted). + +```csharp +var user = await ctx.StepAsync( + async (_, ct) => await httpClient.GetAsync(url, ct), + name: "fetch"); +``` + +When either trigger fires, the token transitions to `IsCancellationRequested = true` and `await`s on cancellation-aware APIs unwind via `OperationCanceledException`. + +## Semantics + +- **`OperationCanceledException` thrown out of a step body via the token** (i.e. `linked.IsCancellationRequested` is true) is treated as cancellation: no FAIL checkpoint is written, no retry is consulted. The exception propagates up. +- **`OperationCanceledException` thrown by user code for unrelated reasons** (token never fired) is treated as a normal step failure: FAIL checkpoint, retry per the configured `RetryStrategy`. +- **The SDK's own writes (checkpoints, batcher flush, the runtime API response)** never observe the workflow-shutdown signal. Successful work is never lost to teardown. + +## Guidance + +- **Do** pass `ct` into every cancellation-aware call inside the step body (`HttpClient.SendAsync(ct)`, `Task.Delay(ct)`, AWS SDK calls). This is what makes caller-cancel and shutdown-cancel actually unwind. +- **Don't** branch workflow logic on `IsCancellationRequested`. It is a runtime concern, not a workflow concern; branching on it makes the workflow non-deterministic across replays. +- **Don't** `catch (OperationCanceledException)` and continue. Either don't catch, or catch and rethrow. + +## Replay + +Cached operations short-circuit before the user `Func` is invoked. A `SUCCESS` checkpoint replays its serialized result; the token is never built or observed. Replay determinism is structural — cancellation cannot affect it. diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/child-contexts.md b/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/child-contexts.md index 4a664e11e..34904f290 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/child-contexts.md +++ b/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/child-contexts.md @@ -6,27 +6,29 @@ ```csharp Task RunInChildContextAsync( - Func> func, + Func> func, string? name = null, ChildContextConfig? config = null, CancellationToken cancellationToken = default); Task RunInChildContextAsync( - Func func, + Func func, string? name = null, ChildContextConfig? config = null, CancellationToken cancellationToken = default); ``` +The `CancellationToken` parameter is a linked token combining the caller-supplied token with the SDK's workflow-shutdown signal — forward it to `StepAsync` and other operations inside the child so cancellation propagates uniformly. + ## Example ```csharp var phaseResult = await ctx.RunInChildContextAsync( - async childCtx => + async (childCtx, ct) => { - var validated = await childCtx.StepAsync(async _ => Validate(input), name: "validate"); - await childCtx.WaitAsync(TimeSpan.FromSeconds(2), name: "short_wait"); - var processed = await childCtx.StepAsync(async _ => Process(validated), name: "process"); + var validated = await childCtx.StepAsync(async (_, c) => Validate(input, c), name: "validate", cancellationToken: ct); + await childCtx.WaitAsync(TimeSpan.FromSeconds(2), name: "short_wait", cancellationToken: ct); + var processed = await childCtx.StepAsync(async (_, c) => Process(validated, c), name: "process", cancellationToken: ct); return processed; }, name: "phase", diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/steps.md b/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/steps.md index c7f9e9f22..cb6fa48a2 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/steps.md +++ b/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/steps.md @@ -6,34 +6,34 @@ ```csharp Task StepAsync( - Func> func, + Func> func, string? name = null, StepConfig? config = null, CancellationToken cancellationToken = default); Task StepAsync( - Func func, + Func func, string? name = null, StepConfig? config = null, CancellationToken cancellationToken = default); ``` -The `IStepContext` parameter exposes the current `AttemptNumber`, the deterministic `OperationId`, and a scoped `Logger`. Returned values are serialized via the `ILambdaSerializer` registered on `ILambdaContext.Serializer`. +The `IStepContext` parameter exposes the current `AttemptNumber`, the deterministic `OperationId`, and a scoped `Logger`. The `CancellationToken` parameter is a linked token combining the caller-supplied token with the SDK's workflow-shutdown signal — pass it to cancellation-aware APIs (`HttpClient.SendAsync`, `Task.Delay`, AWS SDK calls) so the step body unwinds cleanly when the workflow is being torn down. Returned values are serialized via the `ILambdaSerializer` registered on `ILambdaContext.Serializer`. ## Basic step ```csharp var user = await ctx.StepAsync( - async _ => await userService.GetUserAsync(userId), + async (_, ct) => await userService.GetUserAsync(userId, ct), name: "fetch-user"); ``` ## Multiple steps ```csharp -var a = await ctx.StepAsync(async _ => $"a-{input.OrderId}", name: "step_1"); -var b = await ctx.StepAsync(async _ => $"{a}-b", name: "step_2"); -var c = await ctx.StepAsync(async _ => $"{b}-c", name: "step_3"); +var a = await ctx.StepAsync(async (_, _) => $"a-{input.OrderId}", name: "step_1"); +var b = await ctx.StepAsync(async (_, _) => $"{a}-b", name: "step_2"); +var c = await ctx.StepAsync(async (_, _) => $"{b}-c", name: "step_3"); ``` ## Step configuration @@ -99,7 +99,7 @@ When `retryableExceptions` and `retryableMessagePatterns` are both null (default ```csharp var result = await ctx.StepAsync( - async stepCtx => + async (stepCtx, _) => { if (stepCtx.AttemptNumber < 3) throw new InvalidOperationException($"flake on attempt {stepCtx.AttemptNumber}"); @@ -138,7 +138,7 @@ These semantics apply *per retry attempt*, not per overall execution. To achieve ```csharp var result = await ctx.StepAsync( - async _ => await paymentService.ChargeAsync(amount), + async (_, ct) => await paymentService.ChargeAsync(amount, ct), name: "charge-payment", config: new StepConfig { diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/wait-for-condition.md b/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/wait-for-condition.md index 93ea3f4d9..b900b8ffe 100644 --- a/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/wait-for-condition.md +++ b/Libraries/src/Amazon.Lambda.DurableExecution/docs/core/wait-for-condition.md @@ -8,7 +8,7 @@ Use it when you're waiting on something whose readiness you can only learn by *a ```csharp Task WaitForConditionAsync( - Func> check, + Func> check, WaitForConditionConfig config, string? name = null, CancellationToken cancellationToken = default); @@ -16,7 +16,7 @@ Task WaitForConditionAsync( On every iteration the `check` function receives the state returned by the previous invocation — seeded by `config.InitialState` on the very first call — and returns the next state. The configured `IWaitStrategy` then decides whether to keep polling and how long to wait. State is checkpointed each iteration, so the polling loop survives Lambda re-invocations deterministically and you can carry per-poll bookkeeping (a cursor, a counter) inside the state itself. -The `IConditionCheckContext` parameter exposes the current `AttemptNumber` (1-based) and a scoped `Logger`. The returned state is serialized via the `ILambdaSerializer` registered on `ILambdaContext.Serializer`. +The `IConditionCheckContext` parameter exposes the current `AttemptNumber` (1-based) and a scoped `Logger`. The `CancellationToken` parameter is a linked token combining the caller-supplied token with the SDK's workflow-shutdown signal — pass it to the underlying I/O so the check unwinds cleanly when the workflow is being torn down. The returned state is serialized via the `ILambdaSerializer` registered on `ILambdaContext.Serializer`. When the strategy stops because its `maxAttempts` limit is reached — rather than because the condition was met — the operation throws `WaitForConditionException` carrying `AttemptsExhausted` and the last observed `LastState`. @@ -26,10 +26,10 @@ Poll an order's status until it reaches a terminal value: ```csharp var finalStatus = await ctx.WaitForConditionAsync( - check: async (state, checkCtx) => + check: async (state, checkCtx, ct) => { checkCtx.Logger.LogInformation("Polling order on attempt {Attempt}", checkCtx.AttemptNumber); - return await orderService.GetStatusAsync(orderId); + return await orderService.GetStatusAsync(orderId, ct); }, config: new WaitForConditionConfig { diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.AotPublishTest/Program.cs b/Libraries/test/Amazon.Lambda.DurableExecution.AotPublishTest/Program.cs index 41404ca96..10d9dd3ca 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.AotPublishTest/Program.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.AotPublishTest/Program.cs @@ -35,7 +35,7 @@ public static Task HandlerAsync( private static async Task WorkflowAsync(OrderEvent input, IDurableContext context) { var validation = await context.StepAsync( - async (_) => + async (_, _) => { await Task.CompletedTask; return new ValidationResult { IsValid = true }; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Function.cs index 443d05b8a..bdc9a9d55 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/AtMostOnceCrashFunction/Function.cs @@ -39,7 +39,7 @@ public Task Handler( private async Task Workflow(TestEvent input, IDurableContext context) { var result = await context.StepAsync( - async (ctx) => + async (ctx, _) => { await Task.CompletedTask; if (ctx.AttemptNumber == 1) diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Function.cs index 721302ed3..57876259d 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CallbackFailedFunction/Function.cs @@ -40,7 +40,7 @@ private async Task Workflow(TestEvent input, IDurableContext context) var cb = await context.CreateCallbackAsync(name: "approve"); // Wrap the hand-off in a step so replays don't re-invoke the rejecter. - await context.StepAsync(async _ => + await context.StepAsync(async (_, _) => { var payload = $$"""{"callbackId":"{{cb.CallbackId}}","orderId":"{{input.OrderId}}"}"""; await LambdaClient.InvokeAsync(new InvokeRequest diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/Function.cs index ae3134f24..d62207f74 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFailsFunction/Function.cs @@ -29,10 +29,10 @@ private async Task Workflow(TestEvent input, IDurableContext context // service must record a ContextFailed event with the error details and // mark the workflow FAILED. await context.RunInChildContextAsync( - async (childCtx) => + async (childCtx, _) => { await childCtx.StepAsync( - async (_) => { await Task.CompletedTask; return $"prepared-{input.OrderId}"; }, + async (_, _) => { await Task.CompletedTask; return $"prepared-{input.OrderId}"; }, name: "prepare"); throw new InvalidOperationException("intentional child context failure for integration test"); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/Function.cs index 507f1df0f..e62cca8c0 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextFunction/Function.cs @@ -29,16 +29,16 @@ private async Task Workflow(TestEvent input, IDurableContext context // return value is checkpointed at the parent level as a CONTEXT // SUCCEED record, so on replay we'd see it returned from cache. var phaseResult = await context.RunInChildContextAsync( - async (childCtx) => + async (childCtx, _) => { var validated = await childCtx.StepAsync( - async (_) => { await Task.CompletedTask; return $"validated-{input.OrderId}"; }, + async (_, _) => { await Task.CompletedTask; return $"validated-{input.OrderId}"; }, name: "validate"); await childCtx.WaitAsync(TimeSpan.FromSeconds(2), name: "short_wait"); var processed = await childCtx.StepAsync( - async (_) => { await Task.CompletedTask; return $"processed-{validated}"; }, + async (_, _) => { await Task.CompletedTask; return $"processed-{validated}"; }, name: "process"); return processed; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/Function.cs index 521a7fa50..7c7dd4974 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ChildContextRetryFailsFunction/Function.cs @@ -30,10 +30,10 @@ private async Task Workflow(TestEvent input, IDurableContext context // close as ContextFailed when retries are exhausted — proving the // child is a single retry/error boundary. await context.RunInChildContextAsync( - async (childCtx) => + async (childCtx, _) => { return await childCtx.StepAsync( - async (ctx) => + async (ctx, _) => { await Task.CompletedTask; throw new InvalidOperationException( diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Function.cs index e9712e6ea..87b92f32e 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/CreateCallbackHappyPathFunction/Function.cs @@ -42,7 +42,7 @@ private async Task Workflow(TestEvent input, IDurableContext context) var cb = await context.CreateCallbackAsync(name: "approve"); // Wrap the hand-off in a step so replays don't re-invoke the approver. - await context.StepAsync(async _ => + await context.StepAsync(async (_, _) => { var payload = $$"""{"callbackId":"{{cb.CallbackId}}","orderId":"integ-test"}"""; await LambdaClient.InvokeAsync(new InvokeRequest diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/Function.cs index 240565384..922e0b946 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeChildTenantFunction/Function.cs @@ -26,7 +26,7 @@ public Task Handler( private async Task Workflow(int input, IDurableContext context) { var formatted = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"tenant-aware-{input}"; }, + async (_, _) => { await Task.CompletedTask; return $"tenant-aware-{input}"; }, name: "tenant_step"); return formatted; } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/Function.cs index 7e96ff0c8..291afbf2a 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeFailureChildFunction/Function.cs @@ -30,7 +30,7 @@ private async Task Workflow(int input, IDurableContext context) // FAILED chained invocation and raises InvokeFailedException with the // step's error type (System.InvalidOperationException) attached. await context.StepAsync( - async (_) => + async (_, _) => { await Task.CompletedTask; throw new InvalidOperationException("intentional child failure"); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/Function.cs index 898021cdd..bc3c0fc39 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeHappyPathChildFunction/Function.cs @@ -26,7 +26,7 @@ public Task Handler( private async Task Workflow(int input, IDurableContext context) { var prefixed = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"got-{input}"; }, + async (_, _) => { await Task.CompletedTask; return $"got-{input}"; }, name: "format"); return prefixed; } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/Function.cs index 5115101e1..7d5e8b70c 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismChildFunction/Function.cs @@ -26,7 +26,7 @@ public Task Handler( private async Task Workflow(string input, IDurableContext context) { var echoed = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"echoed:{input}"; }, + async (_, _) => { await Task.CompletedTask; return $"echoed:{input}"; }, name: "child_echo"); return echoed; } diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/Function.cs index b00be9c95..4ba7dfcdc 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/InvokeReplayDeterminismParentFunction/Function.cs @@ -32,7 +32,7 @@ private async Task Workflow(TestEvent input, IDurableContext context // checkpointed value — proves the SDK's deterministic operation IDs // line up with the service's view of the state. var generatedId = await context.StepAsync( - async (_) => { await Task.CompletedTask; return Guid.NewGuid().ToString(); }, + async (_, _) => { await Task.CompletedTask; return Guid.NewGuid().ToString(); }, name: "before_invoke"); // The chained invoke forces a suspend/resume cycle. After the resume, @@ -44,7 +44,7 @@ private async Task Workflow(TestEvent input, IDurableContext context name: "echo_invoke"); var afterInvoke = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"final:{invokeResult}"; }, + async (_, _) => { await Task.CompletedTask; return $"final:{invokeResult}"; }, name: "after_invoke"); return new TestResult { Status = "completed", Data = afterInvoke }; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Function.cs index 7d3c0f0e1..bdbf811df 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongRetryChainFunction/Function.cs @@ -33,7 +33,7 @@ public Task Handler( private async Task Workflow(TestEvent input, IDurableContext context) { var result = await context.StepAsync( - async (ctx) => + async (ctx, _) => { await Task.CompletedTask; if (ctx.AttemptNumber < 6) diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongerWaitFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongerWaitFunction/Function.cs index 401066c0e..7d241a02f 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongerWaitFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/LongerWaitFunction/Function.cs @@ -26,13 +26,13 @@ public Task Handler( private async Task Workflow(TestEvent input, IDurableContext context) { var step1 = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"started-{input.OrderId}"; }, + async (_, _) => { await Task.CompletedTask; return $"started-{input.OrderId}"; }, name: "before_wait"); await context.WaitAsync(TimeSpan.FromSeconds(15), name: "long_wait"); var step2 = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"after_wait-{step1}"; }, + async (_, _) => { await Task.CompletedTask; return $"after_wait-{step1}"; }, name: "after_wait"); return new TestResult { Status = "completed", Data = step2 }; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MultipleStepsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MultipleStepsFunction/Function.cs index cdf5992b6..986126a3f 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MultipleStepsFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MultipleStepsFunction/Function.cs @@ -26,23 +26,23 @@ public Task Handler( private async Task Workflow(TestEvent input, IDurableContext context) { var step1 = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"a-{input.OrderId}"; }, + async (_, _) => { await Task.CompletedTask; return $"a-{input.OrderId}"; }, name: "step_1"); var step2 = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"{step1}-b"; }, + async (_, _) => { await Task.CompletedTask; return $"{step1}-b"; }, name: "step_2"); var step3 = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"{step2}-c"; }, + async (_, _) => { await Task.CompletedTask; return $"{step2}-c"; }, name: "step_3"); var step4 = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"{step3}-d"; }, + async (_, _) => { await Task.CompletedTask; return $"{step3}-d"; }, name: "step_4"); var step5 = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"{step4}-e"; }, + async (_, _) => { await Task.CompletedTask; return $"{step4}-e"; }, name: "step_5"); return new TestResult { Status = "completed", Data = step5 }; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/Function.cs index dbbcc24a9..55b4a5be9 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayAwareLoggerFunction/Function.cs @@ -36,7 +36,7 @@ private async Task Workflow(TestEvent input, IDurableContext context Console.WriteLine($"LOG_REPLAY_CONTROL workflow_start order={input.OrderId}"); var step1 = await context.StepAsync( - async (_) => + async (_, _) => { // Emitted inside the step's BeginScope, so the line carries // both execution-level scope (durableExecutionArn, awsRequestId) @@ -57,7 +57,7 @@ private async Task Workflow(TestEvent input, IDurableContext context // Step 2 runs fresh on invocation 2 — its EnterExecutionMode flips the // logger from suppress to passthrough. The next LogInformation lands. var step2 = await context.StepAsync( - async (_) => + async (_, _) => { await Task.CompletedTask; return $"processed-{step1}"; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayDeterminismFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayDeterminismFunction/Function.cs index 22f919900..688a8227c 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayDeterminismFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ReplayDeterminismFunction/Function.cs @@ -27,7 +27,7 @@ private async Task Workflow(TestEvent input, IDurableContext context { // Step 1 generates a fresh GUID. On replay, this MUST return the cached value. var generatedId = await context.StepAsync( - async (_) => { await Task.CompletedTask; return Guid.NewGuid().ToString(); }, + async (_, _) => { await Task.CompletedTask; return Guid.NewGuid().ToString(); }, name: "generate_id"); // Force a suspend/resume cycle to trigger replay @@ -35,7 +35,7 @@ private async Task Workflow(TestEvent input, IDurableContext context // Step 2 echoes the GUID. After replay, it should see the SAME GUID from step 1. var echoed = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"echo:{generatedId}"; }, + async (_, _) => { await Task.CompletedTask; return $"echo:{generatedId}"; }, name: "echo_id"); return new TestResult { Status = "completed", Data = echoed }; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Function.cs index 3e78ffd9d..97602186e 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryExhaustionFunction/Function.cs @@ -26,7 +26,7 @@ public Task Handler( private async Task Workflow(TestEvent input, IDurableContext context) { var result = await context.StepAsync( - async (ctx) => + async (ctx, _) => { await Task.CompletedTask; throw new InvalidOperationException($"always-fails attempt {ctx.AttemptNumber}"); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs index 800dc075f..5f81ca7dd 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/RetryFunction/Function.cs @@ -26,7 +26,7 @@ public Task Handler( private async Task Workflow(TestEvent input, IDurableContext context) { var result = await context.StepAsync( - async (ctx) => + async (ctx, _) => { await Task.CompletedTask; if (ctx.AttemptNumber < 3) diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/StepFailsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/StepFailsFunction/Function.cs index de0246a50..293b83424 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/StepFailsFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/StepFailsFunction/Function.cs @@ -26,7 +26,7 @@ public Task Handler( private async Task Workflow(TestEvent input, IDurableContext context) { await context.StepAsync( - async (_) => + async (_, _) => { await Task.CompletedTask; throw new InvalidOperationException("intentional failure for integration test"); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/StepWaitStepFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/StepWaitStepFunction/Function.cs index 97f7edd51..7de143800 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/StepWaitStepFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/StepWaitStepFunction/Function.cs @@ -26,13 +26,13 @@ public Task Handler( private async Task Workflow(TestEvent input, IDurableContext context) { var step1 = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"validated-{input.OrderId}"; }, + async (_, _) => { await Task.CompletedTask; return $"validated-{input.OrderId}"; }, name: "validate"); await context.WaitAsync(TimeSpan.FromSeconds(3), name: "short_wait"); var step2 = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"processed-{step1}"; }, + async (_, _) => { await Task.CompletedTask; return $"processed-{step1}"; }, name: "process"); return new TestResult { Status = "completed", Data = step2 }; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Function.cs index 129344d25..76e8ac3dd 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackHappyPathFunction/Function.cs @@ -41,7 +41,7 @@ private async Task Workflow(TestEvent input, IDurableContext context) ?? throw new InvalidOperationException("EXTERNAL_FUNCTION_NAME env var not set"); var result = await context.WaitForCallbackAsync( - submitter: async (callbackId, cbCtx) => + submitter: async (callbackId, cbCtx, _) => { var payload = $$"""{"callbackId":"{{callbackId}}","orderId":"{{input.OrderId}}"}"""; await LambdaClient.InvokeAsync(new InvokeRequest diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Function.cs index 19b60d567..b9851d5ea 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForCallbackSubmitterFailsFunction/Function.cs @@ -30,7 +30,7 @@ private async Task Workflow(TestEvent input, IDurableContext context) // failure as CallbackSubmitterException. The workflow does not catch // it, so the durable execution surfaces FAILED with that exception. var result = await context.WaitForCallbackAsync( - submitter: async (callbackId, cbCtx) => + submitter: async (callbackId, cbCtx, _) => { await Task.CompletedTask; throw new InvalidOperationException("submitter intentional failure"); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/Function.cs index d73161e60..f3aad3f52 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionExponentialFunction/Function.cs @@ -27,7 +27,7 @@ private async Task Workflow(TestEvent input, IDurableContext context // backoffRate=1.5, maxDelay=4s, no jitter: delays are 1s, 1.5s // (which the SDK ceilings to 2s due to 1s timer granularity). var finalState = await context.WaitForConditionAsync( - check: async (state, ctx) => + check: async (state, ctx, _) => { await Task.CompletedTask; var done = ctx.AttemptNumber >= 3; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/Function.cs index 086eb6bba..00d68b4c3 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionHappyPathFunction/Function.cs @@ -26,7 +26,7 @@ private async Task Workflow(TestEvent input, IDurableContext context // Each poll iteration is a separate Lambda invocation; the state is // carried across iterations via the RETRY checkpoint payload. var finalState = await context.WaitForConditionAsync( - check: async (state, ctx) => + check: async (state, ctx, _) => { await Task.CompletedTask; return new State(state.Counter + 1, ctx.AttemptNumber); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/Function.cs index 8f631fe86..8bdda540a 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionMaxAttemptsFunction/Function.cs @@ -30,7 +30,7 @@ private async Task Workflow(TestEvent input, IDurableContext context try { await context.WaitForConditionAsync( - check: async (state, _) => + check: async (state, _, _) => { await Task.CompletedTask; return state + 1; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/Function.cs index 6300bb6fe..940b75ff1 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionReplayDeterminismFunction/Function.cs @@ -25,14 +25,14 @@ private async Task Workflow(TestEvent input, IDurableContext context // Step 1: capture a fresh value. On replay this MUST return the // checkpointed value rather than re-executing. var generatedId = await context.StepAsync( - async (_) => { await Task.CompletedTask; return Guid.NewGuid().ToString(); }, + async (_, _) => { await Task.CompletedTask; return Guid.NewGuid().ToString(); }, name: "before_poll"); // Wait-for-condition with 3 polls. Each poll iteration is a separate // invocation, and the operation's deterministic ID + RETRY-payload // state must round-trip across re-invocations. var pollResult = await context.WaitForConditionAsync( - check: async (state, ctx) => + check: async (state, ctx, _) => { await Task.CompletedTask; return new Counter(state.Count + 1); @@ -50,7 +50,7 @@ private async Task Workflow(TestEvent input, IDurableContext context // Step 2: echo the generated ID. After replay, this should see the // SAME GUID from step 1 — proves replay returned the cached value. var echoed = await context.StepAsync( - async (_) => { await Task.CompletedTask; return $"echo:{generatedId}:{pollResult.Count}"; }, + async (_, _) => { await Task.CompletedTask; return $"echo:{generatedId}:{pollResult.Count}"; }, name: "after_poll"); return new TestResult { Status = "completed", Data = echoed }; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/Function.cs index 404114dc4..3e18594ab 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/Function.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/WaitForConditionUserCheckThrowsFunction/Function.cs @@ -31,7 +31,7 @@ private async Task Workflow(TestEvent input, IDurableContext context try { await context.WaitForConditionAsync( - check: async (state, ctx) => + check: async (state, ctx, _) => { await Task.CompletedTask; if (ctx.AttemptNumber == 2) diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CallbackOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CallbackOperationTests.cs index c70dc75fb..99a1342fe 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CallbackOperationTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/CallbackOperationTests.cs @@ -29,7 +29,7 @@ private static (DurableContext context, RecordingBatcher recorder, TerminationMa var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); return (context, recorder, tm, state); } @@ -477,7 +477,7 @@ public async Task CreateCallbackAsync_NoSerializer_Throws() var idGen = new OperationIdGenerator(); var lambdaContext = new TestLambdaContext(); // no Serializer set var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); var ex = await Assert.ThrowsAsync(() => context.CreateCallbackAsync(name: "no-serializer")); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs index 3aa182248..8d1d9d591 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ChildContextOperationTests.cs @@ -29,7 +29,7 @@ private static (DurableContext context, RecordingBatcher recorder, TerminationMa var lambdaContext = new TestLambdaContext { Serializer = new DefaultLambdaJsonSerializer() }; #pragma warning restore AWSLAMBDA001 var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); return (context, recorder, tm, state); } @@ -40,10 +40,10 @@ public async Task RunInChildContextAsync_FreshExecution_RunsFuncAndCheckpoints() var executed = false; var result = await context.RunInChildContextAsync( - async (childCtx) => + async (childCtx, _) => { executed = true; - return await childCtx.StepAsync(async (_) => { await Task.CompletedTask; return "inner"; }, name: "inner_step"); + return await childCtx.StepAsync(async (_, _) => { await Task.CompletedTask; return "inner"; }, name: "inner_step"); }, name: "phase"); @@ -76,10 +76,10 @@ public async Task RunInChildContextAsync_FreshExecution_ChildOperationIdsDetermi var (context, recorder, _, _) = CreateContext(); await context.RunInChildContextAsync( - async (childCtx) => + async (childCtx, _) => { - await childCtx.StepAsync(async (_) => { await Task.CompletedTask; return "a"; }, name: "first"); - await childCtx.StepAsync(async (_) => { await Task.CompletedTask; return "b"; }, name: "second"); + await childCtx.StepAsync(async (_, _) => { await Task.CompletedTask; return "a"; }, name: "first"); + await childCtx.StepAsync(async (_, _) => { await Task.CompletedTask; return "b"; }, name: "second"); return 0; }, name: "phase"); @@ -116,7 +116,7 @@ public async Task RunInChildContextAsync_ReplaySucceeded_ReturnsCachedAndDoesNot var executed = false; var result = await context.RunInChildContextAsync( - async (childCtx) => + async (childCtx, _) => { executed = true; await Task.CompletedTask; @@ -161,7 +161,7 @@ public async Task RunInChildContextAsync_ReplayFailed_ThrowsChildContextExceptio var ex = await Assert.ThrowsAsync(() => context.RunInChildContextAsync( - async (_) => { await Task.CompletedTask; return "should not run"; }, + async (_, _) => { await Task.CompletedTask; return "should not run"; }, name: "phase")); Assert.Equal("child went wrong", ex.Message); @@ -202,7 +202,7 @@ public async Task RunInChildContextAsync_ReplayFailed_AppliesErrorMapping() var ex = await Assert.ThrowsAsync(() => context.RunInChildContextAsync( - async (_) => { await Task.CompletedTask; return "x"; }, + async (_, _) => { await Task.CompletedTask; return "x"; }, name: "phase", config: new ChildContextConfig { @@ -223,7 +223,7 @@ public async Task RunInChildContextAsync_FuncThrows_CheckpointsFailAndThrows() var ex = await Assert.ThrowsAsync(() => context.RunInChildContextAsync( - async (_) => { await Task.CompletedTask; throw new InvalidOperationException("inner boom"); }, + async (_, _) => { await Task.CompletedTask; throw new InvalidOperationException("inner boom"); }, name: "phase")); Assert.Equal("inner boom", ex.Message); @@ -273,10 +273,10 @@ public async Task RunInChildContextAsync_InnerNonDeterminism_BubblesUpWithoutChe await Assert.ThrowsAsync(() => context.RunInChildContextAsync( - async (childCtx) => + async (childCtx, _) => { return await childCtx.StepAsync( - async (_) => { await Task.CompletedTask; return "x"; }, + async (_, _) => { await Task.CompletedTask; return "x"; }, name: "inner_step"); }, name: "phase")); @@ -292,7 +292,7 @@ public async Task RunInChildContextAsync_FuncThrows_AppliesErrorMapping() var ex = await Assert.ThrowsAsync(() => context.RunInChildContextAsync( - async (_) => { await Task.CompletedTask; throw new TimeoutException("inner timeout"); }, + async (_, _) => { await Task.CompletedTask; throw new TimeoutException("inner timeout"); }, name: "phase", config: new ChildContextConfig { @@ -315,7 +315,7 @@ public async Task RunInChildContextAsync_ChildSuspendsOnWait_TerminatesWithWaitS // wins on the termination signal; the test below short-circuits via // the same TerminationManager.IsTerminated check. var task = context.RunInChildContextAsync( - async (childCtx) => + async (childCtx, _) => { await childCtx.WaitAsync(TimeSpan.FromSeconds(5), name: "wait_inside"); return "should not return"; @@ -366,10 +366,10 @@ public async Task RunInChildContextAsync_ReplayStarted_ReExecutesFuncWithInnerCa var innerExecuted = false; var result = await context.RunInChildContextAsync( - async (childCtx) => + async (childCtx, _) => { return await childCtx.StepAsync( - async (_) => { innerExecuted = true; await Task.CompletedTask; return "fresh_inner"; }, + async (_, _) => { innerExecuted = true; await Task.CompletedTask; return "fresh_inner"; }, name: "inner_step"); }, name: "phase"); @@ -397,10 +397,10 @@ public async Task RunInChildContextAsync_VoidOverload_RunsAndCheckpoints() var executed = false; await context.RunInChildContextAsync( - async (childCtx) => + async (childCtx, _) => { await childCtx.StepAsync( - async (_) => { executed = true; await Task.CompletedTask; }, + async (_, _) => { executed = true; await Task.CompletedTask; }, name: "inner_void"); }, name: "phase"); @@ -444,7 +444,7 @@ public async Task RunInChildContextAsync_ReplayTypeMismatch_ThrowsNonDeterminist var ex = await Assert.ThrowsAsync(() => context.RunInChildContextAsync( - async (_) => { await Task.CompletedTask; return "x"; }, + async (_, _) => { await Task.CompletedTask; return "x"; }, name: "phase")); Assert.Contains("expected type 'CONTEXT'", ex.Message); @@ -471,7 +471,7 @@ public async Task RunInChildContextAsync_ReplayNameMismatch_ThrowsNonDeterminist var ex = await Assert.ThrowsAsync(() => context.RunInChildContextAsync( - async (_) => { await Task.CompletedTask; return "x"; }, + async (_, _) => { await Task.CompletedTask; return "x"; }, name: "new_name")); Assert.Contains("expected name 'new_name'", ex.Message); @@ -497,7 +497,7 @@ public async Task RunInChildContextAsync_ReplayUnknownStatus_ThrowsNonDeterminis await Assert.ThrowsAsync(() => context.RunInChildContextAsync( - async (_) => { await Task.CompletedTask; return "x"; }, + async (_, _) => { await Task.CompletedTask; return "x"; }, name: "phase")); } @@ -507,7 +507,7 @@ public async Task RunInChildContextAsync_SubTypeAndName_PropagateToCheckpoint() var (context, recorder, _, _) = CreateContext(); await context.RunInChildContextAsync( - async (_) => { await Task.CompletedTask; return "ok"; }, + async (_, _) => { await Task.CompletedTask; return "ok"; }, name: "phase", config: new ChildContextConfig { SubType = "WaitForCallback" }); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs index 20411dbab..5c9e83193 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableContextTests.cs @@ -33,7 +33,7 @@ private static DurableContext CreateContext( var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); - return new DurableContext(state, tm, idGen, "arn:aws:lambda:us-east-1:123:durable-execution:test", lambdaContext); + return new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:aws:lambda:us-east-1:123:durable-execution:test", lambdaContext); } #region StepAsync Tests @@ -44,7 +44,7 @@ public async Task StepAsync_NewExecution_RunsFunction() var context = CreateContext(); var executed = false; - var result = await context.StepAsync(async (_) => + var result = await context.StepAsync(async (_, _) => { executed = true; await Task.CompletedTask; @@ -73,7 +73,7 @@ public async Task StepAsync_Replay_ReturnsCachedResult() }); var executed = false; - var result = await context.StepAsync(async (_) => + var result = await context.StepAsync(async (_, _) => { executed = true; await Task.CompletedTask; @@ -111,7 +111,7 @@ public async Task StepAsync_ReplayFailed_ThrowsStepException() }); var ex = await Assert.ThrowsAsync(() => - context.StepAsync(async (_) => { await Task.CompletedTask; return "x"; }, name: "bad_step")); + context.StepAsync(async (_, _) => { await Task.CompletedTask; return "x"; }, name: "bad_step")); Assert.Equal("System.TimeoutException", ex.ErrorType); Assert.Equal("timed out", ex.Message); @@ -127,7 +127,7 @@ public async Task StepAsync_Throws_FailsWithStepException() var attempts = 0; await Assert.ThrowsAsync(() => - context.StepAsync(async (_) => + context.StepAsync(async (_, _) => { attempts++; await Task.CompletedTask; @@ -146,7 +146,7 @@ public async Task StepAsync_WithStepContext_ReceivesMetadata() int receivedAttempt = 0; Microsoft.Extensions.Logging.ILogger? receivedLogger = null; - await context.StepAsync(async (step) => + await context.StepAsync(async (step, _) => { receivedOpId = step.OperationId; receivedAttempt = step.AttemptNumber; @@ -166,7 +166,7 @@ public async Task StepAsync_VoidOverload_Works() var context = CreateContext(); var executed = false; - await context.StepAsync(async (_) => + await context.StepAsync(async (_, _) => { executed = true; await Task.CompletedTask; @@ -180,9 +180,9 @@ public async Task StepAsync_MultipleSteps_DeterministicIds() { var context = CreateContext(); - var r1 = await context.StepAsync(async (_) => { await Task.CompletedTask; return "a"; }, name: "first"); - var r2 = await context.StepAsync(async (_) => { await Task.CompletedTask; return "b"; }, name: "second"); - var r3 = await context.StepAsync(async (_) => { await Task.CompletedTask; return "c"; }); + var r1 = await context.StepAsync(async (_, _) => { await Task.CompletedTask; return "a"; }, name: "first"); + var r2 = await context.StepAsync(async (_, _) => { await Task.CompletedTask; return "b"; }, name: "second"); + var r3 = await context.StepAsync(async (_, _) => { await Task.CompletedTask; return "c"; }); Assert.Equal("a", r1); Assert.Equal("b", r2); @@ -207,7 +207,7 @@ public async Task StepAsync_ComplexType_SerializesCorrectly() }); var result = await context.StepAsync( - async (_) => { await Task.CompletedTask; return new TestPerson { Name = "Bob", Age = 25 }; }, + async (_, _) => { await Task.CompletedTask; return new TestPerson { Name = "Bob", Age = 25 }; }, name: "fetch"); Assert.Equal("Alice", result.Name); @@ -224,10 +224,10 @@ public async Task StepAsync_NoSerializerOnContext_ThrowsInvalidOperation() var tm = new TerminationManager(); var idGen = new OperationIdGenerator(); var lambdaContext = new TestLambdaContext(); // no Serializer set - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext); var ex = await Assert.ThrowsAsync(() => - context.StepAsync(async (_) => { await Task.CompletedTask; return "x"; }, name: "no_serializer")); + context.StepAsync(async (_, _) => { await Task.CompletedTask; return "x"; }, name: "no_serializer")); Assert.Contains("ILambdaSerializer", ex.Message); } @@ -322,7 +322,7 @@ public async Task StepAsync_Replay_NullResult_ReturnsDefault() }); var result = await context.StepAsync( - async (_) => { await Task.CompletedTask; return "fresh"; }, + async (_, _) => { await Task.CompletedTask; return "fresh"; }, name: "no_result"); Assert.Null(result); @@ -337,7 +337,7 @@ public async Task StepAsync_CancelledToken_ThrowsOperationCanceled() await Assert.ThrowsAnyAsync(() => context.StepAsync( - async (_) => + async (_, _) => { cts.Token.ThrowIfCancellationRequested(); await Task.CompletedTask; @@ -429,7 +429,7 @@ public async Task WaitAsync_StartedButNotExpired_ResuspendsWithoutNewCheckpoint( var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); var waitTask = context.WaitAsync(TimeSpan.FromSeconds(30), name: "pending_wait"); @@ -495,15 +495,15 @@ public async Task EndToEnd_StepWaitStep_FirstInvocation_SuspendsOnWait() state.LoadFromCheckpoint(null); var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext); var result = await DurableExecutionHandler.RunAsync( state, tm, async () => { - await context.StepAsync(async (_) => { await Task.CompletedTask; return "fetched"; }, name: "fetch"); + await context.StepAsync(async (_, _) => { await Task.CompletedTask; return "fetched"; }, name: "fetch"); await context.WaitAsync(TimeSpan.FromSeconds(30), name: "delay"); - var final = await context.StepAsync(async (_) => { await Task.CompletedTask; return "processed"; }, name: "process"); + var final = await context.StepAsync(async (_, _) => { await Task.CompletedTask; return "processed"; }, name: "process"); return final; }); @@ -539,20 +539,20 @@ public async Task EndToEnd_StepWaitStep_SecondInvocation_Completes() var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext); var processExecuted = false; var result = await DurableExecutionHandler.RunAsync( state, tm, async () => { - var fetched = await context.StepAsync(async (_) => { await Task.CompletedTask; return "fresh_fetch"; }, name: "fetch"); + var fetched = await context.StepAsync(async (_, _) => { await Task.CompletedTask; return "fresh_fetch"; }, name: "fetch"); Assert.Equal("fetched", fetched); // cached from replay await context.WaitAsync(TimeSpan.FromSeconds(30), name: "delay"); // wait is elapsed, continues - var final = await context.StepAsync(async (_) => + var final = await context.StepAsync(async (_, _) => { processExecuted = true; await Task.CompletedTask; @@ -589,11 +589,11 @@ public async Task StepAsync_ReplayTypeMismatch_ThrowsNonDeterministicException() var tm = new TerminationManager(); var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext); var ex = await Assert.ThrowsAsync(async () => await context.StepAsync( - async (_) => { await Task.CompletedTask; return "should not run"; }, + async (_, _) => { await Task.CompletedTask; return "should not run"; }, name: "my_op")); Assert.Contains("expected type 'STEP'", ex.Message); @@ -620,7 +620,7 @@ public async Task WaitAsync_ReplayTypeMismatch_ThrowsNonDeterministicException() var tm = new TerminationManager(); var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext); var ex = await Assert.ThrowsAsync(async () => await context.WaitAsync(TimeSpan.FromSeconds(10), name: "my_op")); @@ -652,11 +652,11 @@ public async Task StepAsync_ReplayNameMismatch_ThrowsNonDeterministicException() var tm = new TerminationManager(); var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext); var ex = await Assert.ThrowsAsync(async () => await context.StepAsync( - async (_) => { await Task.CompletedTask; return "new"; }, + async (_, _) => { await Task.CompletedTask; return "new"; }, name: "my_step")); Assert.Contains("expected name 'my_step'", ex.Message); @@ -669,7 +669,7 @@ public async Task StepAsync_NoReplay_SkipsValidation() var context = CreateContext(); var result = await context.StepAsync( - async (_) => { await Task.CompletedTask; return "ok"; }, + async (_, _) => { await Task.CompletedTask; return "ok"; }, name: "anything"); Assert.Equal("ok", result); @@ -694,10 +694,10 @@ public async Task StepAsync_FailsWithRetryStrategy_CheckpointsRetryAndSuspends() var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); var stepTask = context.StepAsync( - async (_) => { await Task.CompletedTask; throw new InvalidOperationException("transient"); }, + async (_, _) => { await Task.CompletedTask; throw new InvalidOperationException("transient"); }, name: "flaky_step", config: new StepConfig { @@ -730,7 +730,7 @@ public async Task StepAsync_FailsNoRetryStrategy_CheckpointsFail() var ex = await Assert.ThrowsAsync(() => context.StepAsync( - async (_) => { await Task.CompletedTask; throw new InvalidOperationException("permanent"); }, + async (_, _) => { await Task.CompletedTask; throw new InvalidOperationException("permanent"); }, name: "fail_step")); Assert.Equal("permanent", ex.Message); @@ -761,12 +761,12 @@ public async Task StepAsync_RetryExhausted_CheckpointsFail() var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); // Attempt 3 (last one) — should fail after this var ex = await Assert.ThrowsAsync(() => context.StepAsync( - async (_) => { await Task.CompletedTask; throw new InvalidOperationException("still failing"); }, + async (_, _) => { await Task.CompletedTask; throw new InvalidOperationException("still failing"); }, name: "exhaust_step", config: new StepConfig { @@ -809,10 +809,10 @@ public async Task StepAsync_PendingWithFutureTimestamp_Suspends() var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); var stepTask = context.StepAsync( - async (_) => { await Task.CompletedTask; return "should not run"; }, + async (_, _) => { await Task.CompletedTask; return "should not run"; }, name: "pending_step", config: new StepConfig { RetryStrategy = RetryStrategy.Default }); @@ -848,10 +848,10 @@ public async Task StepAsync_PendingWithPastTimestamp_ReExecutes() var tm = new TerminationManager(); var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext); var result = await context.StepAsync( - async (ctx) => + async (ctx, _) => { await Task.CompletedTask; Assert.Equal(2, ctx.AttemptNumber); @@ -886,11 +886,11 @@ public async Task StepAsync_ReadyReplay_AdvancesAttemptAndExecutes() var tm = new TerminationManager(); var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext); var executed = false; var result = await context.StepAsync( - async (ctx) => + async (ctx, _) => { executed = true; Assert.Equal(3, ctx.AttemptNumber); @@ -915,12 +915,12 @@ public async Task StepAsync_AtMostOnce_FlushesStartBeforeExecution() var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); IReadOnlyList? flushedAtFuncEntry = null; var result = await context.StepAsync( - async (_) => + async (_, _) => { flushedAtFuncEntry = recorder.Flushed.Select(o => o.Action.ToString()).ToArray(); await Task.CompletedTask; @@ -960,11 +960,11 @@ public async Task StepAsync_AtMostOnce_StartedReplay_TriggersRetryHandler() var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); var executed = false; var stepTask = context.StepAsync( - async (_) => { executed = true; await Task.CompletedTask; return "should not run"; }, + async (_, _) => { executed = true; await Task.CompletedTask; return "should not run"; }, name: "amo_replay", config: new StepConfig { diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs index 8078b0242..af38a4549 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/DurableFunctionTests.cs @@ -161,7 +161,7 @@ public async Task WrapAsync_VoidWorkflow_ReturnSucceeded() var output = await DurableFunction.WrapAsync( async (evt, ctx) => { - await ctx.StepAsync(async (_) => { await Task.CompletedTask; executed = true; }, name: "do_work"); + await ctx.StepAsync(async (_, _) => { await Task.CompletedTask; executed = true; }, name: "do_work"); }, input, CreateLambdaContext(), @@ -387,11 +387,11 @@ public async Task WrapAsync_PaginatedInitialState_HydratesAllPages() // without re-executing — if the loop missed a page, the corresponding step // would run fresh and append a different value to `observed`. observed.Add(await ctx.StepAsync( - async (_) => { await Task.CompletedTask; return "fresh"; }, name: "step1")); + async (_, _) => { await Task.CompletedTask; return "fresh"; }, name: "step1")); observed.Add(await ctx.StepAsync( - async (_) => { await Task.CompletedTask; return "fresh"; }, name: "step2")); + async (_, _) => { await Task.CompletedTask; return "fresh"; }, name: "step2")); observed.Add(await ctx.StepAsync( - async (_) => { await Task.CompletedTask; return "fresh"; }, name: "step3")); + async (_, _) => { await Task.CompletedTask; return "fresh"; }, name: "step3")); return new OrderResult { Status = "ok", OrderId = evt.OrderId }; }, input, @@ -573,7 +573,7 @@ private static AmazonServiceException MakeServiceException(string code, HttpStat private static async Task SingleStepWorkflow(OrderEvent input, IDurableContext context) { // One step succeed → forces a checkpoint flush, which the mock fails. - await context.StepAsync(async (_) => { await Task.CompletedTask; return "ok"; }, name: "s1"); + await context.StepAsync(async (_, _) => { await Task.CompletedTask; return "ok"; }, name: "s1"); return new OrderResult { Status = "done" }; } @@ -757,7 +757,7 @@ public async Task WrapAsync_ReplayDeterminism_CallbackIdStableAcrossInvocations( private static async Task MyWorkflow(OrderEvent input, IDurableContext context) { var validation = await context.StepAsync( - async (_) => { await Task.CompletedTask; return new ValidationResult { IsValid = true }; }, + async (_, _) => { await Task.CompletedTask; return new ValidationResult { IsValid = true }; }, name: "validate"); await context.WaitAsync(TimeSpan.FromSeconds(30), name: "delay"); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/InvokeOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/InvokeOperationTests.cs index eb8b7a757..daf933cb5 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/InvokeOperationTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/InvokeOperationTests.cs @@ -27,7 +27,7 @@ private static (DurableContext context, RecordingBatcher recorder, TerminationMa var lambdaContext = new TestLambdaContext { Serializer = new DefaultLambdaJsonSerializer() }; #pragma warning restore AWSLAMBDA001 var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); return (context, recorder, tm, state); } @@ -189,7 +189,7 @@ public async Task InvokeAsync_NoSerializerRegistered_ThrowsInvalidOperationExcep var idGen = new OperationIdGenerator(); var lambdaContext = new TestLambdaContext(); // no serializer! var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); await Assert.ThrowsAsync(() => context.InvokeAsync(FunctionArn, "x", name: "no_serializer")); @@ -479,16 +479,16 @@ public async Task EndToEnd_StepInvokeStep_FirstInvocation_SuspendsOnInvoke() var lambdaContext = new TestLambdaContext { Serializer = new DefaultLambdaJsonSerializer() }; #pragma warning restore AWSLAMBDA001 var batcher = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, batcher.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, batcher.Batcher); var result = await DurableExecutionHandler.RunAsync( state, tm, async () => { - await context.StepAsync(async (_) => { await Task.CompletedTask; return "validated"; }, name: "validate"); + await context.StepAsync(async (_, _) => { await Task.CompletedTask; return "validated"; }, name: "validate"); var paymentId = await context.InvokeAsync( FunctionArn, "validated", name: "process_payment"); - return await context.StepAsync(async (_) => { await Task.CompletedTask; return paymentId + "-done"; }, name: "finalize"); + return await context.StepAsync(async (_, _) => { await Task.CompletedTask; return paymentId + "-done"; }, name: "finalize"); }); Assert.Equal(InvocationStatus.Pending, result.Status); @@ -530,21 +530,21 @@ public async Task EndToEnd_StepInvokeStep_SecondInvocation_ResumesAndCompletes() #pragma warning disable AWSLAMBDA001 var lambdaContext = new TestLambdaContext { Serializer = new DefaultLambdaJsonSerializer() }; #pragma warning restore AWSLAMBDA001 - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext); var finalizeRan = false; var result = await DurableExecutionHandler.RunAsync( state, tm, async () => { - var validated = await context.StepAsync(async (_) => { await Task.CompletedTask; return "fresh-validated"; }, name: "validate"); + var validated = await context.StepAsync(async (_, _) => { await Task.CompletedTask; return "fresh-validated"; }, name: "validate"); Assert.Equal("validated", validated); // cached var paymentId = await context.InvokeAsync( FunctionArn, validated, name: "process_payment"); Assert.Equal("pmt-42", paymentId); // cached - return await context.StepAsync(async (_) => + return await context.StepAsync(async (_, _) => { finalizeRan = true; await Task.CompletedTask; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForCallbackTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForCallbackTests.cs index 430df41c5..3596eb856 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForCallbackTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForCallbackTests.cs @@ -32,7 +32,7 @@ private static (DurableContext context, RecordingBatcher recorder, TerminationMa var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(); var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); return (context, recorder, tm, state); } @@ -69,7 +69,7 @@ public async Task WaitForCallbackAsync_FreshExecution_RunsSubmitterAndSuspendsFo string? receivedCallbackId = null; var resultTask = context.WaitForCallbackAsync( - async (callbackId, ctx) => + async (callbackId, ctx, _) => { receivedCallbackId = callbackId; Assert.NotNull(ctx.Logger); @@ -106,7 +106,7 @@ public async Task WaitForCallbackAsync_FreshExecution_KebabSuffixedSubOpNames() WireServiceCallbackIdAllocation(recorder, state, "cb-1"); var resultTask = context.WaitForCallbackAsync( - async (_, _) => await Task.CompletedTask, + async (_, _, _) => await Task.CompletedTask, name: "approval"); await Task.WhenAny(resultTask, tm.TerminationTask); @@ -129,7 +129,7 @@ public async Task WaitForCallbackAsync_FreshExecution_NullParentName_LeavesSubOp WireServiceCallbackIdAllocation(recorder, state, "cb-1"); var resultTask = context.WaitForCallbackAsync( - async (_, _) => await Task.CompletedTask); + async (_, _, _) => await Task.CompletedTask); await Task.WhenAny(resultTask, tm.TerminationTask); await recorder.Batcher.DrainAsync(); @@ -150,7 +150,7 @@ public async Task WaitForCallbackAsync_ChildOperationIdsDeterministic() WireServiceCallbackIdAllocation(recorder, state, "cb-1"); var resultTask = context.WaitForCallbackAsync( - async (_, _) => await Task.CompletedTask, + async (_, _, _) => await Task.CompletedTask, name: "approval"); await Task.WhenAny(resultTask, tm.TerminationTask); @@ -177,7 +177,7 @@ public async Task WaitForCallbackAsync_CallbackTimeoutInheritsFromConfig() WireServiceCallbackIdAllocation(recorder, state, "cb-1"); var resultTask = context.WaitForCallbackAsync( - async (_, _) => await Task.CompletedTask, + async (_, _, _) => await Task.CompletedTask, name: "approval", config: new WaitForCallbackConfig { @@ -219,7 +219,7 @@ public async Task WaitForCallbackAsync_ReplayWithCallbackSucceeded_ReturnsResult var executed = false; var result = await context.WaitForCallbackAsync( - async (_, _) => { executed = true; await Task.CompletedTask; }, + async (_, _, _) => { executed = true; await Task.CompletedTask; }, name: "approval"); Assert.False(executed); // Replay returns cached without re-running submitter. @@ -278,7 +278,7 @@ public async Task WaitForCallbackAsync_ReplayCallbackTimedOut_ThrowsCallbackTime var ex = await Assert.ThrowsAsync(() => context.WaitForCallbackAsync( - async (_, _) => await Task.CompletedTask, + async (_, _, _) => await Task.CompletedTask, name: "approval")); Assert.Equal("callback timed out", ex.Message); @@ -335,7 +335,7 @@ public async Task WaitForCallbackAsync_ReplayCallbackFailed_ThrowsCallbackFailed var ex = await Assert.ThrowsAsync(() => context.WaitForCallbackAsync( - async (_, _) => await Task.CompletedTask, + async (_, _, _) => await Task.CompletedTask, name: "approval")); Assert.Equal("external rejected", ex.Message); @@ -374,7 +374,7 @@ public async Task WaitForCallbackAsync_SubmitterFails_ThrowsCallbackSubmitterExc var ex = await Assert.ThrowsAsync(() => context.WaitForCallbackAsync( - async (_, _) => await Task.CompletedTask, + async (_, _, _) => await Task.CompletedTask, name: "approval")); Assert.IsAssignableFrom(ex); @@ -421,7 +421,7 @@ public async Task WaitForCallbackAsync_ReplayParentContextFailedWithCallbackTime var ex = await Assert.ThrowsAsync(() => context.WaitForCallbackAsync( - async (_, _) => await Task.CompletedTask, + async (_, _, _) => await Task.CompletedTask, name: "approval")); // Concrete-type check: not just `is CallbackException` — must be the @@ -462,7 +462,7 @@ public async Task WaitForCallbackAsync_ReplayParentContextFailedWithCallbackFail var ex = await Assert.ThrowsAsync(() => context.WaitForCallbackAsync( - async (_, _) => await Task.CompletedTask, + async (_, _, _) => await Task.CompletedTask, name: "approval")); Assert.Equal(typeof(CallbackFailedException), ex.GetType()); @@ -479,7 +479,7 @@ public async Task WaitForCallbackAsync_RetryStrategyForwardedToSubmitterStep() var seenAttempts = new List(); var resultTask = context.WaitForCallbackAsync( - async (_, ctx) => + async (_, ctx, _) => { // The submitter receives an IWaitForCallbackContext (no AttemptNumber) // — but this test doesn't need to verify retry mechanics, only @@ -514,7 +514,7 @@ public async Task WaitForCallbackAsync_SubmitterContext_IsIWaitForCallbackContex Type? observedContextType = null; var resultTask = context.WaitForCallbackAsync( - async (_, ctx) => + async (_, ctx, _) => { observedContextType = ctx.GetType(); await Task.CompletedTask; diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForConditionOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForConditionOperationTests.cs index 6d355c47a..50f7557b3 100644 --- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForConditionOperationTests.cs +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WaitForConditionOperationTests.cs @@ -28,7 +28,7 @@ private static (DurableContext context, RecordingBatcher recorder, TerminationMa var idGen = new OperationIdGenerator(); var lambdaContext = CreateLambdaContext(serializer); var recorder = new RecordingBatcher(); - var context = new DurableContext(state, tm, idGen, "arn:test", lambdaContext, recorder.Batcher); + var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher); return (context, recorder, tm, state); } @@ -44,7 +44,7 @@ public async Task FreshExecution_StrategyStopsImmediately_SucceedsWithFinalState // success path with no polling iterations. int checkInvocations = 0; var result = await context.WaitForConditionAsync( - check: async (state, ctx) => + check: async (state, ctx, _) => { checkInvocations++; Assert.Equal(checkInvocations, ctx.AttemptNumber); @@ -81,7 +81,7 @@ public async Task FreshExecution_StrategyContinues_EmitsRetryAndSuspends() // Strategy says continue → operation must emit RETRY and suspend. var task = context.WaitForConditionAsync( - check: async (state, _) => { await Task.CompletedTask; return state + 1; }, + check: async (state, _, _) => { await Task.CompletedTask; return state + 1; }, config: new WaitForConditionConfig { InitialState = 0, @@ -113,7 +113,7 @@ public async Task FreshExecution_UsesInitialStateOnFirstCall() int? observedInitial = null; await context.WaitForConditionAsync( - check: async (state, _) => + check: async (state, _, _) => { observedInitial ??= state; await Task.CompletedTask; @@ -136,7 +136,7 @@ public async Task FreshExecution_AttemptNumberIs1OnFirstCall() int observed = -1; await context.WaitForConditionAsync( - check: async (state, ctx) => + check: async (state, ctx, _) => { observed = ctx.AttemptNumber; await Task.CompletedTask; @@ -161,7 +161,7 @@ public async Task CheckContext_ExposesLogger() ILogger? observedLogger = null; await context.WaitForConditionAsync( - check: async (state, ctx) => + check: async (state, ctx, _) => { observedLogger = ctx.Logger; await Task.CompletedTask; @@ -199,7 +199,7 @@ public async Task Replay_Succeeded_ReturnsCachedAndSkipsCheck() var checkInvoked = false; var result = await context.WaitForConditionAsync( - check: async (_, _) => { checkInvoked = true; await Task.CompletedTask; return 0; }, + check: async (_, _, _) => { checkInvoked = true; await Task.CompletedTask; return 0; }, config: new WaitForConditionConfig { InitialState = 0, @@ -244,7 +244,7 @@ public async Task Replay_PendingTimerNotFired_ReSuspends() var checkInvoked = false; var task = context.WaitForConditionAsync( - check: async (_, _) => { checkInvoked = true; await Task.CompletedTask; return 0; }, + check: async (_, _, _) => { checkInvoked = true; await Task.CompletedTask; return 0; }, config: new WaitForConditionConfig { InitialState = 0, @@ -293,7 +293,7 @@ public async Task Replay_PendingTimerFired_ResumesWithCheckpointedState() int? observedState = null; int? observedAttempt = null; var result = await context.WaitForConditionAsync( - check: async (state, ctx) => + check: async (state, ctx, _) => { observedState = state; observedAttempt = ctx.AttemptNumber; @@ -346,7 +346,7 @@ public async Task Replay_Ready_ResumesWithCheckpointedState() int? observedState = null; int? observedAttempt = null; var result = await context.WaitForConditionAsync( - check: async (state, ctx) => + check: async (state, ctx, _) => { observedState = state; observedAttempt = ctx.AttemptNumber; @@ -392,7 +392,7 @@ public async Task Replay_Started_ResumesWithInitialState() int? observedState = null; int? observedAttempt = null; var result = await context.WaitForConditionAsync( - check: async (state, ctx) => + check: async (state, ctx, _) => { observedState = state; observedAttempt = ctx.AttemptNumber; @@ -444,7 +444,7 @@ public async Task Replay_Failed_FromCheckException_ThrowsStepException() var ex = await Assert.ThrowsAsync(() => context.WaitForConditionAsync( - check: async (_, _) => { await Task.CompletedTask; return 0; }, + check: async (_, _, _) => { await Task.CompletedTask; return 0; }, config: new WaitForConditionConfig { InitialState = 0, @@ -490,7 +490,7 @@ public async Task Replay_Failed_FromMaxAttempts_ThrowsWaitForConditionException( var ex = await Assert.ThrowsAsync(() => context.WaitForConditionAsync( - check: async (_, _) => { await Task.CompletedTask; return 0; }, + check: async (_, _, _) => { await Task.CompletedTask; return 0; }, config: new WaitForConditionConfig { InitialState = 0, @@ -513,7 +513,7 @@ public async Task Replay_Failed_FromMaxAttempts_LastState_MatchesLiveExecution() var liveEx = await Assert.ThrowsAsync(() => liveCtx.WaitForConditionAsync( - check: async (state, _) => { await Task.CompletedTask; return state + 1; }, + check: async (state, _, _) => { await Task.CompletedTask; return state + 1; }, config: new WaitForConditionConfig { InitialState = 5, @@ -555,7 +555,7 @@ public async Task Replay_Failed_FromMaxAttempts_LastState_MatchesLiveExecution() var replayEx = await Assert.ThrowsAsync(() => replayCtx.WaitForConditionAsync( - check: async (_, _) => { await Task.CompletedTask; return 0; }, + check: async (_, _, _) => { await Task.CompletedTask; return 0; }, config: new WaitForConditionConfig { InitialState = 0, @@ -601,7 +601,7 @@ public async Task Replay_Failed_FromMaxAttempts_NullPayload_LeavesLastStateNull( var ex = await Assert.ThrowsAsync(() => context.WaitForConditionAsync( - check: async (_, _) => { await Task.CompletedTask; return 0; }, + check: async (_, _, _) => { await Task.CompletedTask; return 0; }, config: new WaitForConditionConfig { InitialState = 0, @@ -625,7 +625,7 @@ public async Task MaxAttemptsExhausted_FreshExecution_ThrowsWaitForConditionExce // condition was met. Operation must throw, not SUCCEED. var ex = await Assert.ThrowsAsync(() => context.WaitForConditionAsync( - check: async (state, _) => { await Task.CompletedTask; return state + 1; }, + check: async (state, _, _) => { await Task.CompletedTask; return state + 1; }, config: new WaitForConditionConfig { InitialState = 5, @@ -658,7 +658,7 @@ public async Task MaxAttemptsExhausted_DistinguishesFromConditionMet() // The same maxAttempts=1 strategy WITH an isDone that's satisfied // should SUCCEED, not throw. var result = await context.WaitForConditionAsync( - check: async (_, _) => { await Task.CompletedTask; return 99; }, + check: async (_, _, _) => { await Task.CompletedTask; return 99; }, config: new WaitForConditionConfig { InitialState = 0, @@ -681,7 +681,7 @@ public async Task CheckThrows_CheckpointsFailAndThrows() var ex = await Assert.ThrowsAsync(() => context.WaitForConditionAsync( - check: async (_, _) => { await Task.CompletedTask; throw new InvalidOperationException("boom"); }, + check: async (_, _, _) => { await Task.CompletedTask; throw new InvalidOperationException("boom"); }, config: new WaitForConditionConfig { InitialState = 0, @@ -735,7 +735,7 @@ public async Task ReplayDeterminism_StateIsCarriedAcrossIterations() CounterState? observed = null; int? observedAttempt = null; var result = await context.WaitForConditionAsync( - check: async (state, ctx) => + check: async (state, ctx, _) => { observed = state; observedAttempt = ctx.AttemptNumber; @@ -786,7 +786,7 @@ public async Task ReplayDeterminism_RoundTripsThroughLambdaSerializer() serializer: serializer); var result = await context.WaitForConditionAsync( - check: async (_, _) => { await Task.CompletedTask; return new TestPerson { Name = "ignored", Age = 0 }; }, + check: async (_, _, _) => { await Task.CompletedTask; return new TestPerson { Name = "ignored", Age = 0 }; }, config: new WaitForConditionConfig { InitialState = new TestPerson { Name = "init", Age = 0 }, @@ -810,7 +810,7 @@ public async Task FreshExecution_FlushesStartBeforeSuspending() var (context, recorder, tm, _) = CreateContext(); var task = context.WaitForConditionAsync( - check: async (state, _) => { await Task.CompletedTask; return state + 1; }, + check: async (state, _, _) => { await Task.CompletedTask; return state + 1; }, config: new WaitForConditionConfig { InitialState = 0, @@ -852,7 +852,7 @@ public async Task ReplayUnknownStatus_ThrowsNonDeterministicException() await Assert.ThrowsAsync(() => context.WaitForConditionAsync( - check: async (_, _) => { await Task.CompletedTask; return 0; }, + check: async (_, _, _) => { await Task.CompletedTask; return 0; }, config: new WaitForConditionConfig { InitialState = 0, @@ -882,7 +882,7 @@ public async Task ReplayTypeMismatch_ThrowsNonDeterministicException() var ex = await Assert.ThrowsAsync(() => context.WaitForConditionAsync( - check: async (_, _) => { await Task.CompletedTask; return 0; }, + check: async (_, _, _) => { await Task.CompletedTask; return 0; }, config: new WaitForConditionConfig { InitialState = 0, @@ -915,7 +915,7 @@ public async Task NullConfig_ThrowsArgumentNullException() var (context, _, _, _) = CreateContext(); await Assert.ThrowsAsync(() => context.WaitForConditionAsync( - check: async (_, _) => { await Task.CompletedTask; return 0; }, + check: async (_, _, _) => { await Task.CompletedTask; return 0; }, config: null!)); } @@ -948,11 +948,12 @@ public async Task DeserializeStateOrInitial_CorruptPayload_LogsWarningAndFallsBa var recorder = new RecordingBatcher(); var logger = new RecordingLogger(); + var tm = new TerminationManager(); var op = new WaitForConditionOperation( operationId: IdAt(1), name: "poll", parentId: null, - check: async (s, _) => { await Task.CompletedTask; return s; }, + check: async (s, _, _) => { await Task.CompletedTask; return s; }, config: new WaitForConditionConfig { InitialState = 999, @@ -961,7 +962,8 @@ public async Task DeserializeStateOrInitial_CorruptPayload_LogsWarningAndFallsBa serializer: new ThrowingLambdaSerializer(), logger: logger, state: state, - termination: new TerminationManager(), + termination: tm, + workflowCancellation: new WorkflowCancellation(tm), durableExecutionArn: "arn:test", batcher: recorder.Batcher); @@ -1008,11 +1010,12 @@ public async Task ReplayFailed_CorruptLastStatePayload_LogsWarningAndLastStateNu var recorder = new RecordingBatcher(); var logger = new RecordingLogger(); + var tm = new TerminationManager(); var op = new WaitForConditionOperation( operationId: IdAt(1), name: "poll", parentId: null, - check: async (s, _) => { await Task.CompletedTask; return s; }, + check: async (s, _, _) => { await Task.CompletedTask; return s; }, config: new WaitForConditionConfig { InitialState = 0, @@ -1021,7 +1024,8 @@ public async Task ReplayFailed_CorruptLastStatePayload_LogsWarningAndLastStateNu serializer: new ThrowingLambdaSerializer(), logger: logger, state: state, - termination: new TerminationManager(), + termination: tm, + workflowCancellation: new WorkflowCancellation(tm), durableExecutionArn: "arn:test", batcher: recorder.Batcher); diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WorkflowCancellationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WorkflowCancellationTests.cs new file mode 100644 index 000000000..4b1f04cdc --- /dev/null +++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/WorkflowCancellationTests.cs @@ -0,0 +1,275 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +using Amazon.Lambda.DurableExecution.Internal; +using Amazon.Lambda.Serialization.SystemTextJson; +using Amazon.Lambda.TestUtilities; +using Xunit; + +namespace Amazon.Lambda.DurableExecution.Tests; + +/// +/// Cancellation-flow tests for and the +/// linked-token contract surfaced through . +/// Companion to +/// Libraries/src/Amazon.Lambda.DurableExecution/docs/design/cancellation-design.md. +/// +public class WorkflowCancellationTests +{ + private static TestLambdaContext CreateLambdaContext() => +#pragma warning disable AWSLAMBDA001 // TestLambdaContext.Serializer is experimental. + new() { Serializer = new DefaultLambdaJsonSerializer() }; +#pragma warning restore AWSLAMBDA001 + + private sealed record Harness( + DurableContext Context, + TerminationManager Termination, + WorkflowCancellation WorkflowCancellation, + RecordingBatcher Recorder); + + private static Harness CreateHarness() + { + var state = new ExecutionState(); + var tm = new TerminationManager(); + var wfc = new WorkflowCancellation(tm); + var idGen = new OperationIdGenerator(); + var recorder = new RecordingBatcher(); + var ctx = new DurableContext(state, tm, wfc, idGen, "arn:test", CreateLambdaContext(), recorder.Batcher); + return new Harness(ctx, tm, wfc, recorder); + } + + // ── WorkflowCancellation primitive ────────────────────────────────── + + [Fact] + public void Token_NotCancelled_BeforeTermination() + { + var tm = new TerminationManager(); + using var wfc = new WorkflowCancellation(tm); + + Assert.False(wfc.Token.IsCancellationRequested); + } + + [Fact] + public async Task Token_CancelledWhenTerminationFires() + { + var tm = new TerminationManager(); + using var wfc = new WorkflowCancellation(tm); + var observed = new TaskCompletionSource(); + wfc.Token.Register(() => observed.TrySetResult()); + + tm.Terminate(TerminationReason.WaitScheduled); + + await observed.Task.WaitAsync(TimeSpan.FromSeconds(2)); + Assert.True(wfc.Token.IsCancellationRequested); + } + + [Fact] + public void Dispose_AfterTermination_DoesNotThrow() + { + var tm = new TerminationManager(); + var wfc = new WorkflowCancellation(tm); + tm.Terminate(TerminationReason.WaitScheduled); + wfc.Dispose(); + } + + // ── StepAsync token plumbing ──────────────────────────────────────── + + [Fact] + public async Task StepAsync_CallerToken_PropagatesIntoFunc() + { + // Cancel AFTER the func has started — pre-cancellation would short-circuit + // in StepOperation.ExecuteFunc's ThrowIfCancellationRequested before the + // user body runs and we'd never observe the propagation. + var harness = CreateHarness(); + using var caller = new CancellationTokenSource(); + var entered = new TaskCompletionSource(); + + var task = harness.Context.StepAsync(async (_, ct) => + { + entered.TrySetResult(); + // Block on the linked token; if the caller's cancel propagates into + // ct via the linked CTS, this throws. + await Task.Delay(Timeout.Infinite, ct); + return 0; + }, name: "step", cancellationToken: caller.Token); + + await entered.Task.WaitAsync(TimeSpan.FromSeconds(2)); + caller.Cancel(); + + await Assert.ThrowsAsync(() => task); + } + + [Fact] + public async Task StepAsync_LinkedToken_FiresWhenWorkflowCancels() + { + var harness = CreateHarness(); + var enteredFunc = new TaskCompletionSource(); + CancellationToken stepToken = default; + + var task = harness.Context.StepAsync(async (_, ct) => + { + stepToken = ct; + enteredFunc.TrySetResult(); + await Task.Delay(Timeout.Infinite, ct); + return 0; + }, name: "step"); + + await enteredFunc.Task.WaitAsync(TimeSpan.FromSeconds(2)); + harness.Termination.Terminate(TerminationReason.WaitScheduled); + + await Assert.ThrowsAsync(() => task); + Assert.True(stepToken.IsCancellationRequested); + } + + [Fact] + public async Task StepAsync_UserThrownOCE_IsTreatedAsFailureAndRetried() + { + // A user-thrown OperationCanceledException unrelated to our linked token + // falls through the cancellation when-clause and is funneled through + // the retry strategy like any other exception. + var harness = CreateHarness(); + var attempts = 0; + + var ex = await Assert.ThrowsAsync(() => + harness.Context.StepAsync(async (_, _) => + { + attempts++; + await Task.CompletedTask; + throw new OperationCanceledException("user-thrown, unrelated to SDK token"); + }, name: "step")); + + Assert.Equal(1, attempts); + Assert.Equal(typeof(OperationCanceledException).FullName, ex.ErrorType); + Assert.Contains(harness.Recorder.Flushed, + u => u.Action == OperationAction.FAIL && u.Type == OperationTypes.Step); + } + + [Fact] + public async Task StepAsync_CancellationViaLinkedToken_DoesNotCheckpointFailOrSucceed() + { + var harness = CreateHarness(); + var entered = new TaskCompletionSource(); + + var task = harness.Context.StepAsync(async (_, ct) => + { + entered.TrySetResult(); + await Task.Delay(Timeout.Infinite, ct); + return 0; + }, name: "step"); + + await entered.Task.WaitAsync(TimeSpan.FromSeconds(2)); + harness.Termination.Terminate(TerminationReason.WaitScheduled); + + await Assert.ThrowsAsync(() => task); + + // No FAIL/SUCCEED checkpoint emitted (only any START fire-and-forget that + // may have flushed under AtLeastOncePerRetry semantics). + Assert.DoesNotContain(harness.Recorder.Flushed, u => u.Action == OperationAction.FAIL); + Assert.DoesNotContain(harness.Recorder.Flushed, u => u.Action == OperationAction.SUCCEED); + } + + // ── Child context propagation ─────────────────────────────────────── + + [Fact] + public async Task RunInChildContextAsync_LinkedToken_CancelsInnerStep() + { + var harness = CreateHarness(); + var entered = new TaskCompletionSource(); + CancellationToken childToken = default; + CancellationToken stepToken = default; + + var task = harness.Context.RunInChildContextAsync(async (childCtx, ct) => + { + childToken = ct; + return await childCtx.StepAsync(async (_, stepCt) => + { + stepToken = stepCt; + entered.TrySetResult(); + await Task.Delay(Timeout.Infinite, stepCt); + return 0; + }, name: "inner"); + }, name: "outer"); + + await entered.Task.WaitAsync(TimeSpan.FromSeconds(2)); + harness.Termination.Terminate(TerminationReason.WaitScheduled); + + await Assert.ThrowsAsync(() => task); + Assert.True(childToken.IsCancellationRequested); + Assert.True(stepToken.IsCancellationRequested); + } + + // ── WaitForConditionAsync ─────────────────────────────────────────── + + [Fact] + public async Task WaitForConditionAsync_CheckReceivesLinkedToken() + { + var harness = CreateHarness(); + var entered = new TaskCompletionSource(); + CancellationToken seen = default; + + var task = harness.Context.WaitForConditionAsync(async (state, _, ct) => + { + seen = ct; + entered.TrySetResult(); + await Task.Delay(Timeout.Infinite, ct); + return state; + }, + new WaitForConditionConfig + { + InitialState = 0, + WaitStrategy = WaitStrategy.Fixed(TimeSpan.FromSeconds(1)), + }, + name: "poll"); + + await entered.Task.WaitAsync(TimeSpan.FromSeconds(2)); + harness.Termination.Terminate(TerminationReason.WaitScheduled); + + await Assert.ThrowsAsync(() => task); + Assert.True(seen.IsCancellationRequested); + } + + // ── Replay short-circuit ──────────────────────────────────────────── + + [Fact] + public async Task StepAsync_Replay_DoesNotInvokeFunc_EvenWithCancelledToken() + { + // Cached SUCCESS replay must short-circuit without calling the user + // Func, regardless of token state — replay determinism is structural. + var operationId = OperationIdGenerator.HashOperationId("1"); + var state = new ExecutionState(); + state.LoadFromCheckpoint(new InitialExecutionState + { + Operations = new List + { + new() + { + Id = operationId, + Type = OperationTypes.Step, + Status = OperationStatuses.Succeeded, + Name = "step", + StepDetails = new StepDetails { Result = "42" } + } + } + }); + + var tm = new TerminationManager(); + var wfc = new WorkflowCancellation(tm); + tm.Terminate(TerminationReason.WaitScheduled); // cancel before invocation + await Task.Yield(); + Assert.True(wfc.Token.IsCancellationRequested); + + var idGen = new OperationIdGenerator(); + var ctx = new DurableContext(state, tm, wfc, idGen, "arn:test", CreateLambdaContext()); + var invoked = false; + + var result = await ctx.StepAsync(async (_, _) => + { + invoked = true; + await Task.CompletedTask; + return 99; + }, name: "step"); + + Assert.False(invoked); + Assert.Equal(42, result); + } +}