Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .autover/changes/110c67f8-15d8-44b5-beae-6f1223933c27.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"Projects": [
{
"Name": "Amazon.Lambda.DurableExecution",
"Type": "Patch",
Comment thread
GarrettBeatty marked this conversation as resolved.
"ChangelogMessages": [
"Thread CancellationToken into every user Func accepted by IDurableContext (StepAsync, RunInChildContextAsync, WaitForCallbackAsync, WaitForConditionAsync). The token links the caller-supplied cancellation token with an SDK-owned workflow-shutdown signal so user step bodies unwind cleanly when the workflow is being torn down. Cancellation via the linked token is not checkpointed; user-thrown OperationCanceledException unrelated to the linked token continues to be treated as a normal step failure."
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace Amazon.Lambda.DurableExecution;
/// Base exception type for callback failures surfaced from
/// <see cref="ICallback{T}.GetResultAsync(System.Threading.CancellationToken)"/>
/// or
/// <see cref="IDurableContext.WaitForCallbackAsync{T}(System.Func{string, IWaitForCallbackContext, System.Threading.Tasks.Task}, string?, WaitForCallbackConfig?, System.Threading.CancellationToken)"/>.
/// <see cref="IDurableContext.WaitForCallbackAsync{T}(System.Func{string, IWaitForCallbackContext, System.Threading.CancellationToken, System.Threading.Tasks.Task}, string?, WaitForCallbackConfig?, System.Threading.CancellationToken)"/>.
/// Concrete subclasses distinguish failure modes — pattern-match
/// <see cref="CallbackFailedException"/>, <see cref="CallbackTimeoutException"/>,
/// or <see cref="CallbackSubmitterException"/> in <c>catch</c> clauses.
Expand Down Expand Up @@ -71,7 +71,7 @@ public CallbackTimeoutException(string message, Exception innerException) : base

/// <summary>
/// Thrown only from
/// <see cref="IDurableContext.WaitForCallbackAsync{T}(System.Func{string, IWaitForCallbackContext, System.Threading.Tasks.Task}, string?, WaitForCallbackConfig?, System.Threading.CancellationToken)"/>
/// <see cref="IDurableContext.WaitForCallbackAsync{T}(System.Func{string, IWaitForCallbackContext, System.Threading.CancellationToken, System.Threading.Tasks.Task}, string?, WaitForCallbackConfig?, System.Threading.CancellationToken)"/>
/// when the user-supplied submitter delegate (the step that hands the callback
/// ID to the external system) fails after retries are exhausted. Wraps the
/// underlying <see cref="StepException"/> as <see cref="System.Exception.InnerException"/>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Amazon.Lambda.DurableExecution;
/// <remarks>
/// A child context is a logical sub-workflow with its own deterministic
/// operation-ID space, persisted as a <c>CONTEXT</c> operation. Use
/// <see cref="IDurableContext.RunInChildContextAsync{T}(System.Func{IDurableContext, System.Threading.Tasks.Task{T}}, string?, ChildContextConfig?, System.Threading.CancellationToken)"/>
/// <see cref="IDurableContext.RunInChildContextAsync{T}(System.Func{IDurableContext, System.Threading.CancellationToken, System.Threading.Tasks.Task{T}}, string?, ChildContextConfig?, System.Threading.CancellationToken)"/>
/// (and overloads) to run code inside one.
/// </remarks>
public sealed class ChildContextConfig
Expand Down
55 changes: 34 additions & 21 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ internal sealed class DurableContext : IDurableContext
{
private readonly ExecutionState _state;
private readonly TerminationManager _terminationManager;
private readonly WorkflowCancellation _workflowCancellation;
private readonly OperationIdGenerator _idGenerator;
private readonly string _durableExecutionArn;
private readonly CheckpointBatcher? _batcher;
Expand All @@ -24,13 +25,15 @@ internal sealed class DurableContext : IDurableContext
public DurableContext(
ExecutionState state,
TerminationManager terminationManager,
WorkflowCancellation workflowCancellation,
OperationIdGenerator idGenerator,
string durableExecutionArn,
ILambdaContext lambdaContext,
CheckpointBatcher? batcher = null)
{
_state = state;
_terminationManager = terminationManager;
_workflowCancellation = workflowCancellation;
_idGenerator = idGenerator;
_durableExecutionArn = durableExecutionArn;
_batcher = batcher;
Expand All @@ -55,14 +58,14 @@ public void ConfigureLogger(LoggerConfig config)
}

public Task<T> StepAsync<T>(
Func<IStepContext, Task<T>> func,
Func<IStepContext, CancellationToken, Task<T>> func,
string? name = null,
StepConfig? config = null,
CancellationToken cancellationToken = default)
=> RunStep(func, name, config, cancellationToken);

public async Task StepAsync(
Func<IStepContext, Task> func,
Func<IStepContext, CancellationToken, Task> func,
string? name = null,
StepConfig? config = null,
CancellationToken cancellationToken = default)
Expand All @@ -71,12 +74,12 @@ public async Task StepAsync(
// step that always returns null. The serializer isn't actually invoked
// with a non-null value, so any registered ILambdaSerializer suffices.
await RunStep<object?>(
async (ctx) => { await func(ctx); return null; },
async (ctx, ct) => { await func(ctx, ct); return null; },
name, config, cancellationToken);
}

private Task<T> RunStep<T>(
Func<IStepContext, Task<T>> func,
Func<IStepContext, CancellationToken, Task<T>> func,
string? name,
StepConfig? config,
CancellationToken cancellationToken)
Expand All @@ -86,7 +89,7 @@ private Task<T> RunStep<T>(
var operationId = _idGenerator.NextId();
var op = new StepOperation<T>(
operationId, name, _idGenerator.ParentId, func, config, serializer, Logger,
_state, _terminationManager, _durableExecutionArn, _batcher);
_state, _terminationManager, _workflowCancellation, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

Expand Down Expand Up @@ -114,14 +117,14 @@ public Task WaitAsync(
}

public Task<T> RunInChildContextAsync<T>(
Func<IDurableContext, Task<T>> func,
Func<IDurableContext, CancellationToken, Task<T>> func,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default)
=> RunChildContext(func, name, config, cancellationToken);

public async Task RunInChildContextAsync(
Func<IDurableContext, Task> func,
Func<IDurableContext, CancellationToken, Task> func,
string? name = null,
ChildContextConfig? config = null,
CancellationToken cancellationToken = default)
Expand All @@ -130,12 +133,12 @@ public async Task RunInChildContextAsync(
// returns null so the registered ILambdaSerializer is never asked to
// serialize a real value.
await RunChildContext<object?>(
async (ctx) => { await func(ctx); return null; },
async (ctx, ct) => { await func(ctx, ct); return null; },
name, config, cancellationToken);
}

public Task<TState> WaitForConditionAsync<TState>(
Func<TState, IConditionCheckContext, Task<TState>> check,
Func<TState, IConditionCheckContext, CancellationToken, Task<TState>> check,
WaitForConditionConfig<TState> config,
string? name = null,
CancellationToken cancellationToken = default)
Expand All @@ -148,12 +151,12 @@ public Task<TState> WaitForConditionAsync<TState>(
var operationId = _idGenerator.NextId();
var op = new WaitForConditionOperation<TState>(
operationId, name, _idGenerator.ParentId, check, config, serializer, Logger,
_state, _terminationManager, _durableExecutionArn, _batcher);
_state, _terminationManager, _workflowCancellation, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

private Task<T> RunChildContext<T>(
Func<IDurableContext, Task<T>> func,
Func<IDurableContext, CancellationToken, Task<T>> func,
string? name,
ChildContextConfig? config,
CancellationToken cancellationToken)
Expand All @@ -163,16 +166,16 @@ private Task<T> RunChildContext<T>(
var operationId = _idGenerator.NextId();

// Capture this DurableContext's collaborators; the child shares state,
// termination, batcher, ARN, and Lambda context — but uses a child
// OperationIdGenerator so its operation IDs are deterministically
// namespaced under the parent op ID.
// termination, workflow cancellation, batcher, ARN, and Lambda context —
// but uses a child OperationIdGenerator so its operation IDs are
// deterministically namespaced under the parent op ID.
IDurableContext ChildFactory(string parentOpId) => new DurableContext(
_state, _terminationManager, _idGenerator.CreateChild(parentOpId),
_state, _terminationManager, _workflowCancellation, _idGenerator.CreateChild(parentOpId),
_durableExecutionArn, LambdaContext, _batcher);

var op = new ChildContextOperation<T>(
operationId, name, _idGenerator.ParentId, func, config, serializer, ChildFactory,
_state, _terminationManager, _durableExecutionArn, _batcher);
_state, _terminationManager, _workflowCancellation, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

Expand All @@ -197,7 +200,7 @@ private Task<ICallback<T>> RunCallback<T>(
}

public Task<T> WaitForCallbackAsync<T>(
Func<string, IWaitForCallbackContext, Task> submitter,
Func<string, IWaitForCallbackContext, CancellationToken, Task> submitter,
string? name = null,
WaitForCallbackConfig? config = null,
CancellationToken cancellationToken = default)
Expand All @@ -218,7 +221,7 @@ public Task<T> WaitForCallbackAsync<T>(
/// </para>
/// </remarks>
private Task<T> RunWaitForCallback<T>(
Func<string, IWaitForCallbackContext, Task> submitter,
Func<string, IWaitForCallbackContext, CancellationToken, Task> submitter,
string? name,
WaitForCallbackConfig? config,
CancellationToken cancellationToken)
Expand All @@ -240,19 +243,29 @@ private Task<T> RunWaitForCallback<T>(
// StepAsync calls each pull the registered ILambdaSerializer from
// ILambdaContext.Serializer, so AOT and reflection-based scenarios share
// the same code path.
//
// Pass the OUTER cancellationToken (not childCtx's linked token) into the
// inner operations. Each inner operation will re-link the caller's token
// with the workflow-shutdown CTS itself when it invokes its user Func, so
// the submitter still observes both signals. Threading the already-linked
// childToken through here would propagate the workflow-shutdown signal
// into the inner operations' checkpoint writes (EnqueueAsync uses the
// cancellationToken parameter directly), which would risk lost START /
// SUCCEED checkpoints when termination fires mid-flush. See §7 of
// docs/design/cancellation-design.md.
return RunInChildContextAsync<T>(
async childCtx =>
async (childCtx, _) =>
{
var callback = await childCtx.CreateCallbackAsync<T>(
name: callbackName,
config: callbackConfig,
cancellationToken: cancellationToken);

await childCtx.StepAsync(
async (stepCtx) =>
async (stepCtx, stepToken) =>
{
var submitterCtx = new WaitForCallbackContext(stepCtx.Logger);
await submitter(callback.CallbackId, submitterCtx);
await submitter(callback.CallbackId, submitterCtx, stepToken);
},
name: submitterName,
config: stepConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ private static async Task<DurableExecutionInvocationOutput> WrapAsyncCore<TInput

var userPayload = ExtractUserPayload<TInput>(invocationInput, serializer);
var terminationManager = new TerminationManager();
using var workflowCancellation = new WorkflowCancellation(terminationManager);
var idGenerator = new OperationIdGenerator();

await using var batcher = new CheckpointBatcher(
Expand All @@ -108,7 +109,7 @@ private static async Task<DurableExecutionInvocationOutput> WrapAsyncCore<TInput
cancellationToken: ct));

var context = new DurableContext(
state, terminationManager, idGenerator,
state, terminationManager, workflowCancellation, idGenerator,
invocationInput.DurableExecutionArn, lambdaContext, batcher);

HandlerResult<TOutput> result;
Expand Down
Loading