diff --git a/Docs/durable-execution-design.md b/Docs/durable-execution-design.md
index c1d1e8474..c5c4da089 100644
--- a/Docs/durable-execution-design.md
+++ b/Docs/durable-execution-design.md
@@ -1443,22 +1443,21 @@ public class MapConfig
public int? MaxConcurrency { get; set; }
///
- /// When to consider the operation complete.
+ /// When to consider the operation complete. Defaults to AllCompleted() —
+ /// every item runs regardless of per-item failures, which surface via
+ /// IBatchResult<T>.Failed rather than throwing. This permissive default
+ /// matches the Python and Java SDKs' map operation. It differs intentionally
+ /// from ParallelConfig.CompletionConfig, which defaults to AllSuccessful()
+ /// (fail-fast). For fail-fast map behavior, set this to
+ /// CompletionConfig.AllSuccessful() or call IBatchResult<T>.ThrowIfError().
///
- public CompletionConfig CompletionConfig { get; set; } = CompletionConfig.AllSuccessful();
+ public CompletionConfig CompletionConfig { get; set; } = CompletionConfig.AllCompleted();
///
/// How item branches are represented in the checkpoint graph.
///
public NestingType NestingType { get; set; } = NestingType.Nested;
- ///
- /// Optional batching configuration for grouping items before processing.
- /// When set, items are grouped into batches and each batch is processed as a unit.
- /// Reduces checkpoint overhead for large collections.
- ///
- public ItemBatcher? Batcher { get; set; }
-
///
/// Optional function to generate a custom name for each item's branch.
/// Improves observability in execution traces. Receives the item and its index.
@@ -1467,23 +1466,6 @@ public class MapConfig
public Func? ItemNamer { get; set; }
}
-///
-/// Groups items into batches for map operations to reduce checkpoint overhead.
-/// At least one of MaxItemsPerBatch or MaxBytesPerBatch must be set.
-///
-public class ItemBatcher
-{
- ///
- /// Maximum number of items per batch. Null = no count limit.
- ///
- public int? MaxItemsPerBatch { get; set; }
-
- ///
- /// Maximum serialized size (bytes) per batch. Null = no size limit.
- ///
- public int? MaxBytesPerBatch { get; set; }
-}
-
///
/// Defines completion criteria for parallel/map operations.
///
@@ -2222,7 +2204,6 @@ All four SDKs expose the same core operations. The differences are naming conven
| Jitter strategy | `JitterStrategy` enum on `Exponential()` | `jitter_strategy` on `RetryStrategyConfig` | `jitter` on `createRetryStrategy()` |
| Retry presets | `RetryStrategy.None/Default/Transient` | `RetryPresets.none()/default()/transient()` | `retryPresets.default/linear/noRetry` |
| Nesting type | `NestingType` on `ParallelConfig`/`MapConfig` | `NestingType` on parallel/map config | `NestingType` on parallel/map config |
-| Item batching | `ItemBatcher` on `MapConfig` | `ItemBatcher` on `MapConfig` | *(checkpoint manager handles batching)* |
| Item namer | `ItemNamer` on `MapConfig` | Item naming function on `MapConfig` | `itemNamer` on `MapConfig` |
| Error mapping | `ErrorMapping` on `ChildContextConfig` | *(typed exception wrapping)* | `errorMapping` on child context config |
| Message-based retry filter | `retryableMessagePatterns` (regex) | `retryable_errors` (regex) | `retryableErrors` (RegExp[]) |
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
index 5ebad8aa4..6e2b2af2a 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
@@ -260,6 +260,41 @@ private Task> RunParallel(
return op.ExecuteAsync(cancellationToken);
}
+ public Task> MapAsync( // Public map entry point: forwards every argument unchanged to RunMap.
+ IReadOnlyList items,
+ Func, Task> func,
+ string? name = null, // optional operation name, handed through to the MapOperation
+ MapConfig? config = null, // null is replaced by a default-constructed MapConfig inside RunMap
+ CancellationToken cancellationToken = default)
+ => RunMap(items, func, name, config, cancellationToken);
+
+ private Task> RunMap( // Validates arguments/config, then builds a MapOperation and starts it. Not async: the throws below surface synchronously, not via the returned task.
+ IReadOnlyList items,
+ Func, Task> func,
+ string? name,
+ MapConfig? config,
+ CancellationToken cancellationToken)
+ {
+ if (items == null) throw new ArgumentNullException(nameof(items));
+ if (func == null) throw new ArgumentNullException(nameof(func));
+
+ var effectiveConfig = config ?? new MapConfig(); // fall back to MapConfig's property defaults when the caller passes none
+ if (effectiveConfig.NestingType == NestingType.Flat) // rejected before an operation id is consumed from the generator
+ {
+ throw new NotSupportedException(
+ "NestingType.Flat is not yet supported in the .NET Durable Execution SDK. " +
+ "Use NestingType.Nested (the default) for now.");
+ }
+
+ var serializer = LambdaSerializerHelper.GetRequired(LambdaContext); // NOTE(review): presumably throws when no serializer is registered — confirm in LambdaSerializerHelper
+
+ var operationId = _idGenerator.NextId(); // ids are drawn in call order
+ var op = new Internal.MapOperation(
+ operationId, name, _idGenerator.ParentId, items, func, effectiveConfig, serializer, MakeChildFactory(),
+ _state, _terminationManager, _workflowCancellation, _durableExecutionArn, _batcher);
+ return op.ExecuteAsync(cancellationToken);
+ }
+
public Task WaitForCallbackAsync(
Func submitter,
string? name = null,
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs b/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs
index 1b65c86b3..e4748b381 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs
@@ -131,3 +131,36 @@ public ParallelException(string message) : base(message) { }
/// Creates a wrapping an inner exception.
public ParallelException(string message, Exception innerException) : base(message, innerException) { }
}
+
+/// <summary>
+/// Thrown when a map operation resolves with
+/// <c>CompletionReason.FailureToleranceExceeded</c>. The aggregate
+/// <c>IBatchResult</c> is preserved on <see cref="Result"/> so callers
+/// can inspect per-item outcomes.
+/// </summary>
+/// <remarks>
+/// This is the base type for map failures. Subclasses may be added in future
+/// releases; catching <see cref="MapException"/> remains forward-compatible.
+/// A dedicated type (rather than reusing <see cref="ParallelException"/>) lets
+/// callers pattern-match which concurrent operation failed.
+/// </remarks>
+public class MapException : DurableExecutionException
+{
+ /// <summary>
+ /// The aggregate result of the map operation. Type-erased — cast to
+ /// <c>IBatchResult&lt;T&gt;</c> if the per-item result type is known.
+ /// </summary>
+ public IBatchResult? Result { get; init; }
+
+ /// <summary>
+ /// Why the map operation resolved.
+ /// </summary>
+ public CompletionReason CompletionReason { get; init; }
+
+ /// <summary>Creates an empty <see cref="MapException"/>.</summary>
+ public MapException() { }
+ /// <summary>Creates a <see cref="MapException"/> with the given message.</summary>
+ public MapException(string message) : base(message) { }
+ /// <summary>Creates a <see cref="MapException"/> wrapping an inner exception.</summary>
+ public MapException(string message, Exception innerException) : base(message, innerException) { }
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
index 3677b5acf..e59e400de 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs
@@ -444,6 +444,35 @@ Task> ParallelAsync(
string? name = null,
ParallelConfig? config = null,
CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Process a collection of items concurrently, running <paramref name="func"/>
+ /// once per item. Each item runs inside its own child context; per-item
+ /// results are aggregated into an <c>IBatchResult&lt;T&gt;</c>. Items
+ /// are dispatched up to <c>MapConfig.MaxConcurrency</c>; the aggregate
+ /// resolves according to <c>MapConfig.CompletionConfig</c>.
+ /// </summary>
+ /// <remarks>
+ /// The per-item function receives the durable context, the item, its
+ /// zero-based index, and the full source list (matching the Python and
+ /// JavaScript SDKs). On per-item failure (the user function throws), the
+ /// failure is captured on the corresponding batch item's <c>Error</c>
+ /// instead of aborting the map. By default
+ /// (<c>CompletionConfig.AllCompleted()</c>) every item runs and failures
+ /// surface via <c>IBatchResult&lt;T&gt;.Failed</c>; the map throws
+ /// only when the <c>MapConfig.CompletionConfig</c>
+ /// criteria are violated. Use
+ /// <c>CompletionConfig.AllSuccessful()</c> (or <c>IBatchResult&lt;T&gt;.ThrowIfError()</c>) for explicit
+ /// strict-success semantics. Per-item results are serialized to checkpoints
+ /// using the <c>ILambdaSerializer</c> registered on
+ /// the Lambda context.
+ /// </remarks>
+ Task> MapAsync(
+ IReadOnlyList items,
+ Func, Task> func,
+ string? name = null,
+ MapConfig? config = null,
+ CancellationToken cancellationToken = default);
}
///
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchJsonContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchJsonContext.cs
new file mode 100644
index 000000000..d2bfeb32f
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchJsonContext.cs
@@ -0,0 +1,15 @@
+using System.Text.Json.Serialization;
+
+namespace Amazon.Lambda.DurableExecution.Internal;
+
+/// <summary>
+/// AOT-friendly <see cref="JsonSerializerContext"/> for the internal <see cref="BatchSummary"/>
+/// payload stored on a concurrent operation's parent
+/// CONTEXT checkpoint (parallel or map). Only this internal type — never user T —
+/// flows through here, so the source-generated metadata is sufficient.
+/// </summary>
+[JsonSerializable(typeof(BatchSummary))]
+[JsonSerializable(typeof(BatchUnitSummary))]
+internal sealed partial class BatchJsonContext : JsonSerializerContext
+{
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchSummary.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchSummary.cs
new file mode 100644
index 000000000..1e58e9654
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchSummary.cs
@@ -0,0 +1,33 @@
+using System.Text.Json.Serialization;
+
+namespace Amazon.Lambda.DurableExecution.Internal;
+
+/// <summary>
+/// Internal payload shape stored on a concurrent operation's parent CONTEXT
+/// checkpoint (as <c>ContextDetails.Result</c>) and reconstructed on replay.
+/// Shared by both the parallel operation and
+/// <c>MapOperation</c>: carries the completion reason and
+/// the per-unit index → status map so the <c>IBatchResult</c> can be
+/// rebuilt without depending on user T shape — per-unit results live on the
+/// children's own checkpoints.
+/// </summary>
+internal sealed class BatchSummary
+{
+ [JsonPropertyName("CompletionReason")]
+ public string? CompletionReason { get; set; } // wire string such as "ALL_COMPLETED"; see ConcurrentOperation.SerializeCompletionReason
+
+ [JsonPropertyName("Units")]
+ public IList Units { get; set; } = new List(); // one entry per unit, written in unit-index order
+}
+
+internal sealed class BatchUnitSummary // One unit's entry inside BatchSummary.Units.
+{
+ [JsonPropertyName("Index")]
+ public int Index { get; set; } // zero-based position of the unit within the operation
+
+ [JsonPropertyName("Name")]
+ public string? Name { get; set; } // unit display name captured when the batch resolved; may be null
+
+ [JsonPropertyName("Status")]
+ public string? Status { get; set; } // wire form of the unit status: "SUCCEEDED", "FAILED" or "STARTED"
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ConcurrentOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ConcurrentOperation.cs
new file mode 100644
index 000000000..a4a7a1741
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ConcurrentOperation.cs
@@ -0,0 +1,710 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+using System.IO;
+using System.Text;
+using System.Text.Json;
+using Amazon.Lambda;
+using Amazon.Lambda.Core;
+using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
+using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;
+
+namespace Amazon.Lambda.DurableExecution.Internal;
+
+/// <summary>
+/// Shared orchestration base for the concurrent durable operations
+/// (the parallel operation and <c>MapOperation</c>).
+/// Runs N user-supplied units concurrently (each as a
+/// <c>ChildContextOperation</c>) under a shared
+/// <c>CompletionConfig</c> and concurrency limit, persisting the
+/// aggregate result so subsequent invocations replay it without re-executing.
+/// </summary>
+/// <remarks>
+/// Subclasses supply only what differs between Parallel and Map — the unit count,
+/// how to obtain a unit's <c>(name, func)</c>, the parent/child sub-type labels,
+/// and the failure-exception factory. All concurrency, completion, checkpoint, and
+/// replay logic lives here.
+/// <list type="bullet">
+/// <item><b>Fresh</b>: no prior state → sync-flush parent CONTEXT START →
+/// dispatch units respecting <c>MaxConcurrency</c> → wait for in-flight to
+/// complete after <c>CompletionConfig</c> short-circuit → emit parent CONTEXT
+/// SUCCEED with summary payload (<c>BatchSummary</c>).</item>
+/// <item><b>SUCCEEDED</b>: parent payload supplies the snapshot of per-unit
+/// statuses + completion reason; per-unit results are deserialised from the
+/// children's own CONTEXT checkpoints.</item>
+/// <item><b>FAILED</b>: same reconstruction (the FAIL update is written without a summary payload, so statuses fall back to the children's checkpoints when none is found); throws the subclass exception
+/// carrying the rebuilt <c>IBatchResult</c>.</item>
+/// <item><b>STARTED / PENDING</b>: re-execute (children replay from their
+/// own checkpoints).</item>
+/// </list>
+/// Per-unit errors do NOT abort the operation directly — the orchestrator catches
+/// each unit's <c>ChildContextException</c>, records it as a failed
+/// <c>BatchItem</c>, and consults the <c>CompletionConfig</c>
+/// after every completion. Only when the completion config marks the run as
+/// <c>CompletionReason.FailureToleranceExceeded</c> does it throw.
+/// </remarks>
+internal abstract class ConcurrentOperation : DurableOperation>
+{
+ private readonly CompletionPolicy _policy; // built from the constructor's CompletionConfig; decides dispatch short-circuit and the final verdict
+ private readonly int? _maxConcurrency; // null = no explicit cap (treated as the unit count at dispatch time)
+ private readonly WorkflowCancellation _workflowCancellation; // supplies the workflow-shutdown token linked into the control token
+
+ /// <summary>Serializer used to deserialize per-unit child results on replay.</summary>
+ protected readonly ILambdaSerializer Serializer;
+
+ /// <summary>Factory used to build each unit's inner child context.</summary>
+ protected readonly Func ChildContextFactory;
+
+ protected ConcurrentOperation( // The constructor body only stores its arguments; identity/state arguments go to the base class.
+ string operationId,
+ string? name,
+ string? parentId,
+ CompletionConfig completionConfig,
+ int? maxConcurrency,
+ ILambdaSerializer serializer,
+ Func childContextFactory,
+ ExecutionState state,
+ TerminationManager termination,
+ WorkflowCancellation workflowCancellation,
+ string durableExecutionArn,
+ CheckpointBatcher? batcher = null)
+ : base(operationId, name, parentId, state, termination, durableExecutionArn, batcher)
+ {
+ _policy = new CompletionPolicy(completionConfig); // wrap the user-facing config in the evaluator used below
+ _maxConcurrency = maxConcurrency;
+ _workflowCancellation = workflowCancellation;
+ Serializer = serializer;
+ ChildContextFactory = childContextFactory;
+ }
+
+ protected override string OperationType => OperationTypes.Context; // parent is checkpointed as a CONTEXT operation
+
+ // ── Subclass hooks ──────────────────────────────────────────────────
+
+ /// <summary>The number of units (branches or items) to execute.</summary>
+ protected abstract int UnitCount { get; }
+
+ /// <summary>Parent CONTEXT sub-type label (e.g. Parallel / Map).</summary>
+ protected abstract string ParentSubType { get; }
+
+ /// <summary>Per-unit child-context sub-type label (e.g. ParallelBranch / MapItem).</summary>
+ protected abstract string ChildSubType { get; }
+
+ /// <summary>Singular operation noun used in messages (e.g. "Parallel" / "Map").</summary>
+ protected abstract string OperationNoun { get; }
+
+ /// <summary>Plural unit noun used in messages (e.g. "branches" / "items").</summary>
+ protected abstract string UnitNounPlural { get; }
+
+ /// <summary>
+ /// Resolves the unit at <paramref name="index"/> into its display name and the
+ /// function to run inside the unit's child context. Called more than once per index (when a unit is dispatched, when results are assembled, and on replay reconstruction), so the name should be the same each time.
+ /// </summary>
+ protected abstract (string? Name, Func> Func) GetUnit(int index);
+
+ /// <summary>
+ /// Builds the subclass-specific exception thrown when the operation resolves
+ /// with <c>CompletionReason.FailureToleranceExceeded</c>.
+ /// </summary>
+ protected abstract DurableExecutionException CreateException(string message, IBatchResult result);
+
+ // ── Orchestration ───────────────────────────────────────────────────
+
+ protected override async Task> StartAsync(CancellationToken cancellationToken) // Fresh-run path: record the parent START, then run the units.
+ {
+ // Sync-flush parent CONTEXT START. Mirrors ChildContextOperation: if a
+ // unit suspends (e.g., a Wait inside it), the service needs to know the
+ // parent existed.
+ await EnqueueAsync(new SdkOperationUpdate
+ {
+ Id = OperationId,
+ Type = OperationTypes.Context,
+ Action = OperationAction.START,
+ SubType = ParentSubType,
+ Name = Name
+ }, cancellationToken);
+
+ return await ExecuteUnitsAsync(cancellationToken); // shared with the STARTED/PENDING replay path
+ }
+
+ protected override Task> ReplayAsync(Operation existing, CancellationToken cancellationToken) // Not async: exceptions thrown in this switch surface synchronously, not through the returned task.
+ {
+ switch (existing.Status)
+ {
+ case OperationStatuses.Succeeded:
+ return Task.FromResult(ReconstructFromCheckpoints(existing, throwOnFailure: false)); // rebuild only; no unit is re-executed
+
+ case OperationStatuses.Failed:
+ // Reconstruct so the caller (and the exception's Result) sees the
+ // per-unit outcomes; then throw.
+ var failed = ReconstructFromCheckpoints(existing, throwOnFailure: false); // false because this branch throws unconditionally below
+ throw BuildException(failed);
+
+ case OperationStatuses.Started:
+ case OperationStatuses.Pending:
+ // Re-run: units replay from their own checkpoints.
+ return ExecuteUnitsAsync(cancellationToken); // note: the parent START update is not re-sent on this path
+
+ default:
+ throw new NonDeterministicExecutionException(
+ $"{OperationNoun} operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay.");
+ }
+ }
+
+ private async Task> ExecuteUnitsAsync(CancellationToken cancellationToken) // Dispatches units, waits for all dispatched units to settle, checkpoints the aggregate, then returns it or throws.
+ {
+ // Combine the caller's token with the workflow-shutdown token for the
+ // operation's OWN control flow: the dispatch loop's semaphore waits, the
+ // post-settle re-throw, and each unit's OCE classification.
+ //
+ // CRITICAL: childOp.ExecuteAsync below still receives the *caller* token
+ // only. ChildContextOperation re-links workflow-shutdown itself for the
+ // user func, and its checkpoint writes (CONTEXT FAIL/SUCCEED) must NOT
+ // observe shutdown, otherwise teardown could abort a unit's successful
+ // checkpoint mid-flush.
+ using var controlCts = CancellationTokenSource.CreateLinkedTokenSource(
+ cancellationToken, _workflowCancellation.Token);
+ var controlToken = controlCts.Token;
+
+ controlToken.ThrowIfCancellationRequested();
+
+ // Cooperative-bail signal: tripped the moment a CompletionConfig
+ // short-circuit is decided. It flows into each unit's user func only
+ // (via ChildContextOperation's cooperative-bail token), NOT into the
+ // units' checkpoint writes — a unit that honors the token unwinds with an
+ // OperationCanceledException we record as Started, while a unit mid-flush
+ // of a successful checkpoint still completes. Units that ignore the token
+ // simply run to their natural terminal state, exactly as before. We never
+ // abandon a dispatched unit, so replay stays deterministic.
+ using var shortCircuitCts = new CancellationTokenSource();
+
+ var unitCount = UnitCount;
+ var slots = new UnitOutcome[unitCount]; // each unit writes only its own index, so no locking is used
+ var dispatched = new bool[unitCount]; // written only by the dispatch loop; read after all units settle
+
+ var maxConcurrency = _maxConcurrency ?? unitCount; // NOTE(review): a value of 0 or less with unitCount > 0 reaches the SemaphoreSlim constructor below, which rejects it — confirm MaxConcurrency is validated upstream
+ // Optimisation: when MaxConcurrency >= unitCount, skip the semaphore
+ // entirely. Behaviour is identical, allocations are lower. (Also covers
+ // the empty-collection case, where unitCount == 0 and no unit runs.)
+ var semaphore = (maxConcurrency >= unitCount || unitCount == 0)
+ ? null
+ : new SemaphoreSlim(maxConcurrency, maxConcurrency);
+
+ var succeeded = 0;
+ var failed = 0;
+
+ var inFlight = new List(unitCount);
+
+ // Reads the live counters and asks the completion policy whether the run
+ // is already decided. Volatile reads pair with the Interlocked.Increment
+ // writes in the onComplete callback. Reads are non-atomic across the two
+ // counters: at worst we observe slightly stale values and dispatch one
+ // extra unit before the next completion forces a re-check. That's
+ // acceptable — the post-loop ComputeCompletionReason is the source of truth.
+ bool ShouldStopDispatchingNow() => _policy.ShouldStopDispatching(
+ Volatile.Read(ref succeeded), Volatile.Read(ref failed), unitCount);
+
+ // Signal still-running units to bail. Idempotent: the first Cancel wins,
+ // racing callbacks are harmless. Tolerate a late call after the CTS is
+ // disposed at end-of-scope (a unit completing during teardown).
+ void SignalShortCircuit()
+ {
+ try { shortCircuitCts.Cancel(); }
+ catch (ObjectDisposedException) { }
+ }
+
+ // Units run with the caller's token (re-linked to workflow-shutdown inside
+ // ChildContextOperation) so cooperative cancellation still propagates into
+ // user code, but we must NOT abandon already-dispatched units while they're
+ // still writing checkpoints — that would diverge between the original run
+ // and replay. The finally block therefore awaits every in-flight task even
+ // when cancellation fires, and only then disposes the semaphore (after units
+ // have settled — success, failure, or cooperative OCE).
+ try
+ {
+ for (var i = 0; i < unitCount; i++)
+ {
+ if (ShouldStopDispatchingNow())
+ {
+ SignalShortCircuit();
+ break;
+ }
+
+ if (semaphore != null)
+ {
+ await semaphore.WaitAsync(controlToken).ConfigureAwait(false); // a cancelled wait throws out of the loop; the finally below still settles in-flight units
+ // Re-check after acquiring: the wait may have unblocked because
+ // earlier units finished and short-circuited the operation.
+ if (ShouldStopDispatchingNow())
+ {
+ semaphore.Release(); // hand back the slot just acquired; no unit will use it
+ SignalShortCircuit();
+ break;
+ }
+ }
+
+ var index = i; // per-iteration copy of the loop variable
+ dispatched[index] = true;
+ inFlight.Add(RunUnitAsync(index, slots, semaphore, cancellationToken, controlToken,
+ shortCircuitCts.Token,
+ onComplete: outcome =>
+ {
+ if (outcome.Status == BatchItemStatus.Succeeded)
+ Interlocked.Increment(ref succeeded);
+ else if (outcome.Status == BatchItemStatus.Failed)
+ Interlocked.Increment(ref failed);
+
+ // The deciding completion typically lands AFTER every unit
+ // has been dispatched, so the loop is no longer sitting at a
+ // break point. Re-check here and signal any still-running
+ // units to bail.
+ if (ShouldStopDispatchingNow())
+ SignalShortCircuit();
+ }));
+ }
+ }
+ finally
+ {
+ // CRITICAL: wait for every dispatched unit — even on the exceptional
+ // path (control-token cancellation mid-dispatch, or a synchronous throw
+ // out of the loop) — before the semaphore is disposed. Otherwise
+ // surviving units' Release() calls hit ObjectDisposedException, the
+ // tasks become unobserved, and they keep writing checkpoints out from
+ // under us.
+ //
+ // We never forcibly abandon an already-running unit when a short-circuit
+ // fires — orphan units that continue writing checkpoints would diverge
+ // between the original run and replay. The short-circuit token is only a
+ // cooperative signal: a dispatched unit ends up Succeeded or Failed, or
+ // Started if it honored the signal. Un-dispatched units also surface as Started.
+ if (inFlight.Count > 0)
+ {
+ try
+ {
+ await Task.WhenAll(inFlight).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Swallow here — Task.WhenAll only surfaces the first exception,
+ // but every unit task is now in a terminal state and we want to
+ // inspect each one individually below to decide whether to
+ // surface a workflow-level error. The Task objects themselves
+ // still carry their exceptions, so this swallow does not orphan
+ // them.
+ }
+ }
+
+ semaphore?.Dispose();
+ }
+
+ // Surface any workflow-level exception (e.g. NonDeterministicExecutionException)
+ // raised inside a unit. RunUnitAsync re-throws DurableExecutionException
+ // (other than ChildContextException which is captured into the slot) so the
+ // task faults with that exception. Take the first such failure: these are
+ // structural errors, not "unit failed gracefully" outcomes.
+ foreach (var t in inFlight)
+ {
+ if (t.IsFaulted && t.Exception is { } agg)
+ {
+ foreach (var inner in agg.InnerExceptions)
+ {
+ if (inner is DurableExecutionException dex && inner is not ChildContextException)
+ {
+ throw dex; // NOTE(review): re-throwing the stored instance replaces its original stack trace
+ }
+ }
+ }
+ }
+
+ // Re-throw any pending cancellation (caller-cancel or workflow shutdown) now
+ // that units have settled and the semaphore has been disposed cleanly.
+ // Surfacing it here means a torn-down operation propagates an
+ // OperationCanceledException instead of synthesizing a spurious
+ // FailureToleranceExceeded verdict from units that merely unwound.
+ controlToken.ThrowIfCancellationRequested();
+
+ // Build BatchItems for every unit in original order.
+ var items = new List>(unitCount);
+ for (var i = 0; i < unitCount; i++)
+ {
+ var (unitName, _) = GetUnit(i); // only the name is needed here; the func is discarded
+ if (dispatched[i])
+ {
+ var outcome = slots[i];
+ items.Add(new BatchItem
+ {
+ Index = i,
+ Name = unitName,
+ Status = outcome.Status,
+ Result = outcome.Status == BatchItemStatus.Succeeded ? outcome.Result : default,
+ Error = outcome.Status == BatchItemStatus.Failed ? outcome.Error : null
+ });
+ }
+ else
+ {
+ items.Add(new BatchItem // never dispatched: reported as Started with no result or error
+ {
+ Index = i,
+ Name = unitName,
+ Status = BatchItemStatus.Started,
+ Result = default,
+ Error = null
+ });
+ }
+ }
+
+ var completionReason = ComputeCompletionReason(items, unitCount);
+ var result = new BatchResult(items, completionReason);
+
+ var failureException = completionReason == CompletionReason.FailureToleranceExceeded
+ ? BuildException(result)
+ : null;
+
+ await CheckpointParentResultAsync(result, completionReason, failureException, cancellationToken); // persisted before throwing so replay sees the same verdict; uses the caller token, not the control token
+
+ if (failureException != null)
+ {
+ throw failureException;
+ }
+
+ return result;
+ }
+
+ private async Task RunUnitAsync( // Runs one unit as a ChildContextOperation and records its outcome in slots[index]; always releases the semaphore slot.
+ int index,
+ UnitOutcome[] slots,
+ SemaphoreSlim? semaphore,
+ CancellationToken cancellationToken,
+ CancellationToken controlToken,
+ CancellationToken shortCircuitToken,
+ Action onComplete)
+ {
+ try
+ {
+ var (unitName, unitFunc) = GetUnit(index);
+ var childOpId = OperationIdGenerator.HashOperationId($"{OperationId}-{index + 1}"); // child id derives from the parent id and the 1-based position; ReconstructFromCheckpoints uses the same formula
+
+ var childOp = new ChildContextOperation(
+ childOpId,
+ unitName,
+ OperationId,
+ unitFunc,
+ new ChildContextConfig { SubType = ChildSubType },
+ Serializer,
+ ChildContextFactory,
+ State,
+ Termination,
+ _workflowCancellation,
+ DurableExecutionArn,
+ Batcher,
+ shortCircuitToken);
+
+ try
+ {
+ var result = await childOp.ExecuteAsync(cancellationToken).ConfigureAwait(false); // caller token only, deliberately not the control token
+ slots[index] = new UnitOutcome { Status = BatchItemStatus.Succeeded, Result = result };
+ }
+ catch (ChildContextException ex)
+ {
+ slots[index] = new UnitOutcome { Status = BatchItemStatus.Failed, Error = ex }; // graceful per-unit failure: captured, not propagated
+ }
+ catch (DurableExecutionException)
+ {
+ // E.g. NonDeterministicExecutionException — these are not "unit
+ // failed gracefully" but workflow-level problems. Surface them:
+ // re-throw out of the operation without writing a slot (the
+ // orchestrator's outer flow handles it).
+ throw;
+ }
+ catch (OperationCanceledException) when (
+ shortCircuitToken.IsCancellationRequested && !controlToken.IsCancellationRequested)
+ {
+ // Cooperative bail: this unit honored the short-circuit signal raised
+ // when a sibling satisfied the CompletionConfig. It is neither a
+ // failure nor an operation-wide cancel — record it as Started so the
+ // verdict math treats it like an un-dispatched unit (and so it can
+ // never trip a failure threshold). The unit wrote no terminal
+ // checkpoint, so replay reconstructs it identically from the parent
+ // summary.
+ //
+ // Ordered BEFORE the control-token clause: a genuine caller-cancel /
+ // workflow-shutdown still takes precedence.
+ slots[index] = new UnitOutcome { Status = BatchItemStatus.Started };
+ }
+ catch (OperationCanceledException) when (controlToken.IsCancellationRequested)
+ {
+ // Control-token cancellation — caller-cancel OR workflow shutdown (a
+ // sibling op suspended, a checkpoint failed). Don't write a slot —
+ // Task.WhenAll observes this and the orchestrator re-throws after
+ // settling.
+ throw;
+ }
+ catch (OperationCanceledException ex)
+ {
+ // Unit-internal cancellation that is NOT tied to the control token
+ // (e.g. the unit's own CancellationTokenSource fired). Treat it as a
+ // normal per-unit failure rather than killing the operation as
+ // cancelled.
+ var wrapped = new ChildContextException(ex.Message, ex)
+ {
+ SubType = ChildSubType,
+ ErrorType = ex.GetType().FullName
+ };
+ slots[index] = new UnitOutcome { Status = BatchItemStatus.Failed, Error = wrapped };
+ }
+ catch (Exception ex)
+ {
+ // Wrap unexpected exceptions as ChildContextException — they're
+ // per-unit failures from the user's POV.
+ var wrapped = new ChildContextException(ex.Message, ex)
+ {
+ SubType = ChildSubType,
+ ErrorType = ex.GetType().FullName
+ };
+ slots[index] = new UnitOutcome { Status = BatchItemStatus.Failed, Error = wrapped };
+ }
+
+ onComplete(slots[index]); // not reached when a catch above re-throws, so those units never touch the counters
+ }
+ finally
+ {
+ // Defensive: with this structure the semaphore is only disposed after
+ // Task.WhenAll(inFlight) has settled, so this Release should always
+ // succeed. ObjectDisposedException would indicate a bug elsewhere, but
+ // we tolerate it here so the task doesn't fault with a noise exception
+ // that masks the real one.
+ try
+ {
+ semaphore?.Release();
+ }
+ catch (ObjectDisposedException)
+ {
+ }
+ }
+ }
+
+ private CompletionReason ComputeCompletionReason(IReadOnlyList> items, int totalCount) // Tallies item statuses and lets the completion policy decide the final reason.
+ {
+ var succeeded = 0;
+ var failed = 0;
+ var started = 0;
+
+ foreach (var item in items)
+ {
+ switch (item.Status) // a status outside these three cases is not counted
+ {
+ case BatchItemStatus.Succeeded: succeeded++; break;
+ case BatchItemStatus.Failed: failed++; break;
+ case BatchItemStatus.Started: started++; break;
+ }
+ }
+
+ return _policy.Evaluate(succeeded, failed, started, totalCount);
+ }
+
+ private DurableExecutionException BuildException(IBatchResult result) // Formats the failure message and delegates construction to the subclass factory.
+ {
+ var message =
+ $"{OperationNoun} operation failed: failure tolerance exceeded " +
+ $"({result.FailureCount} of {result.TotalCount} {UnitNounPlural} failed).";
+ return CreateException(message, result); // the subclass picks the concrete exception type
+ }
+
+ private async Task CheckpointParentResultAsync( // Enqueues the parent's terminal CONTEXT update: SUCCEED with the summary payload, or FAIL with an aggregate error.
+ BatchResult result,
+ CompletionReason completionReason,
+ DurableExecutionException? failureException,
+ CancellationToken cancellationToken)
+ {
+ var summary = new BatchSummary
+ {
+ CompletionReason = SerializeCompletionReason(completionReason),
+ Units = new List(result.All.Count)
+ };
+ for (var i = 0; i < result.All.Count; i++)
+ {
+ var item = result.All[i];
+ summary.Units.Add(new BatchUnitSummary // status and name only; per-unit results stay on the children's checkpoints
+ {
+ Index = item.Index,
+ Name = item.Name,
+ Status = SerializeStatus(item.Status)
+ });
+ }
+
+ var payload = JsonSerializer.Serialize(summary, BatchJsonContext.Default.BatchSummary); // serialized even on the failure path, where it is not sent
+ var failed = failureException != null;
+
+ await EnqueueAsync(new SdkOperationUpdate
+ {
+ Id = OperationId,
+ Type = OperationTypes.Context,
+ Action = failed ? OperationAction.FAIL : OperationAction.SUCCEED,
+ SubType = ParentSubType,
+ Name = Name,
+ Payload = failed ? null : payload, // a FAIL update carries no summary, so a FAILED replay has to infer unit statuses from child checkpoints
+ Error = failed ? BuildAggregateError(result, failureException!) : null
+ }, cancellationToken);
+ }
+
+ private IBatchResult ReconstructFromCheckpoints(Operation parent, bool throwOnFailure) // Rebuilds the aggregate result from the parent summary (when present) and the children's checkpoints, without running any unit. Both visible call sites pass throwOnFailure: false.
+ {
+ var summary = ParseSummary(parent.ContextDetails?.Result); // null when the payload is missing, empty, or not valid JSON
+
+ var items = new List>(UnitCount);
+ for (var i = 0; i < UnitCount; i++)
+ {
+ var (unitName, _) = GetUnit(i);
+ var childOpId = OperationIdGenerator.HashOperationId($"{OperationId}-{i + 1}"); // must match the id formula in RunUnitAsync
+ var childOp = State.GetOperation(childOpId);
+ var summaryEntry = summary?.Units.FirstOrDefault(b => b.Index == i); // NOTE(review): linear scan per unit makes this loop quadratic in the unit count; an index-keyed lookup would avoid that for large maps
+
+ BatchItemStatus status = summaryEntry != null
+ ? DeserializeStatus(summaryEntry.Status)
+ : InferStatusFromChildOp(childOp);
+
+ // Prefer the name that was checkpointed at the moment the batch
+ // resolved. This is the only authoritative source for units reported
+ // as Started (no per-unit checkpoint exists to consult), and it lets
+ // us detect unit-name drift between deployments.
+ var checkpointedName = summaryEntry?.Name;
+ if (checkpointedName != null && unitName != null && checkpointedName != unitName) // drift is only detectable when both names are non-null
+ {
+ throw new NonDeterministicExecutionException(
+ $"Non-deterministic execution detected for {OperationNoun.ToLowerInvariant()} unit {i} of operation " +
+ $"'{Name ?? OperationId}': expected name '{unitName}' but found '{checkpointedName}' " +
+ $"from a previous invocation. Code must not change the order or name of concurrent " +
+ $"units between deployments.");
+ }
+ var resolvedName = checkpointedName ?? unitName;
+
+ T? unitResult = default;
+ DurableExecutionException? unitError = null;
+
+ if (status == BatchItemStatus.Succeeded && childOp?.ContextDetails?.Result != null) // a Succeeded unit with no stored result keeps the default value
+ {
+ unitResult = DeserializeResult(childOp.ContextDetails.Result);
+ }
+ else if (status == BatchItemStatus.Failed && childOp?.ContextDetails?.Error != null) // a Failed unit with no stored error keeps a null Error
+ {
+ var err = childOp.ContextDetails.Error;
+ unitError = new ChildContextException(err.ErrorMessage ?? "Unit failed") // rebuilt from checkpoint fields; no inner exception is available on replay
+ {
+ SubType = childOp.SubType ?? ChildSubType,
+ ErrorType = err.ErrorType,
+ ErrorData = err.ErrorData,
+ OriginalStackTrace = err.StackTrace
+ };
+ }
+
+ items.Add(new BatchItem
+ {
+ Index = i,
+ Name = resolvedName,
+ Status = status,
+ Result = unitResult,
+ Error = unitError
+ });
+ }
+
+ var completionReason = summary != null
+ ? DeserializeCompletionReason(summary.CompletionReason)
+ : ComputeCompletionReason(items, UnitCount); // no summary: re-derive the verdict from the reconstructed statuses
+
+ var result = new BatchResult(items, completionReason);
+
+ if (throwOnFailure && completionReason == CompletionReason.FailureToleranceExceeded)
+ {
+ throw BuildException(result);
+ }
+
+ return result;
+ }
+
+ private static BatchItemStatus InferStatusFromChildOp(Operation? childOp) // Fallback used when the parent summary has no entry for a unit.
+ {
+ if (childOp == null) return BatchItemStatus.Started; // no child checkpoint at all is reported as Started
+ return childOp.Status switch
+ {
+ OperationStatuses.Succeeded => BatchItemStatus.Succeeded,
+ OperationStatuses.Failed => BatchItemStatus.Failed,
+ _ => BatchItemStatus.Started // any other child status is treated as not finished
+ };
+ }
+
+ private SdkErrorObject BuildAggregateError(IBatchResult result, DurableExecutionException failureException) // Error object attached to the parent's FAIL update.
+ {
+ return new SdkErrorObject
+ {
+ ErrorType = failureException.GetType().FullName, // concrete type name of the exception created by the subclass
+ ErrorMessage =
+ $"{OperationNoun} operation failed: {result.FailureCount} of {result.TotalCount} {UnitNounPlural} failed." // note: worded differently from the message built in BuildException
+ };
+ }
+
+ private static BatchSummary? ParseSummary(string? payload) // Returns null (rather than throwing) when there is no usable summary.
+ {
+ if (string.IsNullOrEmpty(payload)) return null; // nothing stored on the parent checkpoint
+ try
+ {
+ return JsonSerializer.Deserialize(payload, BatchJsonContext.Default.BatchSummary);
+ }
+ catch (JsonException)
+ {
+ // Tolerate older / corrupted payloads — fall back to inferring status
+ // from per-unit checkpoints (the caller treats a null summary that way).
+ return null;
+ }
+ }
+
+ private static string SerializeStatus(BatchItemStatus status) => status switch // enum → wire string stored in BatchUnitSummary.Status
+ {
+ BatchItemStatus.Succeeded => "SUCCEEDED",
+ BatchItemStatus.Failed => "FAILED",
+ BatchItemStatus.Started => "STARTED",
+ _ => throw new ArgumentOutOfRangeException(nameof(status)) // writing is strict: an unmapped enum value is a programming error
+ };
+
+ private static BatchItemStatus DeserializeStatus(string? wire) => wire switch // wire string → enum; inverse of SerializeStatus
+ {
+ "SUCCEEDED" => BatchItemStatus.Succeeded,
+ "FAILED" => BatchItemStatus.Failed,
+ "STARTED" => BatchItemStatus.Started,
+ _ => BatchItemStatus.Started // reading is lenient: null or unrecognised text is treated as Started
+ };
+
+ private static string SerializeCompletionReason(CompletionReason reason) => reason switch // enum → wire string stored in BatchSummary.CompletionReason
+ {
+ CompletionReason.AllCompleted => "ALL_COMPLETED",
+ CompletionReason.MinSuccessfulReached => "MIN_SUCCESSFUL_REACHED",
+ CompletionReason.FailureToleranceExceeded => "FAILURE_TOLERANCE_EXCEEDED",
+ _ => throw new ArgumentOutOfRangeException(nameof(reason)) // writing is strict: an unmapped enum value is a programming error
+ };
+
+ private static CompletionReason DeserializeCompletionReason(string? wire) => wire switch // wire string → enum; inverse of SerializeCompletionReason
+ {
+ "ALL_COMPLETED" => CompletionReason.AllCompleted,
+ "MIN_SUCCESSFUL_REACHED" => CompletionReason.MinSuccessfulReached,
+ "FAILURE_TOLERANCE_EXCEEDED" => CompletionReason.FailureToleranceExceeded,
+ _ => CompletionReason.AllCompleted // reading is lenient: null or unrecognised text is treated as AllCompleted
+ };
+
+ private T DeserializeResult(string serialized) // Turns a child's checkpointed result string back into T via the Lambda serializer.
+ {
+ var bytes = Encoding.UTF8.GetBytes(serialized); // the serializer reads from a stream, so re-encode the string as UTF-8
+ using var ms = new MemoryStream(bytes);
+ return Serializer.Deserialize(ms);
+ }
+
+ /// <summary>
+ /// Internal scratch space tracking each unit's outcome as it lands in the
+ /// executor; copied into the user-facing <c>BatchItem</c> once every
+ /// dispatched unit has settled.
+ /// </summary>
+ private struct UnitOutcome
+ {
+ public BatchItemStatus Status; // for a dispatched unit that re-threw instead of writing a slot, this stays at the enum's default value
+ public T? Result; // set only for Succeeded outcomes
+ public DurableExecutionException? Error; // set only for Failed outcomes
+ }
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/MapOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/MapOperation.cs
new file mode 100644
index 000000000..6d11c31ab
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/MapOperation.cs
@@ -0,0 +1,80 @@
+// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+// SPDX-License-Identifier: Apache-2.0
+
+using System.Globalization;
+using Amazon.Lambda;
+using Amazon.Lambda.Core;
+
+namespace Amazon.Lambda.DurableExecution.Internal;
+
+/// <summary>
+/// Durable map operation. Processes a collection in parallel, running the
+/// user-supplied function once per item — each as a
+/// <c>ChildContextOperation</c>. All orchestration, completion,
+/// checkpoint, and replay logic lives in <c>ConcurrentOperation</c>;
+/// this subclass supplies only the map-specific bits: how to turn an item index
+/// into a (name, func) pair (the per-item callback receives the item, its
+/// index, and the full source list), the Map sub-type labels, and the
+/// <see cref="MapException"/> factory.
+/// </summary>
+internal sealed class MapOperation : ConcurrentOperation
+{
+ private readonly IReadOnlyList _items;
+ private readonly Func, Task> _func;
+ private readonly Func? _itemNamer;
+
+ public MapOperation(
+ string operationId,
+ string? name,
+ string? parentId,
+ IReadOnlyList items,
+ Func, Task> func,
+ MapConfig config,
+ ILambdaSerializer serializer,
+ Func childContextFactory,
+ ExecutionState state,
+ TerminationManager termination,
+ WorkflowCancellation workflowCancellation,
+ string durableExecutionArn,
+ CheckpointBatcher? batcher = null)
+ : base(operationId, name, parentId, config.CompletionConfig, config.MaxConcurrency,
+ serializer, childContextFactory, state, termination, workflowCancellation,
+ durableExecutionArn, batcher)
+ {
+ _items = items;
+ _func = func;
+ _itemNamer = config.ItemNamer;
+ }
+
+ protected override int UnitCount => _items.Count; // one unit per source item
+ protected override string ParentSubType => OperationSubTypes.Map; // sub-type label of the map parent
+ protected override string ChildSubType => OperationSubTypes.MapItem; // sub-type label of each item
+ protected override string OperationNoun => "Map";
+ protected override string UnitNounPlural => "items";
+
+ protected override (string? Name, Func> Func) GetUnit(int index)
+ {
+ var item = _items[index];
+ // Default name is the index — matches the unnamed-branch convention in
+ // ParallelAsync. A custom ItemNamer can derive a readable name from the
+ // item's content. Naming affects observability only, never replay
+ // correlation (child operation IDs are derived from the index).
+ var name = _itemNamer is not null
+ ? _itemNamer(item!, index) // '!' only silences the nullable warning; the item is passed through unchanged
+ : index.ToString(CultureInfo.InvariantCulture);
+
+ // The per-item callback does not take a CancellationToken; cooperative
+ // bail still reaches it through the child context's token chain (the
+ // ChildContextOperation links the short-circuit token into ctx).
+ return (name, (ctx, _) => _func(ctx, item, index, _items));
+ }
+
+ protected override DurableExecutionException CreateException(string message, IBatchResult result)
+ {
+ return new MapException(message)
+ {
+ Result = result,
+ CompletionReason = result.CompletionReason
+ };
+ }
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelJsonContext.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelJsonContext.cs
deleted file mode 100644
index 2081c8e68..000000000
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelJsonContext.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
-// SPDX-License-Identifier: Apache-2.0
-
-using System.Text.Json.Serialization;
-
-namespace Amazon.Lambda.DurableExecution.Internal;
-
-///
-/// AOT-friendly for the internal
-/// payload stored on a parallel parent's CONTEXT
-/// checkpoint. Only this internal type — never user T — flows through here, so
-/// the source-generated metadata is sufficient.
-///
-[JsonSerializable(typeof(ParallelSummary))]
-[JsonSerializable(typeof(ParallelBranchSummary))]
-internal sealed partial class ParallelJsonContext : JsonSerializerContext
-{
-}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelOperation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelOperation.cs
index 2d55dc511..7c43ebb8e 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelOperation.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelOperation.cs
@@ -1,53 +1,22 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
-using System.IO;
-using System.Text;
using Amazon.Lambda;
using Amazon.Lambda.Core;
-using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
-using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;
namespace Amazon.Lambda.DurableExecution.Internal;
///
-/// Durable parallel operation. Runs N user-supplied branches concurrently
-/// (each as a ) under a shared
-/// and concurrency limit, persisting the
-/// aggregate result so subsequent invocations replay it without re-executing.
+/// Durable parallel operation. Runs N user-supplied branches concurrently,
+/// each as a <c>ChildContextOperation</c>. All orchestration,
+/// completion, checkpoint, and replay logic lives in
+/// <c>ConcurrentOperation</c>; this subclass supplies only the
+/// branch-specific bits (unit count, per-branch (name, func) pair, sub-type
+/// labels, and the failure-exception factory).
///
-///
-/// Replay branches — example: await ctx.ParallelAsync(funcs, name: "fetch")
-///
-/// Fresh : no prior state → sync-flush parent CONTEXT START →
-/// dispatch branches respecting MaxConcurrency → wait for in-flight to
-/// complete after CompletionConfig short-circuit → emit parent CONTEXT
-/// SUCCEED with summary payload ( ).
-/// SUCCEEDED : parent payload supplies the snapshot of per-
-/// branch statuses + completion reason; per-branch results are
-/// deserialised from the children's own CONTEXT checkpoints.
-/// FAILED : same reconstruction; throws
-/// carrying the rebuilt
-/// .
-/// STARTED / PENDING : re-execute (children replay from
-/// their own checkpoints).
-///
-/// Per-branch errors do NOT abort the parallel directly — the orchestrator
-/// catches each branch's , records it as a
-/// failed , and consults the
-/// after every completion. Only when the
-/// completion config marks the run as
-/// does the parallel
-/// throw.
-///
-internal sealed class ParallelOperation : DurableOperation>
+internal sealed class ParallelOperation : ConcurrentOperation
{
private readonly IReadOnlyList> _branches;
- private readonly ParallelConfig _config;
- private readonly CompletionPolicy _policy;
- private readonly ILambdaSerializer _serializer;
- private readonly Func _childContextFactory;
- private readonly WorkflowCancellation _workflowCancellation;
public ParallelOperation(
string operationId,
@@ -62,545 +31,31 @@ public ParallelOperation(
WorkflowCancellation workflowCancellation,
string durableExecutionArn,
CheckpointBatcher? batcher = null)
- : base(operationId, name, parentId, state, termination, durableExecutionArn, batcher)
+ : base(operationId, name, parentId, config.CompletionConfig, config.MaxConcurrency,
+ serializer, childContextFactory, state, termination, workflowCancellation,
+ durableExecutionArn, batcher)
{
_branches = branches;
- _config = config;
- _policy = new CompletionPolicy(config.CompletionConfig);
- _serializer = serializer;
- _childContextFactory = childContextFactory;
- _workflowCancellation = workflowCancellation;
}
- protected override string OperationType => OperationTypes.Context;
+ protected override int UnitCount => _branches.Count; // one unit per branch
+ protected override string ParentSubType => OperationSubTypes.Parallel; // sub-type label of the parallel parent
+ protected override string ChildSubType => OperationSubTypes.ParallelBranch; // sub-type label of each branch
+ protected override string OperationNoun => "Parallel";
+ protected override string UnitNounPlural => "branches";
- protected override async Task> StartAsync(CancellationToken cancellationToken)
+ protected override (string? Name, Func> Func) GetUnit(int index)
{
- // Sync-flush parent CONTEXT START. Mirrors ChildContextOperation: if a
- // branch suspends (e.g., a Wait inside a branch), the service needs to
- // know the parallel parent existed.
- await EnqueueAsync(new SdkOperationUpdate
- {
- Id = OperationId,
- Type = OperationTypes.Context,
- Action = OperationAction.START,
- SubType = OperationSubTypes.Parallel,
- Name = Name
- }, cancellationToken);
-
- return await ExecuteBranchesAsync(cancellationToken);
- }
-
- protected override Task> ReplayAsync(Operation existing, CancellationToken cancellationToken)
- {
- switch (existing.Status)
- {
- case OperationStatuses.Succeeded:
- return Task.FromResult(ReconstructFromCheckpoints(existing, throwOnFailure: false));
-
- case OperationStatuses.Failed:
- // Reconstruct so the caller (and ParallelException.Result) sees
- // the per-branch outcomes; then throw.
- var failed = ReconstructFromCheckpoints(existing, throwOnFailure: false);
- throw BuildParallelException(failed);
-
- case OperationStatuses.Started:
- case OperationStatuses.Pending:
- // Re-run: branches replay from their own checkpoints.
- return ExecuteBranchesAsync(cancellationToken);
-
- default:
- throw new NonDeterministicExecutionException(
- $"Parallel operation '{Name ?? OperationId}' has unexpected status '{existing.Status}' on replay.");
- }
- }
-
- private async Task> ExecuteBranchesAsync(CancellationToken cancellationToken)
- {
- // Combine the caller's token with the workflow-shutdown token for the
- // parallel's OWN control flow: the dispatch loop's semaphore waits, the
- // post-settle re-throw, and each branch's OCE classification.
- //
- // CRITICAL: childOp.ExecuteAsync below still receives the *caller* token
- // only. ChildContextOperation re-links workflow-shutdown itself for the
- // user Func, and its checkpoint writes (CONTEXT FAIL/SUCCEED) must NOT
- // observe shutdown, otherwise teardown could abort a branch's successful
- // checkpoint mid-flush.
- using var controlCts = CancellationTokenSource.CreateLinkedTokenSource(
- cancellationToken, _workflowCancellation.Token);
- var controlToken = controlCts.Token;
-
- controlToken.ThrowIfCancellationRequested();
-
- // Cooperative-bail signal: tripped the moment a CompletionConfig
- // short-circuit is decided. It flows into each branch's user Func only
- // (via ChildContextOperation's linked token), NOT into the branches'
- // checkpoint writes — a branch that honors the token unwinds with an
- // OperationCanceledException we record as Started, while a branch
- // mid-flush of a successful checkpoint still completes. Branches that
- // ignore the token simply run to their natural terminal state, exactly
- // as before. We never abandon a dispatched branch, so replay stays
- // deterministic.
- using var shortCircuitCts = new CancellationTokenSource();
-
- var branchCount = _branches.Count;
- var slots = new BranchOutcome[branchCount];
- var dispatched = new bool[branchCount];
-
- var maxConcurrency = _config.MaxConcurrency ?? branchCount;
- // Optimisation: when MaxConcurrency >= branchCount, skip the semaphore
- // entirely. Behaviour is identical, allocations are lower.
- var semaphore = (maxConcurrency >= branchCount) ? null : new SemaphoreSlim(maxConcurrency, maxConcurrency);
-
- var succeeded = 0;
- var failed = 0;
-
- var inFlight = new List(branchCount);
-
- // Reads the live counters and asks the completion policy whether the run
- // is already decided. Volatile reads pair with the Interlocked.Increment
- // writes in the onComplete callback. Reads are non-atomic across the two
- // counters: at worst we observe slightly stale values and dispatch one
- // extra branch before the next completion forces a re-check. That's
- // acceptable — the post-loop _policy.Evaluate is the source of truth.
- bool ShouldStopDispatchingNow() => _policy.ShouldStopDispatching(
- Volatile.Read(ref succeeded), Volatile.Read(ref failed), branchCount);
-
- // Signal still-running branches to bail. Idempotent: the first Cancel
- // wins, racing callbacks are harmless. Tolerate a late call after the
- // CTS is disposed at end-of-scope (a branch completing during teardown).
- void SignalShortCircuit()
- {
- try { shortCircuitCts.Cancel(); }
- catch (ObjectDisposedException) { }
- }
-
- // Branches run with the caller's token (re-linked to workflow-shutdown
- // inside ChildContextOperation) so cooperative cancellation still
- // propagates into user code, but we must NOT abandon already-dispatched
- // branches while they're still writing checkpoints — that would diverge
- // between the original run and replay. The finally block therefore awaits
- // every in-flight task even when cancellation fires, and only then
- // disposes the semaphore (after branches have settled — success, failure,
- // or cooperative OCE).
- try
- {
- for (var i = 0; i < branchCount; i++)
- {
- if (ShouldStopDispatchingNow())
- {
- SignalShortCircuit();
- break;
- }
-
- if (semaphore != null)
- {
- await semaphore.WaitAsync(controlToken).ConfigureAwait(false);
- // Re-check after acquiring: the wait may have unblocked
- // because earlier branches finished and short-circuited
- // the operation.
- if (ShouldStopDispatchingNow())
- {
- semaphore.Release();
- SignalShortCircuit();
- break;
- }
- }
-
- var index = i;
- dispatched[index] = true;
- inFlight.Add(RunBranchAsync(index, slots, semaphore, cancellationToken, controlToken,
- shortCircuitCts.Token,
- onComplete: outcome =>
- {
- if (outcome.Status == BatchItemStatus.Succeeded)
- Interlocked.Increment(ref succeeded);
- else if (outcome.Status == BatchItemStatus.Failed)
- Interlocked.Increment(ref failed);
-
- // The deciding completion typically lands AFTER every
- // branch has been dispatched, so the loop is no longer
- // sitting at a break point. Re-check here and signal
- // any still-running branches to bail.
- if (ShouldStopDispatchingNow())
- SignalShortCircuit();
- }));
- }
- }
- finally
- {
- // CRITICAL: wait for every dispatched branch — even on the
- // exceptional path (control-token cancellation mid-dispatch, or a
- // synchronous throw out of the loop) — before the semaphore is
- // disposed. Otherwise surviving branches' Release() calls hit
- // ObjectDisposedException, the tasks become unobserved, and they
- // keep writing checkpoints out from under us.
- //
- // We deliberately DO NOT cancel already-running branches when a
- // short-circuit fires — orphan branches that continue writing
- // checkpoints would diverge between the original run and replay.
- // Letting them finish guarantees determinism: all dispatched
- // branches end up Succeeded or Failed. Only un-dispatched branches
- // surface as Started.
- if (inFlight.Count > 0)
- {
- try
- {
- await Task.WhenAll(inFlight).ConfigureAwait(false);
- }
- catch
- {
- // Swallow here — Task.WhenAll only surfaces the first
- // exception, but every branch task is now in a terminal
- // state and we want to inspect each one individually below
- // to decide whether to surface a workflow-level error. The
- // Task objects themselves still carry their exceptions, so
- // this swallow does not orphan them.
- }
- }
-
- semaphore?.Dispose();
- }
-
- // Surface any workflow-level exception (e.g. NonDeterministicExecutionException)
- // raised inside a branch. RunBranchAsync re-throws DurableExecutionException
- // (other than ChildContextException which is captured into the slot) so the
- // task faults with that exception. Take the first such failure: these are
- // structural errors, not "branch failed gracefully" outcomes.
- foreach (var t in inFlight)
- {
- if (t.IsFaulted && t.Exception is { } agg)
- {
- foreach (var inner in agg.InnerExceptions)
- {
- if (inner is DurableExecutionException dex && inner is not ChildContextException)
- {
- throw dex;
- }
- }
- }
- }
-
- // Re-throw any pending cancellation (caller-cancel or workflow shutdown)
- // now that branches have settled and the semaphore has been disposed
- // cleanly. Surfacing it here means a torn-down parallel propagates an
- // OperationCanceledException instead of synthesizing a spurious
- // FailureToleranceExceeded verdict from branches that merely unwound.
- controlToken.ThrowIfCancellationRequested();
-
- // Build BatchItems for every branch in original order.
- var items = new List>(branchCount);
- for (var i = 0; i < branchCount; i++)
- {
- if (dispatched[i])
- {
- var outcome = slots[i];
- items.Add(new BatchItem
- {
- Index = i,
- Name = _branches[i].Name,
- Status = outcome.Status,
- Result = outcome.Status == BatchItemStatus.Succeeded ? outcome.Result : default,
- Error = outcome.Status == BatchItemStatus.Failed ? outcome.Error : null
- });
- }
- else
- {
- items.Add(new BatchItem
- {
- Index = i,
- Name = _branches[i].Name,
- Status = BatchItemStatus.Started,
- Result = default,
- Error = null
- });
- }
- }
-
- var completionReason = EvaluateCompletion(items, branchCount);
- var result = new BatchResult(items, completionReason);
-
- await CheckpointParentResultAsync(result, completionReason, cancellationToken);
-
- if (completionReason == CompletionReason.FailureToleranceExceeded)
- {
- throw BuildParallelException(result);
- }
-
- return result;
- }
-
- private async Task RunBranchAsync(
- int index,
- BranchOutcome[] slots,
- SemaphoreSlim? semaphore,
- CancellationToken cancellationToken,
- CancellationToken controlToken,
- CancellationToken shortCircuitToken,
- Action onComplete)
- {
- try
- {
- var branch = _branches[index];
- var branchOpId = OperationIdGenerator.HashOperationId($"{OperationId}-{index + 1}");
-
- var childOp = new ChildContextOperation(
- branchOpId,
- branch.Name,
- OperationId,
- branch.Func,
- new ChildContextConfig { SubType = OperationSubTypes.ParallelBranch },
- _serializer,
- _childContextFactory,
- State,
- Termination,
- _workflowCancellation,
- DurableExecutionArn,
- Batcher,
- shortCircuitToken);
-
- try
- {
- var result = await childOp.ExecuteAsync(cancellationToken).ConfigureAwait(false);
- slots[index] = new BranchOutcome { Status = BatchItemStatus.Succeeded, Result = result };
- }
- catch (ChildContextException ex)
- {
- slots[index] = new BranchOutcome { Status = BatchItemStatus.Failed, Error = ex };
- }
- catch (DurableExecutionException)
- {
- // E.g. NonDeterministicExecutionException — these are not
- // "branch failed gracefully" but workflow-level problems.
- // Surface them: re-throw out of the parallel without writing
- // a slot (the orchestrator's outer flow handles it).
- throw;
- }
- catch (OperationCanceledException) when (
- shortCircuitToken.IsCancellationRequested && !controlToken.IsCancellationRequested)
- {
- // Cooperative bail: this branch honored the short-circuit signal
- // raised when a sibling satisfied the CompletionConfig. It is
- // neither a failure nor a parallel-wide cancel — record it as
- // Started so the verdict math treats it like an un-dispatched
- // branch (and so it can never trip a failure threshold). The
- // branch wrote no terminal checkpoint, so replay reconstructs
- // it identically from the parent summary.
- //
- // Ordered BEFORE the control-token clause: a genuine
- // caller-cancel / workflow-shutdown still takes precedence.
- slots[index] = new BranchOutcome { Status = BatchItemStatus.Started };
- }
- catch (OperationCanceledException) when (controlToken.IsCancellationRequested)
- {
- // Control-token cancellation — caller-cancel OR workflow
- // shutdown (a sibling op suspended, a checkpoint failed)
- throw;
- }
- catch (OperationCanceledException ex)
- {
- // Branch-internal cancellation that is NOT tied to the control
- // token (e.g. the branch's own CancellationTokenSource fired).
- // Treat it as a normal per-branch failure rather than killing
- // the parallel as cancelled.
- var wrapped = new ChildContextException(ex.Message, ex)
- {
- SubType = OperationSubTypes.ParallelBranch,
- ErrorType = ex.GetType().FullName
- };
- slots[index] = new BranchOutcome { Status = BatchItemStatus.Failed, Error = wrapped };
- }
- catch (Exception ex)
- {
- // Wrap unexpected exceptions as ChildContextException — they're
- // per-branch failures from the user's POV.
- var wrapped = new ChildContextException(ex.Message, ex)
- {
- SubType = OperationSubTypes.ParallelBranch,
- ErrorType = ex.GetType().FullName
- };
- slots[index] = new BranchOutcome { Status = BatchItemStatus.Failed, Error = wrapped };
- }
-
- onComplete(slots[index]);
- }
- finally
- {
- // Defensive: with the new structure the semaphore is only disposed
- // after Task.WhenAll(inFlight) has settled, so this Release should
- // always succeed. ObjectDisposedException would indicate a bug
- // elsewhere, but we tolerate it here so the task doesn't fault
- // with a noise exception that masks the real one.
- try
- {
- semaphore?.Release();
- }
- catch (ObjectDisposedException)
- {
- }
- }
- }
-
- private CompletionReason EvaluateCompletion(IReadOnlyList> items, int totalCount)
- {
- var succeeded = 0;
- var failed = 0;
- var started = 0;
-
- foreach (var item in items)
- {
- switch (item.Status)
- {
- case BatchItemStatus.Succeeded: succeeded++; break;
- case BatchItemStatus.Failed: failed++; break;
- case BatchItemStatus.Started: started++; break;
- }
- }
-
- return _policy.Evaluate(succeeded, failed, started, totalCount);
- }
-
- private async Task CheckpointParentResultAsync(
- BatchResult result,
- CompletionReason completionReason,
- CancellationToken cancellationToken)
- {
- var summary = ParallelSummaryCodec.Build(result.All, completionReason);
- var payload = ParallelSummaryCodec.ToPayload(summary);
- var failed = completionReason == CompletionReason.FailureToleranceExceeded;
-
- await EnqueueAsync(new SdkOperationUpdate
- {
- Id = OperationId,
- Type = OperationTypes.Context,
- Action = failed ? OperationAction.FAIL : OperationAction.SUCCEED,
- SubType = OperationSubTypes.Parallel,
- Name = Name,
- Payload = failed ? null : payload,
- Error = failed ? BuildAggregateError(result) : null
- }, cancellationToken);
- }
-
- private IBatchResult ReconstructFromCheckpoints(Operation parent, bool throwOnFailure)
- {
- var summary = ParallelSummaryCodec.FromPayload(parent.ContextDetails?.Result);
-
- var items = new List>(_branches.Count);
- for (var i = 0; i < _branches.Count; i++)
- {
- var branchOpId = OperationIdGenerator.HashOperationId($"{OperationId}-{i + 1}");
- var branchOp = State.GetOperation(branchOpId);
- var summaryEntry = summary?.Branches.FirstOrDefault(b => b.Index == i);
-
- BatchItemStatus status = summaryEntry != null
- ? ParallelSummaryCodec.ReadStatus(summaryEntry.Status)
- : InferStatusFromBranchOp(branchOp);
-
- // Prefer the name that was checkpointed at the moment the batch
- // resolved. This is the only authoritative source for branches
- // reported as Started (no per-branch checkpoint exists to consult),
- // and it lets us detect branch-name drift between deployments.
- var currentName = _branches[i].Name;
- var checkpointedName = summaryEntry?.Name;
- if (checkpointedName != null && currentName != null && checkpointedName != currentName)
- {
- throw new NonDeterministicExecutionException(
- $"Non-deterministic execution detected for parallel branch {i} of operation " +
- $"'{Name ?? OperationId}': expected name '{currentName}' but found '{checkpointedName}' " +
- $"from a previous invocation. Code must not change the order or name of parallel " +
- $"branches between deployments.");
- }
- var itemName = checkpointedName ?? currentName;
-
- T? branchResult = default;
- DurableExecutionException? branchError = null;
-
- if (status == BatchItemStatus.Succeeded && branchOp?.ContextDetails?.Result != null)
- {
- branchResult = DeserializeBranchResult(branchOp.ContextDetails.Result);
- }
- else if (status == BatchItemStatus.Failed && branchOp?.ContextDetails?.Error != null)
- {
- var err = branchOp.ContextDetails.Error;
- branchError = new ChildContextException(err.ErrorMessage ?? "Branch failed")
- {
- SubType = branchOp.SubType ?? OperationSubTypes.ParallelBranch,
- ErrorType = err.ErrorType,
- ErrorData = err.ErrorData,
- OriginalStackTrace = err.StackTrace
- };
- }
-
- items.Add(new BatchItem
- {
- Index = i,
- Name = itemName,
- Status = status,
- Result = branchResult,
- Error = branchError
- });
- }
-
- var completionReason = summary != null
- ? ParallelSummaryCodec.ReadCompletionReason(summary.CompletionReason)
- : EvaluateCompletion(items, _branches.Count);
-
- var result = new BatchResult(items, completionReason);
-
- if (throwOnFailure && completionReason == CompletionReason.FailureToleranceExceeded)
- {
- throw BuildParallelException(result);
- }
-
- return result;
- }
-
- private static BatchItemStatus InferStatusFromBranchOp(Operation? branchOp)
- {
- if (branchOp == null) return BatchItemStatus.Started;
- return branchOp.Status switch
- {
- OperationStatuses.Succeeded => BatchItemStatus.Succeeded,
- OperationStatuses.Failed => BatchItemStatus.Failed,
- _ => BatchItemStatus.Started
- };
+ var branch = _branches[index];
+ return (branch.Name, branch.Func);
}
- private static ParallelException BuildParallelException(IBatchResult result)
+ protected override DurableExecutionException CreateException(string message, IBatchResult result)
{
- return new ParallelException(
- $"Parallel operation failed: failure tolerance exceeded ({result.FailureCount} of {result.TotalCount} branches failed).")
+ return new ParallelException(message)
{
Result = result,
CompletionReason = result.CompletionReason
};
}
-
- private static SdkErrorObject BuildAggregateError(IBatchResult result)
- {
- return new SdkErrorObject
- {
- ErrorType = typeof(ParallelException).FullName,
- ErrorMessage = $"Parallel operation failed: {result.FailureCount} of {result.TotalCount} branches failed."
- };
- }
-
- private T DeserializeBranchResult(string serialized)
- {
- var bytes = Encoding.UTF8.GetBytes(serialized);
- using var ms = new MemoryStream(bytes);
- return _serializer.Deserialize(ms);
- }
-
- ///
- /// Internal scratch space tracking each branch's outcome as it lands in
- /// the executor; copied into the user-facing
- /// once every dispatched branch has settled.
- ///
- private struct BranchOutcome
- {
- public BatchItemStatus Status;
- public T? Result;
- public DurableExecutionException? Error;
- }
}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelSummary.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelSummary.cs
deleted file mode 100644
index b083b9bbf..000000000
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelSummary.cs
+++ /dev/null
@@ -1,34 +0,0 @@
-// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
-// SPDX-License-Identifier: Apache-2.0
-
-using System.Text.Json.Serialization;
-
-namespace Amazon.Lambda.DurableExecution.Internal;
-
-///
-/// Internal payload shape stored on a parallel parent's CONTEXT checkpoint
-/// (as ContextDetails.Result ) and reconstructed on replay. Carries the
-/// completion reason and the per-branch index → status map so the
-/// can be rebuilt without depending on user T
-/// shape — per-branch results live on the children's own checkpoints.
-///
-internal sealed class ParallelSummary
-{
- [JsonPropertyName("CompletionReason")]
- public string? CompletionReason { get; set; }
-
- [JsonPropertyName("Branches")]
- public IList Branches { get; set; } = new List();
-}
-
-internal sealed class ParallelBranchSummary
-{
- [JsonPropertyName("Index")]
- public int Index { get; set; }
-
- [JsonPropertyName("Name")]
- public string? Name { get; set; }
-
- [JsonPropertyName("Status")]
- public string? Status { get; set; }
-}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelSummaryCodec.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelSummaryCodec.cs
deleted file mode 100644
index bf14d6485..000000000
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelSummaryCodec.cs
+++ /dev/null
@@ -1,89 +0,0 @@
-// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
-// SPDX-License-Identifier: Apache-2.0
-
-using System.Text.Json;
-
-namespace Amazon.Lambda.DurableExecution.Internal;
-
-///
-/// Owns the on-the-wire representation of a parallel parent's CONTEXT payload:
-/// the JSON and the enum↔string mappings for
-/// per-branch status and completion reason. Keeping every string literal and the
-/// JSON (de)serialisation behind this one type means
-/// deals only in domain enums, and the wire contract has a single owner.
-///
-internal static class ParallelSummaryCodec
-{
- /// Builds the summary snapshot for a settled batch.
- public static ParallelSummary Build(IReadOnlyList> items, CompletionReason completionReason)
- {
- var summary = new ParallelSummary
- {
- CompletionReason = WriteCompletionReason(completionReason),
- Branches = new List(items.Count)
- };
- foreach (var item in items)
- {
- summary.Branches.Add(new ParallelBranchSummary
- {
- Index = item.Index,
- Name = item.Name,
- Status = WriteStatus(item.Status)
- });
- }
- return summary;
- }
-
- public static string ToPayload(ParallelSummary summary)
- => JsonSerializer.Serialize(summary, ParallelJsonContext.Default.ParallelSummary);
-
- ///
- /// Parses a checkpointed payload. Returns null for empty, older, or
- /// corrupted payloads so the caller can fall back to inferring per-branch
- /// status from the children's own checkpoints.
- ///
- public static ParallelSummary? FromPayload(string? payload)
- {
- if (string.IsNullOrEmpty(payload)) return null;
- try
- {
- return JsonSerializer.Deserialize(payload, ParallelJsonContext.Default.ParallelSummary);
- }
- catch (JsonException)
- {
- return null;
- }
- }
-
- public static string WriteStatus(BatchItemStatus status) => status switch
- {
- BatchItemStatus.Succeeded => "SUCCEEDED",
- BatchItemStatus.Failed => "FAILED",
- BatchItemStatus.Started => "STARTED",
- _ => throw new ArgumentOutOfRangeException(nameof(status))
- };
-
- public static BatchItemStatus ReadStatus(string? wire) => wire switch
- {
- "SUCCEEDED" => BatchItemStatus.Succeeded,
- "FAILED" => BatchItemStatus.Failed,
- "STARTED" => BatchItemStatus.Started,
- _ => BatchItemStatus.Started
- };
-
- public static string WriteCompletionReason(CompletionReason reason) => reason switch
- {
- CompletionReason.AllCompleted => "ALL_COMPLETED",
- CompletionReason.MinSuccessfulReached => "MIN_SUCCESSFUL_REACHED",
- CompletionReason.FailureToleranceExceeded => "FAILURE_TOLERANCE_EXCEEDED",
- _ => throw new ArgumentOutOfRangeException(nameof(reason))
- };
-
- public static CompletionReason ReadCompletionReason(string? wire) => wire switch
- {
- "ALL_COMPLETED" => CompletionReason.AllCompleted,
- "MIN_SUCCESSFUL_REACHED" => CompletionReason.MinSuccessfulReached,
- "FAILURE_TOLERANCE_EXCEEDED" => CompletionReason.FailureToleranceExceeded,
- _ => CompletionReason.AllCompleted
- };
-}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/MapConfig.cs b/Libraries/src/Amazon.Lambda.DurableExecution/MapConfig.cs
new file mode 100644
index 000000000..97b314aac
--- /dev/null
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/MapConfig.cs
@@ -0,0 +1,75 @@
+namespace Amazon.Lambda.DurableExecution;
+
+/// <summary>
+/// Configuration for a durable map operation: concurrency limit, completion
+/// policy, nesting type, and per-item branch naming.
+/// </summary>
+/// <remarks>
+/// Per-item checkpoint payloads are serialized via the
+/// <c>ILambdaSerializer</c> registered with the
+/// Lambda runtime (typically
+/// configured via <c>LambdaBootstrapBuilder.Create(handler, serializer)</c>);
+/// this config does not expose a serializer slot.
+/// </remarks>
+public sealed class MapConfig
+{
+ private int? _maxConcurrency;
+
+ /// <summary>
+ /// Maximum number of items processed concurrently. <c>null</c> (default) =
+ /// unlimited. Must be at least 1 when set.
+ /// </summary>
+ /// <exception cref="ArgumentOutOfRangeException">
+ /// Thrown by the setter if the value is less than or equal to 0.
+ /// </exception>
+ public int? MaxConcurrency
+ {
+ get => _maxConcurrency;
+ set
+ {
+ if (value is { } v && v <= 0) // null passes through: only a supplied value is range-checked
+ {
+ throw new ArgumentOutOfRangeException(nameof(value), v,
+ "MaxConcurrency must be at least 1, or null for unlimited.");
+ }
+ _maxConcurrency = value;
+ }
+ }
+
+ /// <summary>
+ /// When the map operation is considered complete. Defaults to
+ /// <c>CompletionConfig.AllCompleted()</c> — every item runs regardless
+ /// of per-item failures, which are surfaced via
+ /// <c>IBatchResult&lt;T&gt;.Failed</c> rather than thrown.
+ /// </summary>
+ /// <remarks>
+ /// This permissive default matches the Python and Java SDKs' map operation.
+ /// It differs intentionally from <c>ParallelConfig.CompletionConfig</c>,
+ /// which defaults to <c>CompletionConfig.AllSuccessful()</c> (fail-fast).
+ /// For fail-fast map behavior — any item failure surfaces a
+ /// <c>MapException</c> when the result is awaited — set this to
+ /// <c>CompletionConfig.AllSuccessful()</c>, or call
+ /// <c>IBatchResult&lt;T&gt;.ThrowIfError()</c> on the result.
+ /// </remarks>
+ public CompletionConfig CompletionConfig { get; set; } = CompletionConfig.AllCompleted();
+
+ /// <summary>
+ /// How item branches are represented in the checkpoint graph. Defaults to
+ /// <see cref="NestingType.Nested"/>.
+ /// </summary>
+ /// <remarks>
+ /// The non-nested layout is not yet supported in the .NET SDK and
+ /// will throw when the map
+ /// operation is invoked.
+ /// </remarks>
+ public NestingType NestingType { get; set; } = NestingType.Nested;
+
+ /// <summary>
+ /// Optional function to generate a custom name for each item's branch.
+ /// Receives the item and its zero-based index, and returns the branch name
+ /// surfaced in execution traces and on <c>BatchItem.Name</c>.
+ /// When <c>null</c> (default), branches are named by index (<c>"0"</c>,
+ /// <c>"1"</c>, ...), matching <c>ParallelAsync</c>.
+ /// </summary>
+ public Func? ItemNamer { get; set; }
+}
diff --git a/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs b/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs
index f6956632c..ca358a46b 100644
--- a/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs
+++ b/Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs
@@ -204,6 +204,12 @@ public static class OperationSubTypes
/// Parallel branch (per-branch child-context) sub-type.
public const string ParallelBranch = "ParallelBranch";
+
+ /// Map parent sub-type.
+ public const string Map = "Map";
+
+ /// Map item (per-item child-context) sub-type.
+ public const string MapItem = "MapItem";
}
///
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapFailureToleranceTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapFailureToleranceTest.cs
new file mode 100644
index 000000000..06ab716c0
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapFailureToleranceTest.cs
@@ -0,0 +1,69 @@
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class MapFailureToleranceTest
+{
+    private readonly ITestOutputHelper _output;
+    public MapFailureToleranceTest(ITestOutputHelper output) => _output = output;
+
+    /// <summary>
+    /// Five items, two fail, ToleratedFailureCount=1. The map must surface a
+    /// <c>MapException</c> with reason
+    /// <c>FailureToleranceExceeded</c>; the workflow must
+    /// terminate FAILED. Validates the failure-tolerance short-circuit and that
+    /// <c>MapException</c> (not <c>ParallelException</c>) propagates as the
+    /// workflow's terminal error.
+    /// </summary>
+    [Fact]
+    public async Task Map_FailureToleranceExceeded_FailsWorkflow()
+    {
+        await using var deployment = await DurableFunctionDeployment.CreateAsync(
+            DurableFunctionDeployment.FindTestFunctionDir("MapFailureToleranceFunction"),
+            "mtol", _output);
+
+        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "m3"}""");
+        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+        _output.WriteLine($"Response: {responsePayload}");
+
+        // Failed workflows return null payload to the Invoke caller — locate the
+        // execution by name to inspect its terminal status.
+        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+        Assert.NotNull(arn);
+
+        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(60));
+        Assert.Equal("FAILED", status, ignoreCase: true);
+
+        var execution = await deployment.GetExecutionAsync(arn!);
+        Assert.NotNull(execution.Error);
+        // MapException is the terminal error type the SDK throws when the
+        // failure-tolerance short-circuit fires. The message check is a loose fallback.
+        var errorType = execution.Error.ErrorType ?? string.Empty;
+        var errorMessage = execution.Error.ErrorMessage ?? string.Empty;
+        Assert.True(
+            errorType.Contains("MapException", StringComparison.Ordinal)
+                || errorMessage.Contains("Map", StringComparison.OrdinalIgnoreCase),
+            $"Expected error to indicate MapException; got type='{errorType}' message='{errorMessage}'");
+
+        // History: parent CONTEXT and at least 2 failed item contexts visible.
+        var history = await deployment.WaitForHistoryAsync(
+            arn!,
+            h => (h.Events?.Count(e => e.EventType == EventType.ContextStarted) ?? 0) >= 3
+                 && (h.Events?.Count(e => e.EventType == EventType.ContextFailed) ?? 0) >= 2,
+            TimeSpan.FromSeconds(60));
+        var events = history.Events ?? [];
+
+        var failedCount = events.Count(e => e.EventType == EventType.ContextFailed);
+        Assert.True(failedCount >= 2,
+            $"Expected >= 2 ContextFailed events; got {failedCount}");
+
+        // The parent context (named "tolerance") records the aggregate failure.
+        var parentFailed = events.FirstOrDefault(e =>
+            e.EventType == EventType.ContextFailed && e.Name == "tolerance");
+        Assert.NotNull(parentFailed);
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapFirstSuccessfulTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapFirstSuccessfulTest.cs
new file mode 100644
index 000000000..737e70a2f
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapFirstSuccessfulTest.cs
@@ -0,0 +1,70 @@
+using System.Linq;
+using System.Text;
+using System.Text.Json;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class MapFirstSuccessfulTest
+{
+    private readonly ITestOutputHelper _output;
+    public MapFirstSuccessfulTest(ITestOutputHelper output) => _output = output;
+
+    /// <summary>
+    /// Four items with staggered durable waits, <c>FirstSuccessful</c>: as soon
+    /// as one item completes, the map resolves. In-flight items remain in
+    /// the <c>Started</c> state rather than being cancelled.
+    /// Validates the cross-cutting decision: orphan units are NOT cancelled, and
+    /// short-circuit reports them as Started.
+    /// </summary>
+    [Fact]
+    public async Task Map_FirstSuccessful_ShortCircuitsOnFirstWin()
+    {
+        await using var deployment = await DurableFunctionDeployment.CreateAsync(
+            DurableFunctionDeployment.FindTestFunctionDir("MapFirstSuccessfulFunction"),
+            "mfirst", _output);
+
+        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "m4"}""");
+        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+        _output.WriteLine($"Response: {responsePayload}");
+
+        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+        Assert.NotNull(arn);
+
+        // Longest wait timer = 8s, plus invocation overhead. Generous timeout for CI variance.
+        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
+        Assert.Equal("SUCCEEDED", status, ignoreCase: true);
+
+        using var doc = JsonDocument.Parse(responsePayload);
+        var winnerIndex = doc.RootElement.GetProperty("WinnerIndex").GetInt32();
+        var winnerName = doc.RootElement.GetProperty("WinnerName").GetString();
+        var completionReason = doc.RootElement.GetProperty("CompletionReason").GetString();
+        var successCount = doc.RootElement.GetProperty("SuccessCount").GetInt32();
+
+        // At least one item succeeded — the workflow short-circuited as soon as
+        // the first win materialised. The fastest item is index 1 (1s wait).
+        Assert.True(successCount >= 1, $"Expected >= 1 successful item, got {successCount}");
+        Assert.True(winnerIndex >= 0 && winnerIndex < 4,
+            $"WinnerIndex should be a valid item index, got {winnerIndex}");
+        Assert.NotNull(winnerName);
+        Assert.NotEqual("FailureToleranceExceeded", completionReason);
+
+        // Service-side: the parent CONTEXT and at least the winning item CONTEXT
+        // succeeded. Other items' final state is timing-dependent (the
+        // orchestrator does not cancel in-flight units on short-circuit).
+        var history = await deployment.WaitForHistoryAsync(
+            arn!,
+            h => (h.Events?.Any(e => e.EventType == EventType.ContextSucceeded && e.Name == "race") ?? false),
+            TimeSpan.FromSeconds(60));
+        var events = history.Events ?? [];
+
+        var parentSucceeded = events.FirstOrDefault(e =>
+            e.EventType == EventType.ContextSucceeded && e.Name == "race");
+        Assert.NotNull(parentSucceeded);
+
+        // The winning item's CONTEXT SUCCEEDED is in the history.
+        Assert.Contains(events, e => e.EventType == EventType.ContextSucceeded && e.Name == winnerName);
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapHappyPathTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapHappyPathTest.cs
new file mode 100644
index 000000000..6ee451049
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapHappyPathTest.cs
@@ -0,0 +1,75 @@
+using System.Linq;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class MapHappyPathTest
+{
+    private readonly ITestOutputHelper _output;
+    public MapHappyPathTest(ITestOutputHelper output) => _output = output;
+
+    /// <summary>
+    /// End-to-end happy-path map: three items each processed in a step, and the
+    /// workflow returns the joined results. Validates the parent CONTEXT and
+    /// per-item CONTEXT checkpoints all land in the service-side history with the
+    /// correct (ItemNamer-derived) names.
+    /// </summary>
+    [Fact]
+    public async Task Map_AllItemsSucceed()
+    {
+        await using var deployment = await DurableFunctionDeployment.CreateAsync(
+            DurableFunctionDeployment.FindTestFunctionDir("MapHappyPathFunction"),
+            "mhappy", _output);
+
+        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "m1"}""");
+        Assert.Equal(200, invokeResponse.StatusCode);
+
+        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+        _output.WriteLine($"Response: {responsePayload}");
+
+        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+        Assert.NotNull(arn);
+
+        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(60));
+        Assert.Equal("SUCCEEDED", status, ignoreCase: true);
+
+        // The user-visible payload contains all three item outputs. Only
+        // presence is asserted here, not their relative order.
+        Assert.Contains("order-1-m1", responsePayload);
+        Assert.Contains("order-2-m1", responsePayload);
+        Assert.Contains("order-3-m1", responsePayload);
+
+        // History is eventually consistent — wait until the parent CONTEXT and
+        // all three item CONTEXT checkpoints are visible.
+        var history = await deployment.WaitForHistoryAsync(
+            arn!,
+            h => (h.Events?.Count(e => e.EventType == EventType.ContextStarted) ?? 0) >= 4
+                 && (h.Events?.Count(e => e.EventType == EventType.ContextSucceeded) ?? 0) >= 4,
+            TimeSpan.FromSeconds(60));
+        var events = history.Events ?? [];
+
+        // Parent + 3 items = 4 ContextStarted, 4 ContextSucceeded.
+        Assert.Equal(4, events.Count(e => e.EventType == EventType.ContextStarted));
+        Assert.Equal(4, events.Count(e => e.EventType == EventType.ContextSucceeded));
+
+        // The three items show up by their ItemNamer name on their own
+        // ContextStarted events.
+        var startedNames = events
+            .Where(e => e.EventType == EventType.ContextStarted)
+            .Select(e => e.Name)
+            .ToList();
+        Assert.Contains("process_all", startedNames);
+        Assert.Contains("item-order-1", startedNames);
+        Assert.Contains("item-order-2", startedNames);
+        Assert.Contains("item-order-3", startedNames);
+
+        // Each item ran one step => 3 StepSucceeded.
+        Assert.Equal(3, events.Count(e => e.EventType == EventType.StepSucceeded));
+
+        // No item failed.
+        Assert.DoesNotContain(events, e => e.EventType == EventType.ContextFailed);
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapMaxConcurrencyTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapMaxConcurrencyTest.cs
new file mode 100644
index 000000000..7c55418e7
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapMaxConcurrencyTest.cs
@@ -0,0 +1,69 @@
+using System.Linq;
+using System.Text;
+using System.Text.Json;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class MapMaxConcurrencyTest
+{
+    private readonly ITestOutputHelper _output;
+    public MapMaxConcurrencyTest(ITestOutputHelper output) => _output = output;
+
+    /// <summary>
+    /// 6 items, each with a 2-second durable wait, MaxConcurrency = 2. Checks
+    /// that dispatch is really throttled: the reported timestamps have to fall
+    /// into waves instead of all six landing together. Timing tolerance is
+    /// deliberately loose to keep CI stable; the assertion that matters is
+    /// "not all 6 ran at once".
+    /// </summary>
+    [Fact]
+    public async Task Map_MaxConcurrency_ThrottlesItemDispatch()
+    {
+        await using var deployment = await DurableFunctionDeployment.CreateAsync(
+            DurableFunctionDeployment.FindTestFunctionDir("MapMaxConcurrencyFunction"),
+            "mmaxc", _output);
+
+        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "m5"}""");
+        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+        _output.WriteLine($"Response: {responsePayload}");
+
+        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+        Assert.NotNull(arn);
+
+        // Three waves of 2s waits plus invocation overhead; leave plenty of room.
+        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(180));
+        Assert.Equal("SUCCEEDED", status, ignoreCase: true);
+
+        using var json = JsonDocument.Parse(responsePayload);
+        var succeeded = json.RootElement.GetProperty("SuccessCount").GetInt32();
+        Assert.Equal(6, succeeded);
+
+        var timestamps = json.RootElement.GetProperty("Timestamps")
+            .EnumerateArray().Select(t => t.GetInt64()).ToList();
+        Assert.Equal(6, timestamps.Count);
+
+        var ordered = timestamps.OrderBy(t => t).ToList();
+        // Offsets are measured from the earliest timestamp, so offsets[0] == 0.
+        var offsets = ordered.Select(t => t - ordered[0]).ToList();
+        _output.WriteLine($"Relative timestamps (ms): {string.Join(", ", offsets)}");
+
+        // Loose clustering check: MaxConcurrency=2 with 2s waits should put
+        // roughly 2 items in the first wave. Exact 3-wave clustering is prone
+        // to service jitter, so only the weaker property is asserted:
+        // the six items did not all land in the same wave.
+        var earlyCount = offsets.Count(r => r < 1500);
+        Assert.True(earlyCount <= 3,
+            $"Expected MaxConcurrency=2 to limit the first wave to ~2 items; got {earlyCount} within 1500ms of start. " +
+            $"Relative timestamps: [{string.Join(", ", offsets)}]");
+
+        // The whole run has to cover at least one wave gap (~2s); otherwise
+        // every item ran at once.
+        var span = offsets[^1];
+        Assert.True(span >= 1500,
+            $"Expected items to span >= 1500ms (proves throttling); got {span}ms. " +
+            $"Relative timestamps: [{string.Join(", ", offsets)}]");
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapPartialFailureTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapPartialFailureTest.cs
new file mode 100644
index 000000000..6a29c18df
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapPartialFailureTest.cs
@@ -0,0 +1,75 @@
+using System.Linq;
+using System.Text;
+using System.Text.Json;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class MapPartialFailureTest
+{
+    private readonly ITestOutputHelper _output;
+    public MapPartialFailureTest(ITestOutputHelper output) => _output = output;
+
+    /// <summary>
+    /// Three items, one throws, two succeed — with NO config supplied. Map's
+    /// default <c>CompletionConfig</c> is <c>AllCompleted()</c> (permissive),
+    /// unlike Parallel's <c>AllSuccessful()</c>. This validates the headline
+    /// Map-vs-Parallel behavioral difference end-to-end: a partial failure does
+    /// NOT fail the workflow; it surfaces success/failure counts and per-item
+    /// errors through the service round-trip and back into the rebuilt
+    /// batch result.
+    /// </summary>
+    [Fact]
+    public async Task Map_PartialFailure_DefaultIsPermissive_ReportsCounts()
+    {
+        await using var deployment = await DurableFunctionDeployment.CreateAsync(
+            DurableFunctionDeployment.FindTestFunctionDir("MapPartialFailureFunction"),
+            "mpartial", _output);
+
+        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "m2"}""");
+        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+        _output.WriteLine($"Response: {responsePayload}");
+
+        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+        Assert.NotNull(arn);
+
+        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(60));
+        // Permissive default means partial failure is NOT a workflow failure —
+        // the workflow accepted the failure and returned a result.
+        Assert.Equal("SUCCEEDED", status, ignoreCase: true);
+
+        using var doc = JsonDocument.Parse(responsePayload);
+        var successCount = doc.RootElement.GetProperty("SuccessCount").GetInt32();
+        var failureCount = doc.RootElement.GetProperty("FailureCount").GetInt32();
+        var errorSummary = doc.RootElement.GetProperty("ErrorSummary").GetString();
+
+        Assert.Equal(2, successCount);
+        Assert.Equal(1, failureCount);
+        Assert.NotNull(errorSummary);
+        Assert.Contains("intentional partial failure", errorSummary);
+
+        // History: 1 parent + 3 items = 4 ContextStarted; 3 ContextSucceeded
+        // (parent + 2 ok items); 1 ContextFailed (the boom item).
+        var history = await deployment.WaitForHistoryAsync(
+            arn!,
+            h => (h.Events?.Count(e => e.EventType == EventType.ContextStarted) ?? 0) >= 4
+                 && (h.Events?.Any(e => e.EventType == EventType.ContextFailed) ?? false)
+                 && (h.Events?.Count(e => e.EventType == EventType.ContextSucceeded) ?? 0) >= 3,
+            TimeSpan.FromSeconds(60));
+        var events = history.Events ?? [];
+
+        Assert.Equal(4, events.Count(e => e.EventType == EventType.ContextStarted));
+        Assert.Equal(3, events.Count(e => e.EventType == EventType.ContextSucceeded));
+        Assert.Equal(1, events.Count(e => e.EventType == EventType.ContextFailed));
+
+        // The failing item's checkpoint preserves the exception message. Its
+        // branch name is the default index ("1", the middle item).
+        var failedEvent = events.SingleOrDefault(e => e.EventType == EventType.ContextFailed);
+        Assert.NotNull(failedEvent);
+        Assert.Equal("1", failedEvent!.Name);
+        Assert.Contains("intentional partial failure",
+            failedEvent.ContextFailedDetails?.Error?.Payload?.ErrorMessage ?? string.Empty);
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapReplayDeterminismTest.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapReplayDeterminismTest.cs
new file mode 100644
index 000000000..02b867958
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapReplayDeterminismTest.cs
@@ -0,0 +1,114 @@
+using System.Linq;
+using System.Security.Cryptography;
+using System.Text;
+using Amazon.Lambda.Model;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Amazon.Lambda.DurableExecution.IntegrationTests;
+
+public class MapReplayDeterminismTest
+{
+    private readonly ITestOutputHelper _output;
+    public MapReplayDeterminismTest(ITestOutputHelper output) => _output = output;
+
+    /// <summary>
+    /// Lowercase-hex SHA-256 of the UTF-8 bytes of <paramref name="raw"/>.
+    /// Each item's operation ID must equal SHA-256(parentOpId + "-" + (index+1))
+    /// (matching OperationIdGenerator's CreateChild contract). Reproduced locally
+    /// because OperationIdGenerator is internal to the SDK.
+    /// </summary>
+    /// <param name="raw">Unhashed operation-ID text, e.g. <c>"1"</c>.</param>
+    private static string HashOpId(string raw)
+    {
+        var digest = SHA256.HashData(Encoding.UTF8.GetBytes(raw));
+        // Convert.ToHexString emits uppercase; operation IDs are lowercase hex.
+        return Convert.ToHexString(digest).ToLowerInvariant();
+    }
+
+    /// <summary>
+    /// Three map items, each containing a step + a durable wait (the wait forces
+    /// a suspend/resume cycle so the map actually replays). Verifies:
+    /// 1. The item operation IDs match the deterministic
+    /// <c>SHA256("&lt;parentId&gt;-&lt;n&gt;")</c> formula (the same one used by
+    /// OperationIdGenerator.CreateChild and the reference Java/JS/Python SDKs).
+    /// 2. Each item's user-visible step result is preserved across replay (the
+    /// GUID generated inside <c>generate</c> survives suspend/resume).
+    /// </summary>
+    [Fact]
+    public async Task Map_ItemOperationIds_AreDeterministic_AcrossReplay()
+    {
+        await using var deployment = await DurableFunctionDeployment.CreateAsync(
+            DurableFunctionDeployment.FindTestFunctionDir("MapReplayDeterminismFunction"),
+            "mreplay", _output);
+
+        var (invokeResponse, executionName) = await deployment.InvokeAsync("""{"orderId": "m6"}""");
+        var responsePayload = Encoding.UTF8.GetString(invokeResponse.Payload.ToArray());
+        _output.WriteLine($"Response: {responsePayload}");
+
+        var arn = await deployment.FindDurableExecutionArnByNameAsync(executionName, TimeSpan.FromSeconds(60));
+        Assert.NotNull(arn);
+
+        var status = await deployment.PollForCompletionAsync(arn!, TimeSpan.FromSeconds(120));
+        Assert.Equal("SUCCEEDED", status, ignoreCase: true);
+
+        // The map parent is the first root-level operation -> SHA256("1").
+        var parentOpId = HashOpId("1");
+        var expectedItemIds = new[]
+        {
+            HashOpId($"{parentOpId}-1"),
+            HashOpId($"{parentOpId}-2"),
+            HashOpId($"{parentOpId}-3"),
+        };
+
+        // Wait until each item's CONTEXT SUCCEEDED is visible AND each item's
+        // step/wait events are visible (they live under the item operation IDs).
+        var history = await deployment.WaitForHistoryAsync(
+            arn!,
+            h =>
+            {
+                var events = h.Events ?? [];
+                if (events.Count(e => e.EventType == EventType.ContextSucceeded) < 4) return false;
+                if (events.Count(e => e.EventType == EventType.StepSucceeded) < 3) return false;
+                if (events.Count(e => e.EventType == EventType.WaitSucceeded) < 3) return false;
+                return true;
+            },
+            TimeSpan.FromSeconds(60));
+        var allEvents = history.Events ?? [];
+
+        // 1. Item operation IDs match the deterministic hash.
+        var itemStartedEvents = allEvents
+            .Where(e => e.EventType == EventType.ContextStarted && e.Id is not null && e.Id != parentOpId)
+            .ToList();
+        var observedItemIds = itemStartedEvents.Select(e => e.Id).Distinct().ToList();
+        Assert.Equal(3, observedItemIds.Count);
+        foreach (var expected in expectedItemIds)
+        {
+            Assert.Contains(expected, observedItemIds);
+        }
+
+        // 2. Each item's CONTEXT succeeded (parent named "fanout" excluded).
+        var itemSucceededEvents = allEvents
+            .Where(e => e.EventType == EventType.ContextSucceeded && e.Name != "fanout")
+            .ToList();
+        Assert.Equal(3, itemSucceededEvents.Count);
+
+        // 3. Each item's "generate" step succeeded exactly once — proving replay
+        //    returned the cached step result rather than re-executing.
+        var stepSucceededEvents = allEvents
+            .Where(e => e.EventType == EventType.StepSucceeded && e.Name == "generate")
+            .ToList();
+        Assert.Equal(3, stepSucceededEvents.Count);
+
+        // 4. The history holds at least 2 completed invocations (suspend +
+        //    resume), proving replay actually happened.
+        var invocations = allEvents.Where(e => e.InvocationCompletedDetails != null).ToList();
+        Assert.True(
+            invocations.Count >= 2,
+            $"Expected >= 2 InvocationCompleted events (suspend + resume), got {invocations.Count}");
+
+        // 5. The user-visible response contains the per-item step results
+        //    (proving they survived replay).
+        Assert.Contains("\"data\"", responsePayload, StringComparison.OrdinalIgnoreCase);
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/Function.cs
new file mode 100644
index 000000000..62712b6a4
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/Function.cs
@@ -0,0 +1,55 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+    public static async Task Main(string[] args)
+    {
+        var handler = new Function();
+        var serializer = new DefaultLambdaJsonSerializer();
+        using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+        using var bootstrap = new LambdaBootstrap(handlerWrapper);
+        await bootstrap.RunAsync();
+    }
+
+    public Task Handler(
+        DurableExecutionInvocationInput input, ILambdaContext context)
+        => DurableFunction.WrapAsync(Workflow, input, context);
+
+    private async Task<TestResult> Workflow(TestEvent input, IDurableContext context)
+    {
+        // Five items, two throw. ToleratedFailureCount = 1 means a second failure
+        // exceeds tolerance and the map surfaces a MapException — terminating the
+        // workflow FAILED.
+        var items = new[] { "ok1", "bad1", "ok2", "bad2", "ok3" };
+
+        var batch = await context.MapAsync(
+            items,
+            async (ctx, item, index, all) =>
+            {
+                await Task.CompletedTask;
+                if (item.StartsWith("bad", StringComparison.Ordinal))
+                    throw new InvalidOperationException($"{item} boom");
+                return item;
+            },
+            name: "tolerance",
+            config: new MapConfig
+            {
+                CompletionConfig = new CompletionConfig { ToleratedFailureCount = 1 }
+            });
+
+        // Should not reach here — the map must throw MapException.
+        return new TestResult { Status = "should_not_reach", SuccessCount = batch.SuccessCount };
+    }
+}
+
+public class TestEvent { public string? OrderId { get; set; } } // Workflow input payload.
+public class TestResult // Workflow output payload.
+{
+ public string? Status { get; set; }
+ public int SuccessCount { get; set; } // Copied from the map result's SuccessCount.
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/MapFailureToleranceFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/MapFailureToleranceFunction.csproj
new file mode 100644
index 000000000..f8bf7fd0c
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/MapFailureToleranceFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net10.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/Function.cs
new file mode 100644
index 000000000..d083a054b
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/Function.cs
@@ -0,0 +1,63 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+    public static async Task Main(string[] args)
+    {
+        var handler = new Function();
+        var serializer = new DefaultLambdaJsonSerializer();
+        using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+        using var bootstrap = new LambdaBootstrap(handlerWrapper);
+        await bootstrap.RunAsync();
+    }
+
+    public Task Handler(
+        DurableExecutionInvocationInput input, ILambdaContext context)
+        => DurableFunction.WrapAsync(Workflow, input, context);
+
+    private async Task<TestResult> Workflow(TestEvent input, IDurableContext context)
+    {
+        // Four items, each waits a different (durable) duration. The shortest
+        // wait should win and short-circuit the map via FirstSuccessful. Wait
+        // durations are at least 1s (service timer granularity). The item value
+        // IS the wait-seconds; the result is the item's index.
+        var waitSeconds = new[] { 8, 1, 5, 6 };
+
+        var batch = await context.MapAsync(
+            waitSeconds,
+            async (ctx, seconds, index, all) =>
+            {
+                await ctx.WaitAsync(TimeSpan.FromSeconds(seconds), name: $"wait_{index}");
+                return index;
+            },
+            name: "race",
+            config: new MapConfig { CompletionConfig = CompletionConfig.FirstSuccessful() });
+
+        var winner = batch.Succeeded.FirstOrDefault();
+        return new TestResult
+        {
+            Status = "completed",
+            WinnerIndex = winner?.Index ?? -1,
+            WinnerName = winner?.Name,
+            CompletionReason = batch.CompletionReason.ToString(),
+            SuccessCount = batch.SuccessCount,
+            StartedCount = batch.StartedCount
+        };
+    }
+}
+
+public class TestEvent { public string? OrderId { get; set; } } // Workflow input payload.
+public class TestResult // Workflow output payload.
+{
+ public string? Status { get; set; }
+ public int WinnerIndex { get; set; } // Index of the first succeeded item, or -1 when there is none.
+ public string? WinnerName { get; set; } // Name of that item; null when there is none.
+ public string? CompletionReason { get; set; } // batch.CompletionReason rendered via ToString().
+ public int SuccessCount { get; set; }
+ public int StartedCount { get; set; }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/MapFirstSuccessfulFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/MapFirstSuccessfulFunction.csproj
new file mode 100644
index 000000000..f8bf7fd0c
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/MapFirstSuccessfulFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net10.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/Function.cs
new file mode 100644
index 000000000..14da119f8
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/Function.cs
@@ -0,0 +1,45 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+    public static async Task Main(string[] args)
+    {
+        var handler = new Function();
+        var serializer = new DefaultLambdaJsonSerializer();
+        using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+        using var bootstrap = new LambdaBootstrap(handlerWrapper);
+        await bootstrap.RunAsync();
+    }
+
+    public Task Handler(
+        DurableExecutionInvocationInput input, ILambdaContext context)
+        => DurableFunction.WrapAsync(Workflow, input, context);
+
+    private async Task<TestResult> Workflow(TestEvent input, IDurableContext context)
+    {
+        var orders = new[] { "order-1", "order-2", "order-3" };
+
+        // Each item is processed inside a step so the per-item child context
+        // owns a leaf operation. ItemNamer gives each item a readable branch
+        // name in the service-side history.
+        var batch = await context.MapAsync(
+            orders,
+            async (ctx, orderId, index, all) =>
+                await ctx.StepAsync(
+                    async (_) => { await Task.CompletedTask; return $"{orderId}-{input.OrderId}"; },
+                    name: "process"),
+            name: "process_all",
+            config: new MapConfig { ItemNamer = (item, _) => $"item-{item}" });
+
+        var joined = string.Join(",", batch.GetResults());
+        return new TestResult { Status = "completed", Data = joined };
+    }
+}
+
+public class TestEvent { public string? OrderId { get; set; } } // Workflow input payload.
+public class TestResult { public string? Status { get; set; } public string? Data { get; set; } } // Workflow output; Data is the comma-joined item results.
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/MapHappyPathFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/MapHappyPathFunction.csproj
new file mode 100644
index 000000000..f8bf7fd0c
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/MapHappyPathFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net10.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/Function.cs
new file mode 100644
index 000000000..0499a7a93
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/Function.cs
@@ -0,0 +1,61 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+ public static async Task Main(string[] args)
+ {
+ var handler = new Function();
+ var serializer = new DefaultLambdaJsonSerializer();
+ using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+ using var bootstrap = new LambdaBootstrap(handlerWrapper);
+ await bootstrap.RunAsync();
+ }
+
+ public Task Handler(
+ DurableExecutionInvocationInput input, ILambdaContext context)
+ => DurableFunction.WrapAsync(Workflow, input, context);
+
+ private async Task Workflow(TestEvent input, IDurableContext context)
+ {
+ // 6 items, MaxConcurrency = 2. Each item does a 2-second durable wait
+ // then captures the post-wait wall-clock as a unix-ms timestamp. The
+ // expected outcome is 3 waves of 2 items; total elapsed ~6s. Use
+ // IDurableContext.WaitAsync (not Task.Delay) — Task.Delay is NOT durable
+ // and would skew this measurement under replay.
+ var items = new[] { 0, 1, 2, 3, 4, 5 };
+
+ var batch = await context.MapAsync(
+ items,
+ async (ctx, item, index, all) =>
+ {
+ await ctx.WaitAsync(TimeSpan.FromSeconds(2), name: $"wait_{index}");
+ return DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
+ },
+ name: "throttled",
+ config: new MapConfig
+ {
+ MaxConcurrency = 2,
+ CompletionConfig = CompletionConfig.AllCompleted()
+ });
+
+ return new TestResult
+ {
+ Status = "completed",
+ SuccessCount = batch.SuccessCount,
+ Timestamps = batch.GetResults().ToArray()
+ };
+ }
+}
+
+public class TestEvent { public string? OrderId { get; set; } }
+public class TestResult
+{
+ public string? Status { get; set; }
+ public int SuccessCount { get; set; }
+ public long[]? Timestamps { get; set; }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/MapMaxConcurrencyFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/MapMaxConcurrencyFunction.csproj
new file mode 100644
index 000000000..f8bf7fd0c
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/MapMaxConcurrencyFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net10.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/Function.cs
new file mode 100644
index 000000000..39676c3ed
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/Function.cs
@@ -0,0 +1,63 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+ public static async Task Main(string[] args)
+ {
+ var handler = new Function();
+ var serializer = new DefaultLambdaJsonSerializer();
+ using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+ using var bootstrap = new LambdaBootstrap(handlerWrapper);
+ await bootstrap.RunAsync();
+ }
+
+ public Task Handler(
+ DurableExecutionInvocationInput input, ILambdaContext context)
+ => DurableFunction.WrapAsync(Workflow, input, context);
+
+ private async Task Workflow(TestEvent input, IDurableContext context)
+ {
+ // Three items, the middle one throws. Map's DEFAULT CompletionConfig is
+ // AllCompleted() (permissive) — unlike Parallel's AllSuccessful() — so NO
+ // config is supplied here and the map must still drive every item to a
+ // terminal state without throwing. This is the key Map-vs-Parallel
+ // behavioral difference, validated end-to-end.
+ var items = new[] { "ok1", "boom", "ok2" };
+
+ var batch = await context.MapAsync(
+ items,
+ async (ctx, item, index, all) =>
+ {
+ await Task.CompletedTask;
+ if (item == "boom")
+ throw new InvalidOperationException("intentional partial failure");
+ return item;
+ },
+ name: "partial");
+
+ var errors = batch.GetErrors();
+ var errorSummary = string.Join("|", errors.Select(e => $"{e.GetType().Name}:{e.Message}"));
+
+ return new TestResult
+ {
+ Status = "completed",
+ SuccessCount = batch.SuccessCount,
+ FailureCount = batch.FailureCount,
+ ErrorSummary = errorSummary
+ };
+ }
+}
+
+public class TestEvent { public string? OrderId { get; set; } }
+public class TestResult
+{
+ public string? Status { get; set; }
+ public int SuccessCount { get; set; }
+ public int FailureCount { get; set; }
+ public string? ErrorSummary { get; set; }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/MapPartialFailureFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/MapPartialFailureFunction.csproj
new file mode 100644
index 000000000..f8bf7fd0c
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/MapPartialFailureFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net10.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/Dockerfile b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/Dockerfile
new file mode 100644
index 000000000..c1913d56a
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/Dockerfile
@@ -0,0 +1,7 @@
+FROM public.ecr.aws/lambda/provided:al2023
+
+RUN dnf install -y libicu
+
+COPY bin/publish/ ${LAMBDA_TASK_ROOT}
+
+ENTRYPOINT ["/var/task/bootstrap"]
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/Function.cs b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/Function.cs
new file mode 100644
index 000000000..9a75cbd5e
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/Function.cs
@@ -0,0 +1,53 @@
+using Amazon.Lambda.Core;
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.RuntimeSupport;
+using Amazon.Lambda.Serialization.SystemTextJson;
+
+namespace DurableExecutionTestFunction;
+
+public class Function
+{
+ public static async Task Main(string[] args)
+ {
+ var handler = new Function();
+ var serializer = new DefaultLambdaJsonSerializer();
+ using var handlerWrapper = HandlerWrapper.GetHandlerWrapper(handler.Handler, serializer);
+ using var bootstrap = new LambdaBootstrap(handlerWrapper);
+ await bootstrap.RunAsync();
+ }
+
+ public Task Handler(
+ DurableExecutionInvocationInput input, ILambdaContext context)
+ => DurableFunction.WrapAsync(Workflow, input, context);
+
+ private async Task Workflow(TestEvent input, IDurableContext context)
+ {
+ // Three items. Each item generates a fresh GUID inside a step, then does
+ // a durable wait. The wait forces a suspend/resume cycle, so the second
+ // invocation MUST replay the cached GUID rather than re-running the step.
+ // If replay determinism is broken, the GUID would change between the
+ // original execution and replay.
+ var items = new[] { 0, 1, 2 };
+
+ var batch = await context.MapAsync(
+ items,
+ async (ctx, item, index, all) =>
+ {
+ var generatedId = await ctx.StepAsync(
+ async (_) => { await Task.CompletedTask; return Guid.NewGuid().ToString(); },
+ name: "generate");
+
+ // Force a suspend/resume cycle to trigger replay of the map.
+ await ctx.WaitAsync(TimeSpan.FromSeconds(2), name: "boundary");
+
+ return generatedId;
+ },
+ name: "fanout");
+
+ var joined = string.Join(",", batch.GetResults());
+ return new TestResult { Status = "completed", Data = joined };
+ }
+}
+
+public class TestEvent { public string? OrderId { get; set; } }
+public class TestResult { public string? Status { get; set; } public string? Data { get; set; } }
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/MapReplayDeterminismFunction.csproj b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/MapReplayDeterminismFunction.csproj
new file mode 100644
index 000000000..f8bf7fd0c
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/MapReplayDeterminismFunction.csproj
@@ -0,0 +1,18 @@
+
+
+
+ net10.0
+ Exe
+ true
+ bootstrap
+ enable
+ enable
+
+
+
+
+
+
+
+
+
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/MapOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/MapOperationTests.cs
new file mode 100644
index 000000000..68ffad0c1
--- /dev/null
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/MapOperationTests.cs
@@ -0,0 +1,688 @@
+using Amazon.Lambda.DurableExecution;
+using Amazon.Lambda.DurableExecution.Internal;
+using Amazon.Lambda.Serialization.SystemTextJson;
+using Amazon.Lambda.TestUtilities;
+using Xunit;
+
+namespace Amazon.Lambda.DurableExecution.Tests;
+
+public class MapOperationTests
+{
+ /// <summary>Reproduces the ID that <see cref="OperationIdGenerator"/> emits for the n-th root-level operation.</summary>
+ private static string IdAt(int position) => OperationIdGenerator.HashOperationId(position.ToString());
+
+ /// <summary>The hashed ID of the n-th child operation under <paramref name="parentOpId"/>.</summary>
+ private static string ChildIdAt(string parentOpId, int position) =>
+ OperationIdGenerator.HashOperationId($"{parentOpId}-{position}");
+
+ private static (DurableContext context, RecordingBatcher recorder, TerminationManager tm, ExecutionState state)
+ CreateContext(InitialExecutionState? initialState = null)
+ {
+ var state = new ExecutionState();
+ state.LoadFromCheckpoint(initialState);
+ var tm = new TerminationManager();
+ var idGen = new OperationIdGenerator();
+#pragma warning disable AWSLAMBDA001 // TestLambdaContext.Serializer is experimental.
+ var lambdaContext = new TestLambdaContext { Serializer = new DefaultLambdaJsonSerializer() };
+#pragma warning restore AWSLAMBDA001
+ var recorder = new RecordingBatcher();
+ var context = new DurableContext(state, tm, new WorkflowCancellation(tm), idGen, "arn:test", lambdaContext, recorder.Batcher);
+ return (context, recorder, tm, state);
+ }
+
+ // ──────────────────────────────────────────────────────────────────────
+ // Public surface — basic happy paths
+ // ──────────────────────────────────────────────────────────────────────
+
+ [Fact]
+ public async Task MapAsync_FreshExecution_AllItemsSucceed()
+ {
+ var (context, recorder, tm, _) = CreateContext();
+
+ var items = new[] { 10, 20, 30 };
+
+ var result = await context.MapAsync(
+ items,
+ async (ctx, item, index, all) => { await Task.Yield(); return item * 2; },
+ name: "double_all");
+
+ Assert.False(tm.IsTerminated);
+ Assert.Equal(3, result.TotalCount);
+ Assert.Equal(3, result.SuccessCount);
+ Assert.Equal(0, result.FailureCount);
+ Assert.Equal(0, result.StartedCount);
+ Assert.False(result.HasFailure);
+ Assert.Equal(CompletionReason.AllCompleted, result.CompletionReason);
+ Assert.Equal(new[] { 20, 40, 60 }, result.GetResults());
+
+ await recorder.Batcher.DrainAsync();
+
+ // Parent CONTEXT START + 3 item CONTEXT STARTs + 3 item CONTEXT SUCCEEDs + Parent CONTEXT SUCCEED
+ var contextActions = recorder.Flushed.Where(o => o.Type == "CONTEXT")
+ .Select(o => $"{o.SubType}:{o.Action}").ToArray();
+ Assert.Equal(8, contextActions.Length);
+ Assert.Equal("Map:START", contextActions[0]);
+ Assert.Equal("Map:SUCCEED", contextActions[^1]);
+ }
+
+ [Fact]
+ public async Task MapAsync_PassesItemIndexAndFullList_ToCallback()
+ {
+ var (context, _, _, _) = CreateContext();
+
+ var items = new[] { "a", "b", "c" };
+
+ var result = await context.MapAsync(
+ items,
+ async (ctx, item, index, all) =>
+ {
+ await Task.Yield();
+ // Confirm the callback sees the item, its index, and the whole list.
+ Assert.Same(items, all);
+ Assert.Equal(items[index], item);
+ return $"{index}:{item}:{all.Count}";
+ });
+
+ Assert.Equal(new[] { "0:a:3", "1:b:3", "2:c:3" }, result.GetResults());
+ }
+
+ [Fact]
+ public async Task MapAsync_PreservesIndexOrder_EvenWhenItemsCompleteOutOfOrder()
+ {
+ var (context, _, _, _) = CreateContext();
+
+ var result = await context.MapAsync(
+ new[] { 40, 10, 20 },
+ async (ctx, delay, index, all) => { await Task.Delay(delay); return index + 1; });
+
+ Assert.Equal(new[] { 1, 2, 3 }, result.GetResults());
+ for (var i = 0; i < result.All.Count; i++)
+ {
+ Assert.Equal(i, result.All[i].Index);
+ }
+ }
+
+ [Fact]
+ public async Task MapAsync_ItemOperationIds_AreDeterministic()
+ {
+ var (context, recorder, _, _) = CreateContext();
+
+ await context.MapAsync(
+ new[] { "a", "b" },
+ async (ctx, item, index, all) => { await Task.Yield(); return item; });
+
+ await recorder.Batcher.DrainAsync();
+
+ var parentOpId = IdAt(1);
+ var firstItemId = ChildIdAt(parentOpId, 1);
+ var secondItemId = ChildIdAt(parentOpId, 2);
+
+ var itemStarts = recorder.Flushed
+ .Where(o => o.Type == "CONTEXT" && o.SubType == "MapItem" && o.Action == "START")
+ .ToArray();
+ Assert.Equal(2, itemStarts.Length);
+ Assert.Contains(itemStarts, o => o.Id == firstItemId);
+ Assert.Contains(itemStarts, o => o.Id == secondItemId);
+ }
+
+ [Fact]
+ public async Task MapAsync_DefaultNaming_UsesIndexAsName()
+ {
+ var (context, _, _, _) = CreateContext();
+
+ var result = await context.MapAsync(
+ new[] { 1, 2 },
+ async (ctx, item, index, all) => { await Task.Yield(); return item; });
+
+ Assert.Equal("0", result.All[0].Name);
+ Assert.Equal("1", result.All[1].Name);
+ }
+
+ [Fact]
+ public async Task MapAsync_ItemNamer_PropagatesNameToCheckpointAndItem()
+ {
+ var (context, recorder, _, _) = CreateContext();
+
+ var result = await context.MapAsync(
+ new[] { "order-1", "order-2" },
+ async (ctx, item, index, all) => { await Task.Yield(); return item.Length; },
+ name: "process_orders",
+ config: new MapConfig { ItemNamer = (item, index) => $"Order-{item}" });
+
+ Assert.Equal("Order-order-1", result.All[0].Name);
+ Assert.Equal("Order-order-2", result.All[1].Name);
+
+ await recorder.Batcher.DrainAsync();
+
+ var itemSucceeds = recorder.Flushed
+ .Where(o => o.Type == "CONTEXT" && o.SubType == "MapItem" && o.Action == "SUCCEED")
+ .ToArray();
+ Assert.Contains(itemSucceeds, o => o.Name == "Order-order-1");
+ Assert.Contains(itemSucceeds, o => o.Name == "Order-order-2");
+ }
+
+ [Fact]
+ public async Task MapAsync_EmptyCollection_ReturnsEmptyResultWithAllCompleted()
+ {
+ var (context, recorder, _, _) = CreateContext();
+
+ var result = await context.MapAsync(
+ Array.Empty(),
+ async (ctx, item, index, all) => { await Task.Yield(); return item; });
+
+ Assert.Equal(0, result.TotalCount);
+ Assert.Equal(CompletionReason.AllCompleted, result.CompletionReason);
+
+ await recorder.Batcher.DrainAsync();
+
+ // Even the empty case still flushes parent START + parent SUCCEED.
+ var contextActions = recorder.Flushed.Where(o => o.Type == "CONTEXT")
+ .Select(o => $"{o.SubType}:{o.Action}").ToArray();
+ Assert.Equal(new[] { "Map:START", "Map:SUCCEED" }, contextActions);
+ }
+
+ // ──────────────────────────────────────────────────────────────────────
+ // CompletionConfig — Map's permissive default vs fail-fast opt-in
+ // ──────────────────────────────────────────────────────────────────────
+
+ [Fact]
+ public async Task MapAsync_AllCompletedDefault_PartialFailureDoesNotThrow()
+ {
+ // Map's default CompletionConfig is AllCompleted() (permissive), unlike
+ // Parallel's AllSuccessful(). A single item failure is captured rather
+ // than thrown.
+ var (context, _, _, _) = CreateContext();
+
+ var result = await context.MapAsync(
+ new[] { 1, 2, 3 },
+ async (ctx, item, index, all) =>
+ {
+ await Task.Yield();
+ if (item == 2) throw new InvalidOperationException("oops");
+ return item;
+ });
+
+ Assert.True(result.HasFailure);
+ Assert.Equal(2, result.SuccessCount);
+ Assert.Equal(1, result.FailureCount);
+ Assert.Equal(CompletionReason.AllCompleted, result.CompletionReason);
+ Assert.Equal(new[] { 1, 3 }, result.GetResults());
+
+ var errors = result.GetErrors();
+ Assert.Single(errors);
+ Assert.Contains("oops", errors[0].Message);
+ }
+
+ [Fact]
+ public async Task MapAsync_AllSuccessfulOptIn_OneFailureThrowsMapException()
+ {
+ var (context, _, _, _) = CreateContext();
+
+ var ex = await Assert.ThrowsAsync(() =>
+ context.MapAsync(
+ new[] { 1, 2, 3 },
+ async (ctx, item, index, all) =>
+ {
+ await Task.Yield();
+ if (item == 2) throw new InvalidOperationException("item boom");
+ return item;
+ },
+ config: new MapConfig { CompletionConfig = CompletionConfig.AllSuccessful() }));
+
+ Assert.Equal(CompletionReason.FailureToleranceExceeded, ex.CompletionReason);
+ Assert.NotNull(ex.Result);
+ var typed = Assert.IsAssignableFrom>(ex.Result);
+ Assert.Equal(1, typed.FailureCount);
+ Assert.Equal(2, typed.SuccessCount);
+ }
+
+ [Fact]
+ public async Task MapAsync_ThrowIfError_ThrowsUnderPermissiveDefault()
+ {
+ // The permissive default does not auto-throw; ThrowIfError is the
+ // explicit strict-success check.
+ var (context, _, _, _) = CreateContext();
+
+ var result = await context.MapAsync(
+ new[] { 1, 2 },
+ async (ctx, item, index, all) =>
+ {
+ await Task.Yield();
+ if (item == 2) throw new InvalidOperationException("boom");
+ return item;
+ });
+
+ Assert.True(result.HasFailure);
+ var thrown = Assert.ThrowsAny(() => result.ThrowIfError());
+ Assert.Contains("boom", thrown.Message);
+ }
+
+ [Fact]
+ public async Task MapAsync_ToleratedFailureCount_ExceededThrows()
+ {
+ var (context, _, _, _) = CreateContext();
+
+ var ex = await Assert.ThrowsAsync(() =>
+ context.MapAsync(
+ new[] { 1, 2, 3 },
+ async (ctx, item, index, all) =>
+ {
+ await Task.Yield();
+ if (item != 3) throw new InvalidOperationException($"fail-{item}");
+ return item;
+ },
+ config: new MapConfig
+ {
+ CompletionConfig = new CompletionConfig { ToleratedFailureCount = 1 }
+ }));
+
+ Assert.Equal(CompletionReason.FailureToleranceExceeded, ex.CompletionReason);
+ }
+
+ // ──────────────────────────────────────────────────────────────────────
+ // CompletionConfig — first/min-successful short-circuit
+ // ──────────────────────────────────────────────────────────────────────
+
+ [Fact]
+ public async Task MapAsync_FirstSuccessful_ResolvesAfterFirstSuccess()
+ {
+ var (context, _, _, _) = CreateContext();
+
+ // MaxConcurrency = 1 so dispatch order is deterministic: item 0 fires
+ // first and succeeds; items 1 and 2 are never dispatched and remain
+ // BatchItemStatus.Started.
+ var result = await context.MapAsync(
+ new[] { 1, 2, 3 },
+ async (ctx, item, index, all) => { await Task.Yield(); return item; },
+ config: new MapConfig
+ {
+ MaxConcurrency = 1,
+ CompletionConfig = CompletionConfig.FirstSuccessful()
+ });
+
+ Assert.Equal(CompletionReason.MinSuccessfulReached, result.CompletionReason);
+ Assert.Equal(1, result.SuccessCount);
+ Assert.Equal(2, result.StartedCount);
+ Assert.Equal(0, result.FailureCount);
+ Assert.Equal(3, result.TotalCount);
+
+ Assert.Equal(BatchItemStatus.Succeeded, result.All[0].Status);
+ Assert.Equal(BatchItemStatus.Started, result.All[1].Status);
+ Assert.Equal(BatchItemStatus.Started, result.All[2].Status);
+ }
+
+ // ──────────────────────────────────────────────────────────────────────
+ // MaxConcurrency
+ // ──────────────────────────────────────────────────────────────────────
+
+ [Fact]
+ public async Task MapAsync_MaxConcurrency_LimitsInFlight()
+ {
+ var (context, _, _, _) = CreateContext();
+
+ var inFlight = 0;
+ var maxObserved = 0;
+ var lockObj = new object();
+
+ var result = await context.MapAsync(
+ new[] { 1, 2, 3, 4, 5 },
+ async (ctx, item, index, all) =>
+ {
+ lock (lockObj)
+ {
+ inFlight++;
+ if (inFlight > maxObserved) maxObserved = inFlight;
+ }
+ await Task.Delay(20);
+ lock (lockObj) inFlight--;
+ return item;
+ },
+ config: new MapConfig { MaxConcurrency = 2 });
+
+ Assert.Equal(5, result.SuccessCount);
+ Assert.True(maxObserved <= 2, $"Observed concurrency {maxObserved} exceeded MaxConcurrency = 2");
+ }
+
+ [Fact]
+ public async Task MapAsync_MaxConcurrencyAtLeastItemCount_RunsWithoutSemaphore()
+ {
+ // MaxConcurrency >= item count exercises the no-semaphore optimization
+ // path; behavior must be identical (all items still run).
+ var (context, _, _, _) = CreateContext();
+
+ var result = await context.MapAsync(
+ new[] { 1, 2, 3 },
+ async (ctx, item, index, all) => { await Task.Yield(); return item; },
+ config: new MapConfig { MaxConcurrency = 10 });
+
+ Assert.Equal(3, result.SuccessCount);
+ Assert.Equal(new[] { 1, 2, 3 }, result.GetResults());
+ }
+
+ [Fact]
+ public void MapConfig_MaxConcurrency_OutOfRange_Throws()
+ {
+ var config = new MapConfig();
+ Assert.Throws(() => config.MaxConcurrency = 0);
+ Assert.Throws(() => config.MaxConcurrency = -1);
+ config.MaxConcurrency = 1;
+ config.MaxConcurrency = null;
+ }
+
+ [Fact]
+ public void MapConfig_DefaultCompletionConfig_IsAllCompleted()
+ {
+ // Guards the intentional divergence from ParallelConfig (AllSuccessful).
+ var config = new MapConfig();
+ // AllCompleted() == empty CompletionConfig (no failure thresholds).
+ Assert.Null(config.CompletionConfig.ToleratedFailureCount);
+ Assert.Null(config.CompletionConfig.MinSuccessful);
+ Assert.Null(config.CompletionConfig.ToleratedFailurePercentage);
+ }
+
+ // ──────────────────────────────────────────────────────────────────────
+ // NestingType
+ // ──────────────────────────────────────────────────────────────────────
+
+ [Fact]
+ public async Task MapAsync_NestingTypeFlat_ThrowsNotSupported()
+ {
+ var (context, _, _, _) = CreateContext();
+
+ await Assert.ThrowsAsync(() =>
+ context.MapAsync(
+ new[] { 1 },
+ async (ctx, item, index, all) => { await Task.Yield(); return item; },
+ config: new MapConfig { NestingType = NestingType.Flat }));
+ }
+
+ // ──────────────────────────────────────────────────────────────────────
+ // Argument validation
+ // ──────────────────────────────────────────────────────────────────────
+
+ [Fact]
+ public async Task MapAsync_NullItems_Throws()
+ {
+ var (context, _, _, _) = CreateContext();
+
+ await Assert.ThrowsAsync(() =>
+ context.MapAsync(
+ null!,
+ async (ctx, item, index, all) => { await Task.Yield(); return item; }));
+ }
+
+ [Fact]
+ public async Task MapAsync_NullFunc_Throws()
+ {
+ var (context, _, _, _) = CreateContext();
+
+ await Assert.ThrowsAsync(() =>
+ context.MapAsync(new[] { 1 }, (Func, Task>)null!));
+ }
+
+ // ──────────────────────────────────────────────────────────────────────
+ // Replay
+ // ──────────────────────────────────────────────────────────────────────
+
+ [Fact]
+ public async Task MapAsync_ReplaySucceeded_RebuildsResultFromCheckpoints()
+ {
+ var parentOpId = IdAt(1);
+ var i0 = ChildIdAt(parentOpId, 1);
+ var i1 = ChildIdAt(parentOpId, 2);
+
+ var summaryJson = """
+ {"CompletionReason":"ALL_COMPLETED","Units":[
+ {"Index":0,"Name":"0","Status":"SUCCEEDED"},
+ {"Index":1,"Name":"1","Status":"SUCCEEDED"}
+ ]}
+ """;
+
+ var (context, recorder, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = parentOpId,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Succeeded,
+ SubType = OperationSubTypes.Map,
+ Name = "double_all",
+ ContextDetails = new ContextDetails { Result = summaryJson }
+ },
+ new()
+ {
+ Id = i0,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Succeeded,
+ SubType = OperationSubTypes.MapItem,
+ Name = "0",
+ ContextDetails = new ContextDetails { Result = "100" }
+ },
+ new()
+ {
+ Id = i1,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Succeeded,
+ SubType = OperationSubTypes.MapItem,
+ Name = "1",
+ ContextDetails = new ContextDetails { Result = "200" }
+ }
+ }
+ });
+
+ var calls = 0;
+ var result = await context.MapAsync(
+ new[] { 1, 2 },
+ async (ctx, item, index, all) => { calls++; await Task.Yield(); return 999; },
+ name: "double_all");
+
+ // Cached results returned without re-executing the callback.
+ Assert.Equal(0, calls);
+ Assert.Equal(2, result.SuccessCount);
+ Assert.Equal(new[] { 100, 200 }, result.GetResults());
+
+ await recorder.Batcher.DrainAsync();
+ Assert.Empty(recorder.Flushed);
+ }
+
+ [Fact]
+ public async Task MapAsync_ReplayMixedStatus_PreservesStartedShortCircuited()
+ {
+ var parentOpId = IdAt(1);
+ var i0 = ChildIdAt(parentOpId, 1);
+ var i1 = ChildIdAt(parentOpId, 2);
+
+ var summaryJson = """
+ {"CompletionReason":"MIN_SUCCESSFUL_REACHED","Units":[
+ {"Index":0,"Name":"0","Status":"SUCCEEDED"},
+ {"Index":1,"Name":"1","Status":"SUCCEEDED"},
+ {"Index":2,"Name":"2","Status":"STARTED"}
+ ]}
+ """;
+
+ var (context, recorder, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = parentOpId,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Succeeded,
+ SubType = OperationSubTypes.Map,
+ Name = "m",
+ ContextDetails = new ContextDetails { Result = summaryJson }
+ },
+ new()
+ {
+ Id = i0,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Succeeded,
+ SubType = OperationSubTypes.MapItem,
+ Name = "0",
+ ContextDetails = new ContextDetails { Result = "10" }
+ },
+ new()
+ {
+ Id = i1,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Succeeded,
+ SubType = OperationSubTypes.MapItem,
+ Name = "1",
+ ContextDetails = new ContextDetails { Result = "20" }
+ }
+ // Item 2 has no checkpoint at all — it was never dispatched.
+ }
+ });
+
+ var calls = 0;
+ var result = await context.MapAsync(
+ new[] { 1, 2, 3 },
+ async (ctx, item, index, all) => { calls++; await Task.Yield(); return 999; },
+ name: "m");
+
+ Assert.Equal(0, calls);
+ Assert.Equal(CompletionReason.MinSuccessfulReached, result.CompletionReason);
+ Assert.Equal(2, result.SuccessCount);
+ Assert.Equal(1, result.StartedCount);
+ Assert.Equal(BatchItemStatus.Succeeded, result.All[0].Status);
+ Assert.Equal(BatchItemStatus.Succeeded, result.All[1].Status);
+ Assert.Equal(BatchItemStatus.Started, result.All[2].Status);
+ Assert.Equal(new[] { 10, 20 }, result.GetResults());
+
+ await recorder.Batcher.DrainAsync();
+ Assert.Empty(recorder.Flushed);
+ }
+
+ [Fact]
+ public async Task MapAsync_ReplayFailed_RebuildsResultAndThrows()
+ {
+ var parentOpId = IdAt(1);
+ var i0 = ChildIdAt(parentOpId, 1);
+
+ var summaryJson = """
+ {"CompletionReason":"FAILURE_TOLERANCE_EXCEEDED","Units":[
+ {"Index":0,"Name":"0","Status":"FAILED"}
+ ]}
+ """;
+
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = parentOpId,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Failed,
+ SubType = OperationSubTypes.Map,
+ Name = "m",
+ ContextDetails = new ContextDetails { Result = summaryJson }
+ },
+ new()
+ {
+ Id = i0,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Failed,
+ SubType = OperationSubTypes.MapItem,
+ Name = "0",
+ ContextDetails = new ContextDetails
+ {
+ Error = new ErrorObject { ErrorMessage = "stored failure", ErrorType = "System.InvalidOperationException" }
+ }
+ }
+ }
+ });
+
+ var ex = await Assert.ThrowsAsync(() =>
+ context.MapAsync(
+ new[] { 1 },
+ async (ctx, item, index, all) => { await Task.Yield(); return 999; },
+ name: "m"));
+
+ Assert.Equal(CompletionReason.FailureToleranceExceeded, ex.CompletionReason);
+ var typed = Assert.IsAssignableFrom>(ex.Result);
+ Assert.Equal(1, typed.FailureCount);
+ }
+
+ [Fact]
+ public async Task MapAsync_ReplayWithDriftedItemName_ThrowsNonDeterministic()
+ {
+ // A checkpointed item name that differs from the current ItemNamer output
+ // indicates the item set was reordered/renamed between deployments.
+ var parentOpId = IdAt(1);
+ var i0 = ChildIdAt(parentOpId, 1);
+
+ var summaryJson = """
+ {"CompletionReason":"ALL_COMPLETED","Units":[
+ {"Index":0,"Name":"alpha","Status":"SUCCEEDED"}
+ ]}
+ """;
+
+ var (context, _, _, _) = CreateContext(new InitialExecutionState
+ {
+ Operations = new List
+ {
+ new()
+ {
+ Id = parentOpId,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Succeeded,
+ SubType = OperationSubTypes.Map,
+ Name = "m",
+ ContextDetails = new ContextDetails { Result = summaryJson }
+ },
+ new()
+ {
+ Id = i0,
+ Type = OperationTypes.Context,
+ Status = OperationStatuses.Succeeded,
+ SubType = OperationSubTypes.MapItem,
+ Name = "alpha",
+ ContextDetails = new ContextDetails { Result = "10" }
+ }
+ }
+ });
+
+ await Assert.ThrowsAsync(() =>
+ context.MapAsync(
+ new[] { 1 },
+ async (ctx, item, index, all) => { await Task.Yield(); return 999; },
+ name: "m",
+ // Namer now yields "renamed" instead of the checkpointed "alpha".
+ config: new MapConfig { ItemNamer = (item, index) => "renamed" }));
+ }
+
+ // ──────────────────────────────────────────────────────────────────────
+ // Replay determinism
+ // ──────────────────────────────────────────────────────────────────────
+
+    [Fact]
+    public async Task MapAsync_TwoFreshRuns_ProduceIdenticalItemOperationIds()
+    {
+        // Item operation IDs are derived from the parent op ID + index, so two
+        // independent fresh runs of the same workflow shape must emit the same
+        // child IDs (the foundation of replay correctness).
+        async Task<string[]> IdsFromRunAsync() // awaited, never blocked on: sync-over-async can starve the test runner
+        {
+            var (context, recorder, _, _) = CreateContext(); // fresh context + recorder per run, so runs share no state
+            await context.MapAsync(
+                new[] { 1, 2, 3 },
+                async (ctx, item, index, all) => { await Task.Yield(); return item; });
+            await recorder.Batcher.DrainAsync(); // flush pending checkpoints before inspecting Flushed
+            return recorder.Flushed
+                .Where(o => o.Type == "CONTEXT" && o.SubType == "MapItem" && o.Action == "START")
+                .Select(o => o.Id)
+                .OrderBy(id => id) // items run concurrently; sort so the comparison ignores flush order
+                .ToArray();
+        }
+
+        var first = await IdsFromRunAsync();
+        var second = await IdsFromRunAsync();
+
+        Assert.Equal(3, first.Length);
+        Assert.Equal(first, second);
+    }
+}
diff --git a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs
index 941d62fe9..b41983ba7 100644
--- a/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs
+++ b/Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs
@@ -574,7 +574,7 @@ public async Task ParallelAsync_ReplaySucceeded_RebuildsResultFromCheckpoints()
var b1 = ChildIdAt(parentOpId, 2);
var summaryJson = """
- {"CompletionReason":"ALL_COMPLETED","Branches":[
+ {"CompletionReason":"ALL_COMPLETED","Units":[
{"Index":0,"Name":"0","Status":"SUCCEEDED","OperationId":"placeholder0"},
{"Index":1,"Name":"1","Status":"SUCCEEDED","OperationId":"placeholder1"}
]}
@@ -639,7 +639,7 @@ public async Task ParallelAsync_ReplayFailed_ThrowsParallelException()
var b1 = ChildIdAt(parentOpId, 2);
var summaryJson = """
- {"CompletionReason":"FAILURE_TOLERANCE_EXCEEDED","Branches":[
+ {"CompletionReason":"FAILURE_TOLERANCE_EXCEEDED","Units":[
{"Index":0,"Name":"0","Status":"FAILED","OperationId":"placeholder0"},
{"Index":1,"Name":"1","Status":"FAILED","OperationId":"placeholder1"}
]}
@@ -1101,7 +1101,7 @@ public async Task ParallelAsync_ReplayMixedStatus_PreservesStartedShortCircuited
var b1 = ChildIdAt(parentOpId, 2);
var summaryJson = """
- {"CompletionReason":"MIN_SUCCESSFUL_REACHED","Branches":[
+ {"CompletionReason":"MIN_SUCCESSFUL_REACHED","Units":[
{"Index":0,"Name":"0","Status":"SUCCEEDED"},
{"Index":1,"Name":"1","Status":"SUCCEEDED"},
{"Index":2,"Name":"2","Status":"STARTED"}
@@ -1254,7 +1254,7 @@ public async Task ParallelAsync_ReplayUsesCheckpointedBranchName_NotCurrentName(
var b0 = ChildIdAt(parentOpId, 1);
var summaryJson = """
- {"CompletionReason":"MIN_SUCCESSFUL_REACHED","Branches":[
+ {"CompletionReason":"MIN_SUCCESSFUL_REACHED","Units":[
{"Index":0,"Name":"alpha","Status":"SUCCEEDED"},
{"Index":1,"Name":"beta","Status":"STARTED"}
]}
@@ -1308,7 +1308,7 @@ public async Task ParallelAsync_ReplayWithDriftedBranchName_ThrowsNonDeterminist
var b0 = ChildIdAt(parentOpId, 1);
var summaryJson = """
- {"CompletionReason":"ALL_COMPLETED","Branches":[
+ {"CompletionReason":"ALL_COMPLETED","Units":[
{"Index":0,"Name":"alpha","Status":"SUCCEEDED"}
]}
""";