diff --git a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
index 5045991f3542..7e2ef0f18fb4 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
+* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
#### Other Changes
diff --git a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
index 777177708547..48c15798f4ae 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
+* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
#### Other Changes
diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
index 1360614308a3..85d6f36f2b1b 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
+* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
#### Other Changes
diff --git a/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
index 6674b6f8bb74..e7561d05958e 100644
--- a/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
+* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
#### Other Changes
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala
index 5d83a5139ef2..f35b580a0fa7 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ChangeFeedPartitionReader.scala
@@ -214,6 +214,12 @@ private case class ChangeFeedPartitionReader
.setEndLSN(options, this.partition.endLsn.get)
}
+ // Bubble empty pages up to the iterator so the per-page end-to-end timeout
+ // applies to each individual page rather than being exceeded by serial
+ // empty-page drains inside ChangeFeedFetcher.
+ ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor
+ .setAllowNotModifiedPages(options, true)
+
options.setCustomItemSerializer(itemDeserializer)
}
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala
index 44027bafbe7e..7739485bbc91 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/ItemsPartitionReader.scala
@@ -45,6 +45,14 @@ private case class ItemsPartitionReader
.getCosmosQueryRequestOptionsAccessor
.disallowQueryPlanRetrieval(new CosmosQueryRequestOptions())
+ // Bubble empty pages up to the iterator so the per-page end-to-end timeout
+ // applies to each individual page rather than being exceeded by serial
+ // empty-page drains inside ParallelDocumentQueryExecutionContext.
+ ImplementationBridgeHelpers
+ .CosmosQueryRequestOptionsHelper
+ .getCosmosQueryRequestOptionsAccessor
+ .setAllowEmptyPages(queryOptions, true)
+
private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
ThroughputControlHelper.populateThroughputControlGroupName(
ImplementationBridgeHelpers
diff --git a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala
index b8400fdd3eff..9185e7529d35 100644
--- a/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala
+++ b/sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransientIOErrorsRetryingIteratorSpec.scala
@@ -180,6 +180,68 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTr
factoryCallCount.get shouldEqual 1
}
+ "TransientIOErrors" should "drain long runs of empty pages without hitting the end-to-end timeout" in {
+ // Regression test for the empty-page drain scenario: when the SDK is configured with
+ // emptyPagesAllowed=true the iterator must surface many consecutive empty
+ // pages without busy-waiting beyond the per-page end-to-end timeout. Even
+ // with hundreds of empty pages followed by real data, the iterator should
+ // return all real rows.
+ val emptyLeadingPages = 200
+ val realPages = 5
+ val totalPages = emptyLeadingPages + realPages
+ val iterator = new TransientIOErrorsRetryingIterator(
+ continuationToken => generateMockedCosmosPagedFluxWithEmptyPrefix(
+ continuationToken, totalPages, emptyLeadingPages),
+ pageSize,
+ 1,
+ None,
+ None
+ )
+ iterator.maxRetryIntervalInMs = 5
+
+ // 2 producers (Left/Right) each emit realPages * pageSize rows
+ iterator.count(_ => true) shouldEqual (realPages * pageSize * 2)
+ }
+
+ private def generateMockedCosmosPagedFluxWithEmptyPrefix
+ (
+ continuationToken: String,
+ initialPageCount: Int,
+ leadingEmptyPageCount: Int
+ ) = {
+
+ val leftProducer = generateFeedResponseFluxWithEmptyPrefix(
+ "Left", initialPageCount, leadingEmptyPageCount, Option.apply(continuationToken))
+ val rightProducer = generateFeedResponseFluxWithEmptyPrefix(
+ "Right", initialPageCount, leadingEmptyPageCount, Option.apply(continuationToken))
+ val toBeMerged = Array(leftProducer, rightProducer).toIterable.asJava
+ val mergedFlux = Flux.mergeSequential(toBeMerged, 1, 2)
+ UtilBridgeInternal.createCosmosPagedFlux(_ => mergedFlux)
+ }
+
+ private def generateFeedResponseFluxWithEmptyPrefix
+ (
+ prefix: String,
+ pageCount: Int,
+ leadingEmptyPageCount: Int,
+ requestContinuationToken: Option[String]
+ ): Flux[FeedResponse[SparkRowItem]] = {
+
+ // generateFeedResponse uses documentStartIndex=-1 as the "emit an empty page" sentinel.
+ val emptyPageSentinel = -1
+ val firstDataPageStartIndex = 1
+
+ val responses = Array.range(1, pageCount + 1)
+ .map(i => generateFeedResponse(
+ prefix,
+ i,
+ if (i <= leadingEmptyPageCount) emptyPageSentinel else firstDataPageStartIndex))
+ .filter(response => requestContinuationToken.isEmpty ||
+ requestContinuationToken.get < response.getContinuationToken)
+
+ Flux.fromArray(responses)
+ }
+
private val objectMapper = new ObjectMapper
@throws[JsonProcessingException]
diff --git a/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md
index 9b87a3fcf675..32117a8d7d32 100644
--- a/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
+* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
#### Other Changes
diff --git a/sdk/cosmos/azure-cosmos-spark_4-1_2-13/CHANGELOG.md b/sdk/cosmos/azure-cosmos-spark_4-1_2-13/CHANGELOG.md
index 570aec149b2c..d1afe6f098ff 100644
--- a/sdk/cosmos/azure-cosmos-spark_4-1_2-13/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos-spark_4-1_2-13/CHANGELOG.md
@@ -8,6 +8,7 @@
#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
+* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
#### Other Changes
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java
index 7c8418ed8992..29355ae002c3 100644
--- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosContainerChangeFeedTest.java
@@ -332,8 +332,14 @@ public void asyncChangeFeed_fromBeginning_incremental_forLogicalPartition() thro
}
}
- @Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider", timeOut = TIMEOUT)
+ @Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider",
+ timeOut = TIMEOUT, retryAnalyzer = FlakyTestRetryAnalyzer.class)
public void asyncChangeFeedPrefetching(ChangeFeedMode changeFeedMode) throws Exception {
+ // Note on shape: this test verifies Reactor's prefetch behavior on the change-feed
+ // byPage stream. The two fire-and-forget `.subscribe()` calls + `Thread.sleep(3000)`
+ // are intentional — they exercise the prefetch path without backpressure-bounded
+ // collection. retryAnalyzer = FlakyTestRetryAnalyzer absorbs occasional slow-runner
+ // jitter (Windows EmulatorTcp Java 8 can take >3s to deliver the first 3 pages).
this.createContainer(
(cp) -> {
if (changeFeedMode.equals(ChangeFeedMode.INCREMENTAL)) {
@@ -1097,6 +1103,78 @@ public void changeFeedQueryCompleteAfterAvailableNow(
}
}
+ @Test(groups = { "emulator" }, timeOut = TIMEOUT * 5)
+ public void changeFeedQuery_notModifiedPagesAllowed_surfacesNoChangesPagesAndTerminates() {
+ // End-to-end guard: when the SDK is opted into notModifiedPagesAllowed=true
+ // (via the friend-API bridge accessor — the same opt-in the Cosmos Spark connector uses),
+ // change-feed reads against a multi-partition container must:
+ // (a) surface 304/noChanges pages individually to the caller, AND
+ // (b) terminate via the FeedRangeCompositeContinuationImpl >4*(size+1) consecutive-304
+ // defense rather than poll indefinitely.
+ //
+ // This is the integration-level pin for the contract that ChangeFeedFetcher.nextPageInternal
+ // branch 2 explicitly calls disableShouldFetchMore() on NO_RETRY noChanges. Without that
+ // arm, a caller that drained the flux to completion would hang.
+ String testContainerId = UUID.randomUUID().toString();
+ try {
+ CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk");
+ CosmosAsyncContainer testContainer =
+ createCollection(
+ this.createdAsyncDatabase,
+ containerProperties,
+ new CosmosContainerRequestOptions(),
+ // throughput high enough to provision multiple physical partitions
+ 11000);
+
+ // Sparse workload: a few docs spread across partitions; most physical partitions
+ // will return 304 / noChanges on read, exercising the empty-page surfacing path.
+ insertDocuments(/* partitionCount */ 3, /* documentCount */ 2, testContainer);
+
+ CosmosChangeFeedRequestOptions options =
+ CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());
+ ImplementationBridgeHelpers
+ .CosmosChangeFeedRequestOptionsHelper
+ .getCosmosChangeFeedRequestOptionsAccessor()
+ .setAllowNotModifiedPages(options, true);
+
+ AtomicInteger totalPagesObserved = new AtomicInteger(0);
+ AtomicInteger totalDocsObserved = new AtomicInteger(0);
+ AtomicInteger notModifiedPagesObserved = new AtomicInteger(0);
+
+ // Drain a bounded slice of the change feed - the iteration must terminate within
+ // a reasonable page count via the SDK's consecutive-304 defense.
+ testContainer.queryChangeFeed(options, JsonNode.class)
+ .byPage(1)
+ .take(100)
+ .doOnNext(response -> {
+ totalPagesObserved.incrementAndGet();
+ int pageSize = response.getResults().size();
+ totalDocsObserved.addAndGet(pageSize);
+ if (pageSize == 0) {
+ notModifiedPagesObserved.incrementAndGet();
+ }
+ })
+ .blockLast();
+
+ // (a) at least some empty pages must have surfaced - the whole point of the opt-in
+ assertThat(notModifiedPagesObserved.get())
+ .describedAs("notModifiedPagesAllowed=true must surface 304/noChanges pages individually")
+ .isGreaterThan(0);
+ // (b) all inserted docs must be observed - empty-page surfacing must not interfere
+ // with data-page emission
+ assertThat(totalDocsObserved.get())
+ .describedAs("all inserted documents must surface")
+ .isEqualTo(6);
+ // (c) iteration must have terminated (we didn't hit the take(100) cap, otherwise
+ // we'd be polling indefinitely - that's the regression the defense-in-depth arm prevents)
+ assertThat(totalPagesObserved.get())
+ .describedAs("iteration must terminate via consecutive-304 defense, not hit the take(100) cap")
+ .isLessThan(100);
+ } finally {
+ safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId));
+ }
+ }
+
void insertDocuments(
int partitionCount,
int documentCount) {
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest.java
new file mode 100644
index 000000000000..818c16c6277b
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest.java
@@ -0,0 +1,184 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation;
+
+import com.azure.cosmos.CosmosItemSerializer;
+import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
+import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStartFromInternal;
+import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
+import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
+import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
+import com.azure.cosmos.models.ModelBridgeInternal;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for the paged-flux pull continuation path on
+ * {@link CosmosChangeFeedRequestOptions#withCosmosPagedFluxOptions(CosmosPagedFluxOptions)} (package-visible via
+ * {@link ModelBridgeInternal#getEffectiveChangeFeedRequestOptions(CosmosChangeFeedRequestOptions, CosmosPagedFluxOptions)}).
+ *
+ *
That method silently builds a brand-new {@code CosmosChangeFeedRequestOptionsImpl} when the caller supplies a
+ * continuation token via {@link CosmosPagedFluxOptions}, so any field NOT explicitly copied is dropped. These tests
+ * lock in the propagation of fields whose loss would silently break a feature.
+ */
+public class CosmosChangeFeedRequestOptionsWithPagedFluxOptionsTest {
+
+ @Test(groups = { "unit" })
+ public void notModifiedPagesAllowed_isPropagated_whenContinuationTokenSupplied() {
+ // GIVEN a CosmosChangeFeedRequestOptions with notModifiedPagesAllowed=true (the value the Spark connector sets)
+ CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
+ .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange());
+ ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
+ .getCosmosChangeFeedRequestOptionsAccessor()
+ .setAllowNotModifiedPages(src, true);
+
+ // AND a continuation token supplied via the paged-flux pull mechanism
+ CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
+ pagedFluxOptions.setRequestContinuation(buildContinuationToken());
+
+ // WHEN computing the effective options
+ CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
+ .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);
+
+ // THEN notModifiedPagesAllowed must be preserved on the freshly-built impl
+ assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
+ .getCosmosChangeFeedRequestOptionsAccessor()
+ .getAllowNotModifiedPages(effective))
+ .describedAs("notModifiedPagesAllowed must survive the paged-flux pull continuation rebuild")
+ .isTrue();
+ }
+
+ @Test(groups = { "unit" })
+ public void notModifiedPagesAllowedFalse_isPropagated_whenContinuationTokenSupplied() {
+ // The default value should also round-trip cleanly (sanity check that we're not just hard-coding true).
+ CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
+ .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange());
+
+ CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
+ pagedFluxOptions.setRequestContinuation(buildContinuationToken());
+
+ CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
+ .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);
+
+ assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
+ .getCosmosChangeFeedRequestOptionsAccessor()
+ .getAllowNotModifiedPages(effective))
+ .describedAs("notModifiedPagesAllowed default (false) must survive the paged-flux pull continuation rebuild")
+ .isFalse();
+ }
+
+ @Test(groups = { "unit" })
+ public void notModifiedPagesAllowed_isPreserved_whenNoContinuationTokenSupplied() {
+ // No continuation → withCosmosPagedFluxOptions returns `this` unchanged.
+ CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
+ .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange());
+ ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
+ .getCosmosChangeFeedRequestOptionsAccessor()
+ .setAllowNotModifiedPages(src, true);
+
+ CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
+ pagedFluxOptions.setMaxItemCount(50);
+
+ CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
+ .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);
+
+ assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
+ .getCosmosChangeFeedRequestOptionsAccessor()
+ .getAllowNotModifiedPages(effective))
+ .isTrue();
+ }
+
+ @Test(groups = { "unit" })
+ public void endLSN_isPropagated_whenContinuationTokenSupplied() {
+ // Locks in the bounded-change-feed contract across a byPage(savedContinuation) round-trip:
+ // a caller who set endLSN=42 must continue to see iteration bounded by LSN 42 after resume.
+ // Before the inheritNonContinuationFieldsFrom fix, endLSN was silently dropped on the rebuild
+ // path, turning a bounded change feed into an unbounded one.
+ CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
+ .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange());
+ ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
+ .getCosmosChangeFeedRequestOptionsAccessor()
+ .setEndLSN(src, 42L);
+
+ CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
+ pagedFluxOptions.setRequestContinuation(buildContinuationToken());
+
+ CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
+ .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);
+
+ assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
+ .getCosmosChangeFeedRequestOptionsAccessor()
+ .getEndLSN(effective))
+ .describedAs("endLSN must survive the paged-flux pull continuation rebuild")
+ .isEqualTo(42L);
+ }
+
+ @Test(groups = { "unit" })
+ public void customSerializer_isPropagated_whenContinuationTokenSupplied() {
+ // Locks in custom-serializer preservation across a byPage(savedContinuation) round-trip:
+ // a caller who registered a custom CosmosItemSerializer must continue to see items
+ // deserialized through that serializer after resume. Before the inheritNonContinuationFieldsFrom
+ // fix, the customSerializer was silently dropped on the rebuild path, falling back to the
+ // SDK's internal default serializer and potentially producing wrong field values.
+ CosmosItemSerializer sentinel = new CosmosItemSerializer() {
+ @Override
+ public java.util.Map serialize(T item) { return null; }
+
+ @Override
+ public T deserialize(java.util.Map jsonNodeMap, Class classType) { return null; }
+ };
+ CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
+ .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange());
+ src.setCustomItemSerializer(sentinel);
+
+ CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
+ pagedFluxOptions.setRequestContinuation(buildContinuationToken());
+
+ CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
+ .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);
+
+ assertThat(effective.getCustomItemSerializer())
+ .describedAs("customSerializer must survive the paged-flux pull continuation rebuild")
+ .isSameAs(sentinel);
+ }
+
+ @Test(groups = { "unit" })
+ public void tokenEncodedFields_overrideCallerSuppliedValues_whenContinuationTokenSupplied() {
+ // Negative pin: the four token-encoded fields (continuationState, feedRangeInternal, mode,
+ // startFromInternal) MUST come from the token, not from the caller's pre-resume options.
+ // The caller's options here have continuationState=null (createForProcessingFromBeginning),
+ // but the resulting effective options must have a non-null continuationState parsed from
+ // the supplied token. If a future refactor accidentally inherits the token-encoded fields
+ // from the source impl (e.g. moving them into inheritNonContinuationFieldsFrom), this test
+ // catches the regression because the source's continuationState would clobber the token's.
+ CosmosChangeFeedRequestOptions src = CosmosChangeFeedRequestOptions
+ .createForProcessingFromBeginning(FeedRangeEpkImpl.forFullRange());
+
+ CosmosPagedFluxOptions pagedFluxOptions = new CosmosPagedFluxOptions();
+ pagedFluxOptions.setRequestContinuation(buildContinuationToken());
+
+ CosmosChangeFeedRequestOptions effective = ModelBridgeInternal
+ .getEffectiveChangeFeedRequestOptions(src, pagedFluxOptions);
+
+ assertThat(ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper
+ .getCosmosChangeFeedRequestOptionsAccessor()
+ .getImpl(effective)
+ .getContinuation())
+ .describedAs("continuationState is encoded in the token and MUST override the caller's pre-resume value")
+ .isNotNull();
+ }
+
+ private static String buildContinuationToken() {
+ // Build a real ChangeFeedState so we can serialize a valid (base64-encoded) continuation token.
+ // We use the state's own toString() which round-trips through createForProcessingFromContinuation.
+ ChangeFeedStateV1 state = new ChangeFeedStateV1(
+ "someContainerRid",
+ FeedRangeEpkImpl.forFullRange(),
+ ChangeFeedMode.INCREMENTAL,
+ ChangeFeedStartFromInternal.createFromBeginning(),
+ null);
+ return state.toString();
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherNotModifiedPagesTest.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherNotModifiedPagesTest.java
new file mode 100644
index 000000000000..b917f55864a7
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/query/ChangeFeedFetcherNotModifiedPagesTest.java
@@ -0,0 +1,456 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.cosmos.implementation.query;
+
+import com.azure.cosmos.implementation.Document;
+import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
+import com.azure.cosmos.implementation.DocumentServiceRequestContext;
+import com.azure.cosmos.implementation.GlobalEndpointManager;
+import com.azure.cosmos.implementation.IRetryPolicyFactory;
+import com.azure.cosmos.implementation.OperationType;
+import com.azure.cosmos.implementation.ResourceType;
+import com.azure.cosmos.implementation.RxDocumentClientImpl;
+import com.azure.cosmos.implementation.RxDocumentServiceRequest;
+import com.azure.cosmos.implementation.ShouldRetryResult;
+import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
+import com.azure.cosmos.implementation.faultinjection.FaultInjectionRequestContext;
+import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
+import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
+import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
+import com.azure.cosmos.implementation.perPartitionCircuitBreaker.GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker;
+import com.azure.cosmos.models.FeedResponse;
+import com.azure.cosmos.models.ModelBridgeInternal;
+import org.mockito.Mockito;
+import org.testng.annotations.Test;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link ChangeFeedFetcher} covering the {@code notModifiedPagesAllowed} behavior change.
+ *
+ * The two behaviors locked in here are:
+ *
+ * - {@code isFullyDrained} consults only the continuation (no {@code noChanges} short-circuit),
+ * which is what allows {@code notModifiedPagesAllowed=true} to surface empty pages without first
+ * having to call {@code reEnableShouldFetchMoreForRetry()} to undo a base-class decision.
+ * - When {@code notModifiedPagesAllowed=true}, {@code nextPageInternal} returns
+ * {@code Mono.just(noChangesResponse)} instead of {@code Mono.empty()} so the empty pages
+ * bubble up to the caller (Spark connector) where the per-page end-to-end timeout applies to
+ * each individual page rather than being exceeded by serial empty-page drains.
+ *
+ */
+public class ChangeFeedFetcherNotModifiedPagesTest {
+
+ @Test(groups = { "unit" })
+ public void isFullyDrained_noChangesResponseWithNotModifiedPagesAllowedTrue_returnsTrue() {
+ // GIVEN a ChangeFeedFetcher with notModifiedPagesAllowed=true and a noChanges response
+ // whose continuation reports !isDone()
+ FeedRangeContinuation continuation = mock(FeedRangeContinuation.class);
+ when(continuation.isDone()).thenReturn(false);
+ ChangeFeedFetcher fetcher = newFetcher(continuation, /* notModifiedPagesAllowed */ true);
+
+ FeedResponse noChangesResponse = changeFeedNoChanges("token-A");
+
+ // WHEN
+ boolean drained = invokeIsFullyDrained(fetcher, noChangesResponse);
+
+ // THEN: isFullyDrained short-circuits on noChanges regardless of the
+ // notModifiedPagesAllowed flag. With notModifiedPagesAllowed=true the bottom of
+ // nextPageInternal undoes the side effect via reEnableShouldFetchMoreForRetry()
+ // so the iteration continues; with notModifiedPagesAllowed=false the
+ // shouldFetchMore=false sticks and the iteration terminates.
+ assertThat(drained).isTrue();
+ }
+
+ @Test(groups = { "unit" })
+ public void isFullyDrained_noChangesResponseWithFinishedContinuation_returnsTrue() {
+ FeedRangeContinuation continuation = mock(FeedRangeContinuation.class);
+ when(continuation.isDone()).thenReturn(true);
+ ChangeFeedFetcher fetcher = newFetcher(continuation, /* notModifiedPagesAllowed */ true);
+
+ FeedResponse noChangesResponse = changeFeedNoChanges("token-B");
+
+ assertThat(invokeIsFullyDrained(fetcher, noChangesResponse)).isTrue();
+ }
+
+ @Test(groups = { "unit" })
+ public void isFullyDrained_realResponseWithFinishedContinuation_returnsTrue() {
+ FeedRangeContinuation continuation = mock(FeedRangeContinuation.class);
+ when(continuation.isDone()).thenReturn(true);
+ ChangeFeedFetcher fetcher = newFetcher(continuation, /* notModifiedPagesAllowed */ false);
+
+ FeedResponse dataResponse = changeFeedDataPage("token-C", new Document());
+
+ assertThat(invokeIsFullyDrained(fetcher, dataResponse)).isTrue();
+ }
+
+ @Test(groups = { "unit" })
+ public void isFullyDrained_noChangesResponseWithNotModifiedPagesAllowedFalse_returnsTrue() {
+ // Regression test: with the default notModifiedPagesAllowed=false the noChanges short-circuit
+ // MUST stay in place. Otherwise any non-Spark caller that drains the change feed flux
+ // (e.g. queryChangeFeed(...).byPage().toIterable().iterator()) would loop forever,
+ // because FeedRangeCompositeContinuationImpl.isDone() returns
+ // compositeContinuationTokens.size()==0, which is permanently false for incremental
+ // change feed (moveToNextToken rotates the deque, never shrinks it). The
+ // NO_RETRY result from handleChangeFeedNotModified is the only termination signal,
+ // and Paginator only honors it via Fetcher.shouldFetchMore() being flipped by
+ // isFullyDrained=true on this noChanges page.
+ FeedRangeContinuation continuation = mock(FeedRangeContinuation.class);
+ when(continuation.isDone()).thenReturn(false);
+ ChangeFeedFetcher fetcher = newFetcher(continuation, /* notModifiedPagesAllowed */ false);
+
+ FeedResponse noChangesResponse = changeFeedNoChanges("token-D");
+
+ assertThat(invokeIsFullyDrained(fetcher, noChangesResponse))
+ .describedAs("With notModifiedPagesAllowed=false the noChanges short-circuit must remain to terminate iteration")
+ .isTrue();
+ // The short-circuit must fire WITHOUT consulting the continuation; if a future
+ // refactor accidentally drops the noChanges check and falls through to
+ // continuation.isDone() (which is permanently false in incremental change feed),
+ // this verify catches it as a loud failure rather than a hard-to-diagnose hang.
+ Mockito.verify(continuation, Mockito.never()).isDone();
+ }
+
+ @Test(groups = { "unit" }, timeOut = 10_000)
+ public void nextPage_notModifiedPagesAllowedTrue_surfacesNoChangesPagesIndividually() {
+ // Scenario: 3 consecutive noChanges (304) pages from the same sub-feedRange,
+ // followed by a real data page. With notModifiedPagesAllowed=true, each of the 4
+ // physical responses must surface as its own Mono emission so the Spark
+ // iterator can drain them under the per-page end-to-end timeout window.
+ FeedRangeContinuation continuation = mock(FeedRangeContinuation.class);
+ when(continuation.isDone()).thenReturn(false);
+ // handleChangeFeedNotModified returns RETRY_NOW only for actual noChanges responses
+ // (matching the real FeedRangeCompositeContinuationImpl behavior).
+ when(continuation.handleChangeFeedNotModified(any())).thenAnswer(invocation -> {
+ FeedResponse> rsp = invocation.getArgument(0);
+ return ModelBridgeInternal.noChanges(rsp) ? ShouldRetryResult.RETRY_NOW : ShouldRetryResult.noRetry();
+ });
+
+ FeedResponse noChanges1 = changeFeedNoChanges("t1");
+ FeedResponse noChanges2 = changeFeedNoChanges("t2");
+ FeedResponse noChanges3 = changeFeedNoChanges("t3");
+ FeedResponse data = changeFeedDataPage("t4", new Document());
+
+ FeedResponse[] script = new FeedResponse[] { noChanges1, noChanges2, noChanges3, data };
+ AtomicInteger callIndex = new AtomicInteger();
+ Function>> executeFunc =
+ req -> Mono.just(script[callIndex.getAndIncrement()]);
+
+ ChangeFeedFetcher fetcher =
+ newFetcherWithExecuteFunc(continuation, /* notModifiedPagesAllowed */ true, executeFunc);
+
+ // Drive the fetcher across 4 nextPage() invocations and assert each one surfaces.
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == noChanges1).verifyComplete();
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == noChanges2).verifyComplete();
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == noChanges3).verifyComplete();
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == data).verifyComplete();
+
+ assertThat(callIndex.get()).describedAs("executeFunc should have been called once per surfaced page").isEqualTo(4);
+ }
+
+ @Test(groups = { "unit" }, timeOut = 10_000)
+ public void nextPage_notModifiedPagesAllowedTrueWithNoRetryOnNoChanges_terminatesIteration() {
+ // Defense-in-depth: with notModifiedPagesAllowed=true, isFullyDrained() consults only
+ // continuation.isDone() (permanently false in incremental change feed), so the
+ // SDK's own termination signal would otherwise be lost. nextPageInternal must
+ // explicitly disableShouldFetchMore() when handleChangeFeedNotModified returns
+ // NO_RETRY on a noChanges page (single-partition case, multi-partition full
+ // cycle complete, or the >4*(size+1) consecutive-304 defense).
+ //
+ // This test scripts: 3 noChanges with RETRY_NOW (mid-cycle), followed by a 4th
+ // noChanges with NO_RETRY (terminal). All 4 must surface, and shouldFetchMore()
+ // must be false after the terminal page so Paginator's outer loop stops.
+ FeedRangeContinuation continuation = mock(FeedRangeContinuation.class);
+ when(continuation.isDone()).thenReturn(false);
+
+ FeedResponse mid1 = changeFeedNoChanges("t1");
+ FeedResponse mid2 = changeFeedNoChanges("t2");
+ FeedResponse mid3 = changeFeedNoChanges("t3");
+ FeedResponse terminal = changeFeedNoChanges("t4");
+
+ // The continuation distinguishes terminal from mid-cycle by reference identity.
+ when(continuation.handleChangeFeedNotModified(any())).thenAnswer(invocation -> {
+ FeedResponse> rsp = invocation.getArgument(0);
+ return rsp == terminal ? ShouldRetryResult.noRetry() : ShouldRetryResult.RETRY_NOW;
+ });
+
+ FeedResponse[] script = new FeedResponse[] { mid1, mid2, mid3, terminal };
+ AtomicInteger callIndex = new AtomicInteger();
+ Function>> executeFunc =
+ req -> Mono.just(script[callIndex.getAndIncrement()]);
+
+ ChangeFeedFetcher fetcher =
+ newFetcherWithExecuteFunc(continuation, /* notModifiedPagesAllowed */ true, executeFunc);
+
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == mid1).verifyComplete();
+ assertThat(fetcher.shouldFetchMore()).describedAs("after mid1").isTrue();
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == mid2).verifyComplete();
+ assertThat(fetcher.shouldFetchMore()).describedAs("after mid2").isTrue();
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == mid3).verifyComplete();
+ assertThat(fetcher.shouldFetchMore()).describedAs("after mid3").isTrue();
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == terminal).verifyComplete();
+ assertThat(fetcher.shouldFetchMore())
+ .describedAs("NO_RETRY on terminal noChanges page MUST stop Paginator from polling again")
+ .isFalse();
+ assertThat(callIndex.get())
+ .describedAs("after NO_RETRY termination, executeFunc must NOT be called again")
+ .isEqualTo(4);
+ }
+
+ @Test(groups = { "unit" }, timeOut = 10_000)
+ public void nextPage_notModifiedPagesAllowedTrueWithDataPages_doesNotTerminate() {
+ // Defense-in-depth contract guard: handleChangeFeedNotModified in the real
+ // FeedRangeCompositeContinuationImpl returns NO_RETRY for EVERY non-noChanges
+ // response (the early `if (!noChanges(r))` clause resets state and then falls
+ // through to the final `return NO_RETRY`). The branch-2 termination logic in
+ // ChangeFeedFetcher.nextPageInternal therefore MUST gate the
+ // disableShouldFetchMore() call on `noChanges(r) && notModifiedPagesAllowed`; if a
+ // future refactor drops the noChanges(r) guard, every data page would silently
+ // truncate iteration after the first emission. This test pins that contract.
+ FeedRangeContinuation continuation = mock(FeedRangeContinuation.class);
+ when(continuation.isDone()).thenReturn(false);
+ // Match real production behavior: NO_RETRY on data pages, RETRY_NOW on noChanges.
+ when(continuation.handleChangeFeedNotModified(any())).thenAnswer(invocation -> {
+ FeedResponse> rsp = invocation.getArgument(0);
+ return ModelBridgeInternal.noChanges(rsp) ? ShouldRetryResult.RETRY_NOW : ShouldRetryResult.noRetry();
+ });
+
+ FeedResponse data1 = changeFeedDataPage("d1", new Document());
+ FeedResponse data2 = changeFeedDataPage("d2", new Document());
+ FeedResponse data3 = changeFeedDataPage("d3", new Document());
+
+ FeedResponse[] script = new FeedResponse[] { data1, data2, data3 };
+ AtomicInteger callIndex = new AtomicInteger();
+ Function>> executeFunc =
+ req -> Mono.just(script[callIndex.getAndIncrement()]);
+
+ ChangeFeedFetcher fetcher =
+ newFetcherWithExecuteFunc(continuation, /* notModifiedPagesAllowed */ true, executeFunc);
+
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == data1).verifyComplete();
+ assertThat(fetcher.shouldFetchMore())
+ .describedAs("data pages must NOT terminate iteration even when handleChangeFeedNotModified returns NO_RETRY")
+ .isTrue();
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == data2).verifyComplete();
+ assertThat(fetcher.shouldFetchMore()).describedAs("after data2").isTrue();
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == data3).verifyComplete();
+ assertThat(fetcher.shouldFetchMore()).describedAs("after data3").isTrue();
+ }
+
+ @Test(groups = { "unit" }, timeOut = 10_000)
+ public void nextPage_notModifiedPagesAllowedFalse_swallowsNoChangesPagesUntilData() {
+ // Same scenario, but with the default notModifiedPagesAllowed=false. The
+ // 3 noChanges responses should be swallowed via repeatWhenEmpty and
+ // only the data page should surface from a SINGLE nextPage() call.
+ FeedRangeContinuation continuation = mock(FeedRangeContinuation.class);
+ when(continuation.isDone()).thenReturn(false);
+ when(continuation.handleChangeFeedNotModified(any())).thenAnswer(invocation -> {
+ FeedResponse> rsp = invocation.getArgument(0);
+ return ModelBridgeInternal.noChanges(rsp) ? ShouldRetryResult.RETRY_NOW : ShouldRetryResult.noRetry();
+ });
+
+ FeedResponse noChanges1 = changeFeedNoChanges("t1");
+ FeedResponse noChanges2 = changeFeedNoChanges("t2");
+ FeedResponse noChanges3 = changeFeedNoChanges("t3");
+ FeedResponse data = changeFeedDataPage("t4", new Document());
+
+ FeedResponse[] script = new FeedResponse[] { noChanges1, noChanges2, noChanges3, data };
+ AtomicInteger callIndex = new AtomicInteger();
+ Function>> executeFunc =
+ req -> Mono.just(script[callIndex.getAndIncrement()]);
+
+ ChangeFeedFetcher fetcher =
+ newFetcherWithExecuteFunc(continuation, /* notModifiedPagesAllowed */ false, executeFunc);
+
+ // A single nextPage() should internally drain all 3 noChanges responses and
+ // emit the data response.
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == data).verifyComplete();
+
+ assertThat(callIndex.get()).describedAs("executeFunc should be called once per physical fetch").isEqualTo(4);
+ }
+
+ @Test(groups = { "unit" }, timeOut = 10_000)
+ public void nextPage_endLsnSet_notModifiedPagesAllowedTrue_surfacesNoChangesUntilHasFetchedAllChanges() {
+ // Spark batch reads set BOTH endLSN AND notModifiedPagesAllowed=true, routing through
+ // ChangeFeedFetcher.nextPageInternal branch 1 (the
+ // `completeAfterAllCurrentChangesRetrieved || endLSN != null` path). That branch calls
+ // surfaceOrSwallowNoChangesPage(r) on noChanges pages, then terminates via
+ // hasFetchedAllChanges -> disableShouldFetchMore(). This test pins both halves of the
+ // contract: empty pages surface to the caller (per-page timeout enforcement) AND
+ // iteration eventually terminates when hasFetchedAllChanges returns true.
+ FeedRangeContinuation continuation = mock(FeedRangeContinuation.class);
+ when(continuation.isDone()).thenReturn(false);
+
+ FeedResponse noChanges1 = changeFeedNoChanges("t1");
+ FeedResponse noChanges2 = changeFeedNoChanges("t2");
+ FeedResponse terminal = changeFeedNoChanges("t3");
+
+ // hasFetchedAllChanges returns true only for the terminal page (reference identity).
+ when(continuation.hasFetchedAllChanges(any(), any())).thenAnswer(invocation -> {
+ FeedResponse> rsp = invocation.getArgument(0);
+ return rsp == terminal;
+ });
+
+ FeedResponse[] script = new FeedResponse[] { noChanges1, noChanges2, terminal };
+ AtomicInteger callIndex = new AtomicInteger();
+ Function>> executeFunc =
+ req -> Mono.just(script[callIndex.getAndIncrement()]);
+
+ ChangeFeedFetcher fetcher =
+ newFetcherWithExecuteFunc(continuation, /* notModifiedPagesAllowed */ true, /* endLSN */ 999L, executeFunc);
+
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == noChanges1).verifyComplete();
+ assertThat(fetcher.shouldFetchMore()).describedAs("after noChanges1, mid-cycle").isTrue();
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == noChanges2).verifyComplete();
+ assertThat(fetcher.shouldFetchMore()).describedAs("after noChanges2, mid-cycle").isTrue();
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == terminal).verifyComplete();
+ assertThat(fetcher.shouldFetchMore())
+ .describedAs("hasFetchedAllChanges=true on terminal page MUST stop Paginator from polling again")
+ .isFalse();
+ assertThat(callIndex.get())
+ .describedAs("after termination, executeFunc must NOT be called again")
+ .isEqualTo(3);
+ }
+
+ @Test(groups = { "unit" }, timeOut = 10_000)
+ public void nextPage_endLsnSet_notModifiedPagesAllowedFalse_swallowsNoChangesViaRepeatWhenEmpty() {
+ // Legacy regression guard for branch 1 with the default notModifiedPagesAllowed=false.
+ // The 2 noChanges responses should be swallowed via repeatWhenEmpty; only the data
+ // page should surface from a SINGLE nextPage() call (matching legacy behavior).
+ FeedRangeContinuation continuation = mock(FeedRangeContinuation.class);
+ when(continuation.isDone()).thenReturn(false);
+
+ FeedResponse noChanges1 = changeFeedNoChanges("t1");
+ FeedResponse noChanges2 = changeFeedNoChanges("t2");
+ FeedResponse data = changeFeedDataPage("t3", new Document());
+
+ // hasFetchedAllChanges returns false for all 3 (iteration shouldn't terminate via this signal).
+ when(continuation.hasFetchedAllChanges(any(), any())).thenReturn(false);
+
+ FeedResponse[] script = new FeedResponse[] { noChanges1, noChanges2, data };
+ AtomicInteger callIndex = new AtomicInteger();
+ Function>> executeFunc =
+ req -> Mono.just(script[callIndex.getAndIncrement()]);
+
+ ChangeFeedFetcher fetcher =
+ newFetcherWithExecuteFunc(continuation, /* notModifiedPagesAllowed */ false, /* endLSN */ 999L, executeFunc);
+
+ // A single nextPage() should drain all 2 noChanges responses via repeatWhenEmpty
+ // and surface the data page.
+ StepVerifier.create(fetcher.nextPage()).expectNextMatches(r -> r == data).verifyComplete();
+ assertThat(callIndex.get()).describedAs("executeFunc should be called once per physical fetch").isEqualTo(3);
+ }
+
+ // Note: the >4*(size+1) consecutive-304 defense in FeedRangeCompositeContinuationImpl is
+ // pinned end-to-end by CosmosContainerChangeFeedTest.changeFeedQuery_notModifiedPagesAllowed_*
+ // (emulator group). That impl class is package-private, so a unit-level integration test
+ // here would require reflection — the emulator test exercises the real production path.
+
+ // ---- helpers ----
+
+ private static FeedResponse changeFeedNoChanges(String continuationToken) {
+ return FeedResponseBuilder.changeFeedResponseBuilder(Document.class)
+ .withContinuationToken(continuationToken)
+ .lastChangeFeedPage()
+ .build();
+ }
+
+ private static FeedResponse changeFeedDataPage(String continuationToken, Document... docs) {
+ return FeedResponseBuilder.changeFeedResponseBuilder(Document.class)
+ .withContinuationToken(continuationToken)
+ .withResults(docs)
+ .build();
+ }
+
+ private static ChangeFeedFetcher newFetcher(FeedRangeContinuation continuation, boolean notModifiedPagesAllowed) {
+ return newFetcherWithExecuteFunc(continuation, notModifiedPagesAllowed, /* endLSN */ null, req -> Mono.empty());
+ }
+
+ private static ChangeFeedFetcher newFetcherWithExecuteFunc(
+ FeedRangeContinuation continuation,
+ boolean notModifiedPagesAllowed,
+ Function>> executeFunc) {
+
+ return newFetcherWithExecuteFunc(continuation, notModifiedPagesAllowed, /* endLSN */ null, executeFunc);
+ }
+
+ private static ChangeFeedFetcher newFetcherWithExecuteFunc(
+ FeedRangeContinuation continuation,
+ boolean notModifiedPagesAllowed,
+ Long endLSN,
+ Function>> executeFunc) {
+
+ RxDocumentClientImpl client = mock(RxDocumentClientImpl.class);
+ IRetryPolicyFactory resetSessionTokenRetryPolicy = mock(IRetryPolicyFactory.class);
+ DocumentClientRetryPolicy noOpRetryPolicy = mock(DocumentClientRetryPolicy.class);
+ when(noOpRetryPolicy.shouldRetry(any())).thenReturn(Mono.just(ShouldRetryResult.noRetry()));
+ when(resetSessionTokenRetryPolicy.getRequestPolicy(any())).thenReturn(noOpRetryPolicy);
+ when(client.getResetSessionTokenRetryPolicy()).thenReturn(resetSessionTokenRetryPolicy);
+ ChangeFeedState changeFeedState = mock(ChangeFeedState.class);
+ FeedRangeInternal feedRange = FeedRangeEpkImpl.forFullRange();
+ when(changeFeedState.getContinuation()).thenReturn(continuation);
+ when(changeFeedState.getFeedRange()).thenReturn(feedRange);
+ doNothing().when(changeFeedState).populateRequest(any(RxDocumentServiceRequest.class), anyInt());
+
+ Supplier createRequestFunc = ChangeFeedFetcherNotModifiedPagesTest::stubRequest;
+
+ Map requestOptionProperties = new HashMap<>();
+ GlobalEndpointManager gem = mock(GlobalEndpointManager.class);
+ GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker gpe =
+ mock(GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker.class);
+ when(gpe.isPerPartitionLevelCircuitBreakingApplicable(any())).thenReturn(false);
+
+ return new ChangeFeedFetcher<>(
+ client,
+ createRequestFunc,
+ executeFunc,
+ changeFeedState,
+ requestOptionProperties,
+ /* top */ -1,
+ /* maxItemCount */ 100,
+ /* isSplitHandlingDisabled */ true,
+ /* completeAfterAllCurrentChangesRetrieved */ false,
+ notModifiedPagesAllowed,
+ endLSN,
+ /* operationContext */ null,
+ gem,
+ gpe,
+ /* diagnosticsClientContext */ null);
+ }
+
+ private static RxDocumentServiceRequest stubRequest() {
+ RxDocumentServiceRequest request = mock(RxDocumentServiceRequest.class);
+ when(request.getIsNameBased()).thenReturn(true);
+ when(request.getResourceAddress()).thenReturn("dbs/db1/colls/coll1");
+ when(request.getOperationType()).thenReturn(OperationType.ReadFeed);
+ when(request.getResourceType()).thenReturn(ResourceType.Document);
+ // requestContext and faultInjectionRequestContext are public fields on the real class;
+ // direct assignment on the mock works (Mockito only intercepts method calls).
+ request.requestContext = new DocumentServiceRequestContext();
+ request.faultInjectionRequestContext = new FaultInjectionRequestContext();
+ return request;
+ }
+
+ // isFullyDrained is `protected` on Fetcher; this test class lives in the same package
+ // (com.azure.cosmos.implementation.query), so we can call it directly without reflection.
+ private static boolean invokeIsFullyDrained(
+ ChangeFeedFetcher fetcher,
+ FeedResponse response) {
+
+ return fetcher.isFullyDrained(/* isChangeFeed */ true, response);
+ }
+}
diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md
index 4d6c3781b00b..e43597e1d10c 100644
--- a/sdk/cosmos/azure-cosmos/CHANGELOG.md
+++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -10,6 +10,7 @@
#### Bugs Fixed
#### Other Changes
+* Added an internal `emptyPagesAllowed` field on `CosmosChangeFeedRequestOptionsImpl` (default `false`; not exposed publicly). When set, `ChangeFeedFetcher` surfaces 304/empty pages to the caller. Consumed by the Cosmos Spark connector to fix an `OperationCancelledException` on sparse cross-partition change-feed workloads. Default behavior is unchanged. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)
* Replaced per-client `Schedulers.newSingle()` schedulers in `GlobalEndpointManager` and `GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker` with shared `BoundedElastic` schedulers in `CosmosSchedulers` to prevent thread count from scaling linearly with client/tenant count. - See [PR 49062](https://github.com/Azure/azure-sdk-for-java/pull/49062)
* Fixed a sporadic `NullPointerException` in `JsonSerializable.getWithMapping` triggered by concurrent first-time calls to `DatabaseAccount.getConsistencyPolicy()` and its sibling lazy getters (`getReplicationPolicy`, `getSystemReplicationPolicy`, `getQueryEngineConfiguration`). The fix makes `JsonSerializable.propertyBag` `final`, closing an unsafe-publication race in the lazy-initialisation pattern. - See [Issue 49256](https://github.com/Azure/azure-sdk-for-java/issues/49256) and [PR #49258](https://github.com/Azure/azure-sdk-for-java/pull/49258)
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java
index c8bca12fcbf4..f280e2562159 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ChangeFeedQueryImpl.java
@@ -121,6 +121,7 @@ public Flux> executeAsync() {
this.options.getMaxPrefetchPageCount(),
ModelBridgeInternal.getChangeFeedIsSplitHandlingDisabled(this.options),
this.options.isCompleteAfterAllCurrentChangesRetrieved(),
+ changeFeedOptionsAccessor().getAllowNotModifiedPages(this.options),
changeFeedOptionsAccessor()
.getEndLSN(this.options),
changeFeedOptionsAccessor()
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java
index 8c4f6fdd646b..429d13f24da0 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosChangeFeedRequestOptionsImpl.java
@@ -53,6 +53,7 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ
private boolean completeAfterAllCurrentChangesRetrieved;
private Long endLSN;
private ReadConsistencyStrategy readConsistencyStrategy;
+ private boolean notModifiedPagesAllowed;
public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) {
if (toBeCloned.continuationState != null) {
@@ -80,6 +81,48 @@ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toB
this.keywordIdentifiers = toBeCloned.keywordIdentifiers;
this.completeAfterAllCurrentChangesRetrieved = toBeCloned.completeAfterAllCurrentChangesRetrieved;
this.endLSN = toBeCloned.endLSN;
+ this.notModifiedPagesAllowed = toBeCloned.notModifiedPagesAllowed;
+ }
+
+ /**
+ * Inherits all non-token-encoded fields from {@code source} onto this instance, preserving the
+ * caller-supplied configuration when this instance was freshly built from a continuation token.
+ *
+ * The four fields encoded in the continuation token itself ({@code continuationState},
+ * {@code feedRangeInternal}, {@code mode}, {@code startFromInternal}) are intentionally NOT
+ * copied — they describe "where to resume from" and must come from the token, not the caller's
+ * pre-resume options. Every other field IS copied so the caller's configuration (endLSN,
+ * customSerializer, excludeRegions, read-consistency strategy, throughput-control group,
+ * diagnostic thresholds, etc.) survives the {@code byPage(savedContinuation)} round-trip.
+ *
+ *
Maintenance contract: when a new field is added to this class, decide whether the
+ * continuation token encodes it. If not (the common case for caller-supplied configuration),
+ * propagate it here.
+ */
+ public void inheritNonContinuationFieldsFrom(CosmosChangeFeedRequestOptionsImpl source) {
+ // continuationState, feedRangeInternal, mode, startFromInternal:
+ // intentionally NOT copied (encoded in the continuation token itself).
+ // collectionRid IS preserved: it lives on the impl but is not embedded in the
+ // continuation token (the token's separate containerRid lives on ChangeFeedStateV1).
+ // The rest-of-SDK clone path (RxDocumentClientImpl.queryDocumentChangeFeedFromPagedFluxInternal
+ // -> accessor.clone -> copy ctor) preserves collectionRid; we match that here.
+ this.maxItemCount = source.maxItemCount;
+ this.maxPrefetchPageCount = source.maxPrefetchPageCount;
+ this.isSplitHandlingDisabled = source.isSplitHandlingDisabled;
+ this.quotaInfoEnabled = source.quotaInfoEnabled;
+ this.throughputControlGroupName = source.throughputControlGroupName;
+ this.customOptions = source.customOptions;
+ this.operationContextAndListenerTuple = source.operationContextAndListenerTuple;
+ this.thresholds = source.thresholds;
+ this.excludeRegions = source.excludeRegions;
+ this.customSerializer = source.customSerializer;
+ this.partitionKeyDefinition = source.partitionKeyDefinition;
+ this.collectionRid = source.collectionRid;
+ this.keywordIdentifiers = source.keywordIdentifiers;
+ this.completeAfterAllCurrentChangesRetrieved = source.completeAfterAllCurrentChangesRetrieved;
+ this.endLSN = source.endLSN;
+ this.readConsistencyStrategy = source.readConsistencyStrategy;
+ this.notModifiedPagesAllowed = source.notModifiedPagesAllowed;
}
public CosmosChangeFeedRequestOptionsImpl(
@@ -185,6 +228,39 @@ public void setQuotaInfoEnabled(boolean quotaInfoEnabled) {
this.quotaInfoEnabled = quotaInfoEnabled;
}
+ /**
+ * Returns whether the change-feed pipeline surfaces 304/noChanges (empty) pages to the caller.
+ *
+ * @return {@code true} when each 304/noChanges page is surfaced individually (default is {@code false}).
+ */
+ public boolean isNotModifiedPagesAllowed() {
+ return this.notModifiedPagesAllowed;
+ }
+
+ /**
+ * Controls whether {@code ChangeFeedFetcher} surfaces 304/noChanges pages to the caller instead
+ * of swallowing them via Reactor's {@code repeatWhenEmpty}. Defaults to {@code false} (legacy
+ * behavior: empty pages are absorbed and only the next non-empty page is emitted).
+ *
+ *
When set to {@code true}, every physical 304 response surfaces as its own
+ * {@code FeedResponse}, so the SDK's per-page end-to-end timeout applies to each page rather
+ * than being exceeded by serial empty-page drains. Caller iterators MUST handle empty
+ * {@code FeedResponse} pages without entering retry loops.
+ *
+ *
Intentionally not surfaced on the public {@link com.azure.cosmos.models.CosmosChangeFeedRequestOptions}
+ * API. The flag changes paging semantics in subtle ways the SDK does not want most callers
+ * to opt into; reachable only from sibling modules (e.g. the Cosmos Spark connector) via the
+ * {@code ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper} bridge accessor.
+ *
+ * @param notModifiedPagesAllowed {@code true} to surface 304/noChanges pages individually;
+ * {@code false} (default) to swallow them via {@code repeatWhenEmpty}.
+ * @return this instance for fluent chaining.
+ */
+ public CosmosChangeFeedRequestOptionsImpl setNotModifiedPagesAllowed(boolean notModifiedPagesAllowed) {
+ this.notModifiedPagesAllowed = notModifiedPagesAllowed;
+ return this;
+ }
+
public void setDiagnosticsThresholds(
CosmosDiagnosticsThresholds operationSpecificThresholds) {
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java
index c48555496c29..ea0529f70d44 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java
@@ -444,6 +444,29 @@ public interface CosmosChangeFeedRequestOptionsAccessor {
void setPartitionKeyDefinition(CosmosChangeFeedRequestOptions changeFeedRequestOptions, PartitionKeyDefinition partitionKeyDefinition);
Map getProperties(CosmosChangeFeedRequestOptions changeFeedRequestOptions);
CosmosChangeFeedRequestOptions disableSplitHandling(CosmosChangeFeedRequestOptions changeFeedRequestOptions);
+
+ /**
+ * Change-feed-side analogue of {@link CosmosQueryRequestOptionsAccessor#setAllowEmptyPages(CosmosQueryRequestOptions, boolean)}.
+ * Controls whether {@code ChangeFeedFetcher} surfaces 304/NotModified pages (originating from
+ * sub-partitions that report no changes) to the caller instead of swallowing them via
+ * {@code repeatWhenEmpty}.
+ *
+ * Default is {@code false} (legacy swallow behavior). When {@code true}, every physical
+ * 304 response surfaces as its own {@code FeedResponse}; caller iterators must handle
+ * empty {@code FeedResponse} pages without entering retry loops. Intentionally NOT
+ * exposed on the public {@code CosmosChangeFeedRequestOptions} API — friend-only.
+ *
+ *
Naming differs from the query-side {@code setAllowEmptyPages} on purpose: on change
+ * feed, data-bearing empty pages already bubble up; this flag specifically opts into
+ * surfacing 304/NotModified sub-partition pages.
+ */
+ void setAllowNotModifiedPages(CosmosChangeFeedRequestOptions options, boolean notModifiedPagesAllowed);
+
+ /**
+ * Returns whether 304/NotModified pages are surfaced individually to the caller. See
+ * {@link #setAllowNotModifiedPages(CosmosChangeFeedRequestOptions, boolean)}.
+ */
+ boolean getAllowNotModifiedPages(CosmosChangeFeedRequestOptions options);
}
}
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java
index d39f5dfc059c..01ea5f29680a 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java
@@ -45,6 +45,7 @@ private static ImplementationBridgeHelpers.FeedResponseHelper.FeedResponseAccess
private final Supplier createRequestFunc;
private final Supplier feedRangeContinuationRetryPolicySupplier;
private final boolean completeAfterAllCurrentChangesRetrieved;
+ private final boolean notModifiedPagesAllowed;
private final Long endLSN;
public ChangeFeedFetcher(
@@ -57,6 +58,7 @@ public ChangeFeedFetcher(
int maxItemCount,
boolean isSplitHandlingDisabled,
boolean completeAfterAllCurrentChangesRetrieved,
+ boolean notModifiedPagesAllowed,
Long endLSN,
OperationContextAndListenerTuple operationContext,
GlobalEndpointManager globalEndpointManager,
@@ -85,6 +87,7 @@ public ChangeFeedFetcher(
diagnosticsClientContext);
this.createRequestFunc = createRequestFunc;
this.completeAfterAllCurrentChangesRetrieved = completeAfterAllCurrentChangesRetrieved;
+ this.notModifiedPagesAllowed = notModifiedPagesAllowed;
this.endLSN = endLSN;
}
@@ -118,44 +121,66 @@ public Mono> nextPage() {
private Mono> nextPageInternal(DocumentClientRetryPolicy retryPolicy) {
return Mono.fromSupplier(() -> nextPageCore(retryPolicy))
.flatMap(Function.identity())
- .flatMap((r) -> {
- FeedRangeContinuation continuationSnapshot =
- this.changeFeedState.getContinuation();
-
- if (this.completeAfterAllCurrentChangesRetrieved || this.endLSN != null) {
- if (continuationSnapshot != null) {
-
- //track the end-LSN for each sub-feedRange and then find the next sub-feedRange to fetch more changes
- boolean shouldComplete = continuationSnapshot.hasFetchedAllChanges(r, endLSN);
- if (shouldComplete) {
- this.disableShouldFetchMore();
- return Mono.just(r);
- }
-
- if (ModelBridgeInternal.noChanges(r)) {
- // if we have reached here, it means we have got 304 for the current feedRange,
- // but we need to continue drain the changes from other sub-feedRange
- this.reEnableShouldFetchMoreForRetry();
- return Mono.empty();
- }
- }
- } else {
- // complete query based on 304s
- if (continuationSnapshot != null &&
- continuationSnapshot.handleChangeFeedNotModified(r) == ShouldRetryResult.RETRY_NOW) {
-
- // not all continuations have been drained yet
- // repeat with the next continuation
- this.reEnableShouldFetchMoreForRetry();
- return Mono.empty();
- }
- }
-
- return Mono.just(r);
- })
+ .flatMap(this::applyNoChangesDecision)
.repeatWhenEmpty(o -> o);
}
+ /**
+ * Decides what to do with a single FeedResponse before it reaches the outer Paginator loop:
+ * surface it, swallow it via {@code repeatWhenEmpty}, or terminate iteration entirely. The
+ * decision depends on the change-feed mode (bounded vs streaming), whether the response is
+ * a noChanges 304, the continuation's {@code handleChangeFeedNotModified} signal, and whether
+ * the caller opted into {@code notModifiedPagesAllowed=true}.
+ */
+ private Mono> applyNoChangesDecision(FeedResponse r) {
+ FeedRangeContinuation continuationSnapshot = this.changeFeedState.getContinuation();
+
+ if (this.completeAfterAllCurrentChangesRetrieved || this.endLSN != null) {
+ if (continuationSnapshot != null) {
+ //track the end-LSN for each sub-feedRange and then find the next sub-feedRange to fetch more changes
+ boolean shouldComplete = continuationSnapshot.hasFetchedAllChanges(r, endLSN);
+ if (shouldComplete) {
+ this.disableShouldFetchMore();
+ return Mono.just(r);
+ }
+
+ if (ModelBridgeInternal.noChanges(r)) {
+ // 304 for the current sub-feedRange; need to drain the next one.
+ return surfaceOrRetryNoChangesPage(r);
+ }
+ }
+ } else {
+ // Streaming change feed (no endLSN). Terminate either when no continuation
+ // exists or when handleChangeFeedNotModified signals NO_RETRY (single-partition
+ // case, multi-partition full-cycle complete, or the >4*(size+1) consecutive-304
+ // defense in FeedRangeCompositeContinuationImpl).
+ if (continuationSnapshot != null) {
+ ShouldRetryResult retryResult = continuationSnapshot.handleChangeFeedNotModified(r);
+ if (retryResult == ShouldRetryResult.RETRY_NOW) {
+ // not all continuations have been drained yet; repeat with the next continuation
+ return surfaceOrRetryNoChangesPage(r);
+ }
+ }
+ }
+
+ return Mono.just(r);
+ }
+
+ /**
+ * Either surface a noChanges page to the caller (when notModifiedPagesAllowed=true) or swallow it via
+ * Reactor's repeatWhenEmpty (the legacy behavior). When swallowing, shouldFetchMore must be
+ * re-enabled first because isFullyDrained() already flipped it off for the noChanges page.
+ */
+ private Mono> surfaceOrRetryNoChangesPage(FeedResponse r) {
+ this.reEnableShouldFetchMoreForRetry();
+
+ if (this.notModifiedPagesAllowed) {
+ ModelBridgeInternal.setFeedResponseContinuationToken(this.changeFeedState.toString(), r);
+ return Mono.just(r);
+ }
+ return Mono.empty();
+ }
+
@Override
protected String applyServerResponseContinuation(
String serverContinuationToken,
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java
index cbd3ec9fba90..bf2fd3d332cd 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/Paginator.java
@@ -97,6 +97,7 @@ public static Flux> getChangeFeedQueryResultAsObservable(
int preFetchCount,
boolean isSplitHandlingDisabled,
boolean completeAfterAllCurrentChangesRetrieved,
+ boolean emptyPagesAllowed,
Long endLsn,
OperationContextAndListenerTuple operationContext,
DiagnosticsClientContext diagnosticsClientContext) {
@@ -112,6 +113,7 @@ public static Flux> getChangeFeedQueryResultAsObservable(
maxPageSize,
isSplitHandlingDisabled,
completeAfterAllCurrentChangesRetrieved,
+ emptyPagesAllowed,
endLsn,
operationContext,
client.getGlobalEndpointManager(),
diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java
index dba5a536ddb8..e8a155cafae2 100644
--- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosChangeFeedRequestOptions.java
@@ -442,11 +442,18 @@ CosmosChangeFeedRequestOptions withCosmosPagedFluxOptions(
CosmosChangeFeedRequestOptions effectiveRequestOptions = this;
if (pagedFluxOptions.getRequestContinuation() != null) {
+ // Rebuild from the saved continuation token (this produces the 4 token-encoded
+ // fields: continuationState, feedRangeInternal, mode, startFromInternal) and then
+ // inherit every other field from the caller's original options. Without this
+ // inheritance, a byPage(savedContinuation) round-trip would silently drop
+ // caller-supplied configuration like endLSN, customSerializer, excludeRegions,
+ // readConsistencyStrategy, etc. See
+ // CosmosChangeFeedRequestOptionsImpl.inheritNonContinuationFieldsFrom for the
+ // exhaustive field-by-field rationale.
effectiveRequestOptions =
CosmosChangeFeedRequestOptions.createForProcessingFromContinuation(
pagedFluxOptions.getRequestContinuation());
- effectiveRequestOptions.setMaxPrefetchPageCount(this.getMaxPrefetchPageCount());
- effectiveRequestOptions.setThroughputControlGroupName(this.getThroughputControlGroupName());
+ effectiveRequestOptions.getImpl().inheritNonContinuationFieldsFrom(this.actualRequestOptions);
}
if (pagedFluxOptions.getMaxItemCount() != null) {
@@ -782,6 +789,16 @@ public Map getProperties(CosmosChangeFeedRequestOptions changeFe
public CosmosChangeFeedRequestOptions disableSplitHandling(CosmosChangeFeedRequestOptions changeFeedRequestOptions) {
return changeFeedRequestOptions.disableSplitHandling();
}
+
+ @Override
+ public void setAllowNotModifiedPages(CosmosChangeFeedRequestOptions options, boolean notModifiedPagesAllowed) {
+ options.getImpl().setNotModifiedPagesAllowed(notModifiedPagesAllowed);
+ }
+
+ @Override
+ public boolean getAllowNotModifiedPages(CosmosChangeFeedRequestOptions options) {
+ return options.getImpl().isNotModifiedPagesAllowed();
+ }
});
}