Add ParallelAsync for concurrent branch execution (DOTNET-8662)#2375
Add ParallelAsync for concurrent branch execution (DOTNET-8662)#2375GarrettBeatty wants to merge 15 commits into
Conversation
|
|
||
| COPY bin/publish/ ${LAMBDA_TASK_ROOT} | ||
|
|
||
| ENTRYPOINT ["/var/task/bootstrap"] |
|
|
||
| COPY bin/publish/ ${LAMBDA_TASK_ROOT} | ||
|
|
||
| ENTRYPOINT ["/var/task/bootstrap"] |
|
|
||
| COPY bin/publish/ ${LAMBDA_TASK_ROOT} | ||
|
|
||
| ENTRYPOINT ["/var/task/bootstrap"] |
|
|
||
| COPY bin/publish/ ${LAMBDA_TASK_ROOT} | ||
|
|
||
| ENTRYPOINT ["/var/task/bootstrap"] |
|
|
||
| COPY bin/publish/ ${LAMBDA_TASK_ROOT} | ||
|
|
||
| ENTRYPOINT ["/var/task/bootstrap"] |
19c0128 to
fa13eef
Compare
464c591 to
d308c3b
Compare
fa13eef to
b7a06b4
Compare
stack-info: PR: #2361, branch: GarrettBeatty/stack/3
d308c3b to
be4c3ad
Compare
b7a06b4 to
08b2095
Compare
ad4d208 to
3acbed5
Compare
646b841 to
4d97473
Compare
4d97473 to
8a6c41c
Compare
8664e8c to
e4da00c
Compare
e4da00c to
9485866
Compare
9485866 to
cff0b86
Compare
There was a problem hiding this comment.
Pull request overview
Adds a new durable “fan-out” capability to Amazon.Lambda.DurableExecution via IDurableContext.ParallelAsync, enabling concurrent branch execution with configurable completion policies and concurrency limits, while extending replay/state handling to support multi-threaded branch orchestration.
Changes:
- Introduces
ParallelAsyncpublic API (+ supporting types likeParallelConfig,CompletionConfig,IBatchResult<T>,DurableBranch<T>, andParallelException). - Implements
Internal/ParallelOperation<T>orchestration and makesExecutionStatethread-safe for concurrent branch execution/replay. - Adds extensive unit tests plus multiple end-to-end integration test functions for determinism, throttling, and failure policy behavior.
Reviewed changes
Copilot reviewed 45 out of 45 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs | New unit tests covering ParallelAsync behavior (completion modes, determinism, cancellation, replay). |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelReplayDeterminismFunction/ParallelReplayDeterminismFunction.csproj | New integration test function project for replay determinism. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelReplayDeterminismFunction/Function.cs | Workflow that validates replay determinism across suspend/resume under parallel branches. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelReplayDeterminismFunction/Dockerfile | Container packaging for the replay determinism test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelPartialFailureFunction/ParallelPartialFailureFunction.csproj | New integration test function project for partial failure behavior. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelPartialFailureFunction/Function.cs | Workflow demonstrating AllCompleted with per-branch errors surfaced in the result. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelPartialFailureFunction/Dockerfile | Container packaging for the partial failure test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelMaxConcurrencyFunction/ParallelMaxConcurrencyFunction.csproj | New integration test function project for max concurrency throttling. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelMaxConcurrencyFunction/Function.cs | Workflow that uses durable waits to validate max-concurrency throttling. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelMaxConcurrencyFunction/Dockerfile | Container packaging for the max concurrency test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelHappyPathFunction/ParallelHappyPathFunction.csproj | New integration test function project for the basic happy path. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelHappyPathFunction/Function.cs | Happy-path parallel workflow returning joined results. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelHappyPathFunction/Dockerfile | Container packaging for the happy path test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFirstSuccessfulFunction/ParallelFirstSuccessfulFunction.csproj | New integration test function project for FirstSuccessful behavior. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFirstSuccessfulFunction/Function.cs | Workflow exercising FirstSuccessful with staggered durable waits. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFirstSuccessfulFunction/Dockerfile | Container packaging for the FirstSuccessful test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFailureToleranceFunction/ParallelFailureToleranceFunction.csproj | New integration test function project for failure tolerance exceeded. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFailureToleranceFunction/Function.cs | Workflow that triggers ParallelException via tolerated failure count. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/ParallelFailureToleranceFunction/Dockerfile | Container packaging for the failure tolerance test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ParallelReplayDeterminismTest.cs | End-to-end test asserting deterministic branch IDs and replayed step outputs. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ParallelPartialFailureTest.cs | End-to-end test asserting AllCompleted surfaces partial failures without failing workflow. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ParallelMaxConcurrencyTest.cs | End-to-end test asserting max concurrency throttling via timestamp clustering. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ParallelHappyPathTest.cs | End-to-end happy-path test asserting correct service-side context history. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ParallelFirstSuccessfulTest.cs | End-to-end test asserting short-circuit semantics and winner reporting. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/ParallelFailureToleranceTest.cs | End-to-end test asserting workflow fails with ParallelException on exceeded tolerance. |
| Libraries/src/Amazon.Lambda.DurableExecution/ParallelConfig.cs | New configuration type for parallel execution (concurrency, completion, nesting). |
| Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs | Adds wire subtypes for Parallel parent and ParallelBranch checkpoints. |
| Libraries/src/Amazon.Lambda.DurableExecution/NestingType.cs | New enum describing checkpoint graph representation for parallel/map branches. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelSummary.cs | Internal checkpoint payload schema for parallel parent summary. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelOperation.cs | Core implementation of durable parallel orchestration + replay reconstruction. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelJsonContext.cs | Source-generated STJ context for AOT-safe serialization of internal summary payload. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/ExecutionState.cs | Makes execution state thread-safe via a single lock for concurrent branch access. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchResult.cs | Default internal implementation of IBatchResult<T> views and helpers. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchItem.cs | Default internal implementation of IBatchItem<T>. |
| Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs | Public interface additions for ParallelAsync overloads. |
| Libraries/src/Amazon.Lambda.DurableExecution/IBatchResult.cs | New public batch result interfaces used by parallel/map. |
| Libraries/src/Amazon.Lambda.DurableExecution/IBatchItem.cs | New public per-branch/item interface surfaced by batch results. |
| Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs | Adds ParallelException carrying completion reason and batch result. |
| Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs | Implements ParallelAsync and refactors child-context factory creation. |
| Libraries/src/Amazon.Lambda.DurableExecution/DurableBranch.cs | New public record representing a named parallel branch. |
| Libraries/src/Amazon.Lambda.DurableExecution/CompletionReason.cs | New enum describing why a batch resolved. |
| Libraries/src/Amazon.Lambda.DurableExecution/CompletionConfig.cs | New completion policy type with factories (AllSuccessful/AllCompleted/FirstSuccessful). |
| Libraries/src/Amazon.Lambda.DurableExecution/BatchItemStatus.cs | New enum describing per-branch outcome status (Succeeded/Failed/Started). |
| Docs/durable-execution-design.md | Documentation updates reflecting new parallel types and failure-percentage config. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| /// </summary> | ||
| /// <remarks> | ||
| /// Not yet implemented in the .NET SDK; passing this value throws | ||
| /// <see cref="System.NotSupportedException"/>. |
There was a problem hiding this comment.
will do in another pr
1a1d5bc to
69364a0
Compare
|
Important
The switch from ConcurrentDictionary to a single lock is well-reasoned (compound operations in LoadFromCheckpoint need atomicity). Each critical section is O(1). For the branch
The design explicitly states: once dispatched, branches are never cancelled — even after a MinSuccessful or FailureToleranceExceeded short-circuit fires. This guarantees replay
public IBatchResult? Result { get; init; } Since ParallelException isn't generic, the result is boxed behind the non-generic IBatchResult marker. Callers who catch ParallelException need to cast: |
fixed number 2 only, the rest are already documented 3179531 |
| @@ -0,0 +1,15 @@ | |||
| namespace Amazon.Lambda.DurableExecution.Internal; | |||
| @@ -0,0 +1,80 @@ | |||
| namespace Amazon.Lambda.DurableExecution.Internal; | |||
| @@ -0,0 +1,15 @@ | |||
| using System.Text.Json.Serialization; | |||
| @@ -0,0 +1,654 @@ | |||
| using System.IO; | |||
| @@ -0,0 +1,31 @@ | |||
| using System.Text.Json.Serialization; | |||
| @@ -0,0 +1,72 @@ | |||
| using System.Linq; | |||
| @@ -0,0 +1,76 @@ | |||
| using System.Linq; | |||
| @@ -0,0 +1,74 @@ | |||
| using System.Linq; | |||
| @@ -0,0 +1,122 @@ | |||
| using System.Linq; | |||
| @@ -0,0 +1,1163 @@ | |||
| using Amazon.Lambda.DurableExecution; | |||
| /// <remarks> | ||
| /// <see cref="NestingType.Flat"/> is not yet supported in the .NET SDK and | ||
| /// will throw <see cref="System.NotSupportedException"/> when the parallel | ||
| /// operation is invoked. |
| /// <see cref="CompletionReason.FailureToleranceExceeded"/> does the parallel | ||
| /// throw. | ||
| /// </remarks> | ||
| internal sealed class ParallelOperation<T> : DurableOperation<IBatchResult<T>> |
There was a problem hiding this comment.
this whole class is going to get refactored later on in #2408 to support mapasync as well
| <Project Sdk="Microsoft.NET.Sdk"> | ||
|
|
||
| <PropertyGroup> | ||
| <TargetFramework>net8.0</TargetFramework> |
There was a problem hiding this comment.
All test projects should target .NET 10
| { | ||
| try | ||
| { | ||
| await Task.WhenAll(inFlight).ConfigureAwait(false); |
There was a problem hiding this comment.
As we discussed on the side. We need a mechanism to communicate to the inflight branches that they should cleanly shut down. For example provide them their own cancellation token that we cancel from here.
| { | ||
| try | ||
| { | ||
| await Task.WhenAll(inFlight).ConfigureAwait(false); |
There was a problem hiding this comment.
@normj i think in your example on the call, you were thinking something like
new ParallelConfig
{
MaxConcurrency = 10,
CompletionConfig = new CompletionConfig { MinSuccessful = 2 }
}
in this case the short circuit would not fire and we would await all 10 tasks.
but in the case like
new ParallelConfig
{
MaxConcurrency = 3,
CompletionConfig = new CompletionConfig { MinSuccessful = 2 }
}
When does MinSuccessful actually save work?
Lower the concurrency window. Same 10 branches, MaxConcurrency = 3, MinSuccessful = 2:
| Time | In flight | Done | succeeded |
Action |
|---|---|---|---|---|
| t0 | branches 0,1,2 | – | 0 | semaphore full; dispatch loop blocked on WaitAsync |
| t1 | branches 1,2,3 | 0 | 1 | branch 0 finishes → releases slot → loop wakes, dispatches branch 3 |
| t2 | branches 2,3 | 0,1 | 2 | branch 1 finishes → ShouldStopDispatching is true → release slot, break out of dispatch loop |
| t3 | – | 0,1,2,3 | 2+ | Task.WhenAll settles the two in-flight branches (2 and 3) |
| end | – | 0,1,2,3 | – | branches 4–9 were never dispatched; they surface as Started |
Result: 4 Succeeded, 6 Started, CompletionReason = MinSuccessfulReached — six branches' worth of compute saved.
There was a problem hiding this comment.
and as far as cancellation token goes - i think you were saying we can still cancel the inflight ones. but my only thing i would need to think about is if cancellation -> FAILED status. i think in other places (i would need to double check), we decided cancellation is different than having it failed.
i think theres some edge cases where if we just cancel the task, and some checkpoints are written already (and we just cancel it without it letting write the final checkpoints) then the replay will be corrupted
There was a problem hiding this comment.
But given the default of MaxConcurrency is the number of branches I suspect we will get more task running at a time then MinSuccessful.
You also have the problem which is probably more common that if you have an exception in one of the branches and the configuration is to break if any exceptions have occurred we still wait for all inflight branches even though they won't matter because we are going to return a failure back.
628ebff to
c3162e0
Compare
c3162e0 to
c88ad8a
Compare
#2216
What
Adds parallel branch execution to
Amazon.Lambda.DurableExecution.ParallelAsyncruns N branches concurrently with configurable concurrency limits and completion policies, returning anIBatchResult<T>with per-branch status and error information. The sharedIBatchResult<T>family is reused byMapAsyncin Wave 2.Public API:
IDurableContext.ParallelAsync<T>(Func[], ...)IDurableContext.ParallelAsync<T>(DurableBranch<T>[], ...)DurableBranch<T>(Name, Func)ParallelConfigMaxConcurrency,CompletionConfig,NestingType.CompletionConfigAllSuccessful(),FirstSuccessful(),AllCompleted(). ValidatedMinSuccessful/ToleratedFailureCount/ToleratedFailurePercentage(0.0–1.0).IBatchResult<T>All/Succeeded/Failed/Started,GetResults,GetErrors,ThrowIfError,HasFailure,CompletionReason, count properties.IBatchItem<T>Index,Name,Status,Result,Error).BatchItemStatusSucceeded/Failed/Started.CompletionReasonAllCompleted/MinSuccessfulReached/FailureToleranceExceeded.NestingTypeNested(default);Flatreserved for a follow-up PR (throwsNotSupportedExceptiontoday).ParallelExceptionCompletionConfigsignalsFailureToleranceExceeded; carries theIBatchResult<T>.Per-branch checkpoint payloads are serialized via the
ILambdaSerializerregistered onILambdaContext.Serializer— same pattern asStepAsync/RunInChildContextAsyncfrom #2370. There are no separate reflection / AOT-safe overload pairs: the AOT story is determined entirely by which serializer the user registers with the runtime (e.g.,SourceGeneratorLambdaJsonSerializer<TContext>).Testing
31 new unit tests in
ParallelOperationTests.csand supporting fixtures:CompletionConfigmatrix:AllSuccessful,AllCompleted,FirstSuccessful,MinSuccessful,ToleratedFailureCount,ToleratedFailurePercentage— both pass and fail thresholds.MaxConcurrencyenforced via semaphore; unbounded when null; cancel-mid-dispatch leaves no orphan branches.ExecutionStateaccess regression test (parallel writers do not corrupt the visited-set).SUCCEEDED+FAILED+STARTED),FirstSuccessfulwith all-fail, named vs. unnamed branches.IBatchResult<T>accessors andGetResults/GetErrors/ThrowIfErrorsemantics.NestingType.FlatthrowsNotSupportedException(placeholder for follow-up).6 new integration tests build successfully (require AWS credentials to run): happy path, max-concurrency, first-successful, partial-failure, failure-tolerance, and replay-determinism.