1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)

#### Other Changes

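The timing argument in the changelog entry above can be illustrated with a toy model. This is a minimal sketch, not SDK code: `Page`, `sparseFeed`, the 50 ms page latency, and the 5 s per-callback budget are all hypothetical values chosen for illustration. It compares the duration of a single iterator callback when empty pages are drained inside it with the duration when each empty page is surfaced on its own.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not SDK code) of per-callback time with and without empty-page draining. */
public final class EmptyPageTimeoutSketch {

    /** One page: how long it took to fetch and how many rows it carries. */
    public static final class Page {
        final long latencyMillis;
        final int rowCount;

        Page(long latencyMillis, int rowCount) {
            this.latencyMillis = latencyMillis;
            this.rowCount = rowCount;
        }
    }

    /** Draining: a callback only returns once a non-empty page (or the end) is reached. */
    public static List<Long> callbackDurationsWhenDraining(List<Page> pages) {
        List<Long> durations = new ArrayList<>();
        long current = 0;
        for (Page page : pages) {
            current += page.latencyMillis;
            if (page.rowCount > 0) {
                durations.add(current);
                current = 0;
            }
        }
        if (current > 0) {
            durations.add(current); // trailing run of empty pages
        }
        return durations;
    }

    /** Surfacing: every page, empty or not, is returned by its own callback. */
    public static List<Long> callbackDurationsWhenSurfacing(List<Page> pages) {
        List<Long> durations = new ArrayList<>();
        for (Page page : pages) {
            durations.add(page.latencyMillis);
        }
        return durations;
    }

    /** Number of callbacks whose duration exceeds the per-callback budget. */
    public static long countOverBudget(List<Long> durations, long timeoutMillis) {
        return durations.stream().filter(d -> d > timeoutMillis).count();
    }

    /** Hypothetical sparse feed: a run of empty pages followed by one data page. */
    public static List<Page> sparseFeed(int emptyPageCount, long latencyMillis) {
        List<Page> pages = new ArrayList<>();
        for (int i = 0; i < emptyPageCount; i++) {
            pages.add(new Page(latencyMillis, 0));
        }
        pages.add(new Page(latencyMillis, 10));
        return pages;
    }

    public static void main(String[] args) {
        List<Page> pages = sparseFeed(200, 50);
        long timeoutMillis = 5_000; // assumed per-callback budget
        System.out.println("over budget when draining:  "
            + countOverBudget(callbackDurationsWhenDraining(pages), timeoutMillis));
        System.out.println("over budget when surfacing: "
            + countOverBudget(callbackDurationsWhenSurfacing(pages), timeoutMillis));
    }
}
```

In this model the trade-off named in the changelog note is visible directly: surfacing yields one callback per page (201 here) instead of a single long one.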
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)

#### Other Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
@@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)

#### Other Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
@@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)

#### Other Changes

@@ -214,6 +214,12 @@ private case class ChangeFeedPartitionReader
.setEndLSN(options, this.partition.endLsn.get)
}

// Bubble empty pages up to the iterator so the per-page end-to-end timeout
// applies to each individual page rather than being exceeded by serial
// empty-page drains inside ChangeFeedFetcher.
ImplementationBridgeHelpers.CosmosChangeFeedRequestOptionsHelper.getCosmosChangeFeedRequestOptionsAccessor
.setAllowNotModifiedPages(options, true)

options.setCustomItemSerializer(itemDeserializer)
}

@@ -45,6 +45,14 @@ private case class ItemsPartitionReader
.getCosmosQueryRequestOptionsAccessor
.disallowQueryPlanRetrieval(new CosmosQueryRequestOptions())

// Bubble empty pages up to the iterator so the per-page end-to-end timeout
// applies to each individual page rather than being exceeded by serial
// empty-page drains inside ParallelDocumentQueryExecutionContext.
ImplementationBridgeHelpers
.CosmosQueryRequestOptionsHelper
.getCosmosQueryRequestOptionsAccessor
.setAllowEmptyPages(queryOptions, true)

private val readConfig = CosmosReadConfig.parseCosmosReadConfig(config)
ThroughputControlHelper.populateThroughputControlGroupName(
ImplementationBridgeHelpers
@@ -180,6 +180,68 @@ class TransientIOErrorsRetryingIteratorSpec extends UnitSpec with BasicLoggingTr
factoryCallCount.get shouldEqual 1
}

"TransientIOErrors" should "drain long runs of empty pages without hitting the end-to-end timeout" in {
// Regression test for the empty-page drain scenario: when the SDK is configured with
// emptyPagesAllowed=true the iterator must surface many consecutive empty
// pages without busy-waiting beyond the per-page end-to-end timeout. Even
// with hundreds of empty pages followed by real data, the iterator should
// return all real rows.
val emptyLeadingPages = 200
val realPages = 5
val totalPages = emptyLeadingPages + realPages
val iterator = new TransientIOErrorsRetryingIterator(
continuationToken => generateMockedCosmosPagedFluxWithEmptyPrefix(
continuationToken, totalPages, emptyLeadingPages),
pageSize,
1,
None,
None
)
iterator.maxRetryIntervalInMs = 5

// 2 producers (Left/Right) each emit realPages * pageSize rows
iterator.count(_ => true) shouldEqual (realPages * pageSize * 2)
}

private def generateMockedCosmosPagedFluxWithEmptyPrefix
(
continuationToken: String,
initialPageCount: Int,
leadingEmptyPageCount: Int
) = {

val leftProducer = generateFeedResponseFluxWithEmptyPrefix(
"Left", initialPageCount, leadingEmptyPageCount, Option.apply(continuationToken))
val rightProducer = generateFeedResponseFluxWithEmptyPrefix(
"Right", initialPageCount, leadingEmptyPageCount, Option.apply(continuationToken))
val toBeMerged = Array(leftProducer, rightProducer).toIterable.asJava
val mergedFlux = Flux.mergeSequential(toBeMerged, 1, 2)
UtilBridgeInternal.createCosmosPagedFlux(_ => mergedFlux)
}

private def generateFeedResponseFluxWithEmptyPrefix
(
prefix: String,
pageCount: Int,
leadingEmptyPageCount: Int,
requestContinuationToken: Option[String]
): Flux[FeedResponse[SparkRowItem]] = {

// generateFeedResponse uses documentStartIndex=-1 as the "emit an empty page" sentinel.
val emptyPageSentinel = -1
val firstDataPageStartIndex = 1

val responses = Array.range(1, pageCount + 1)
.map(i => generateFeedResponse(
prefix,
i,
if (i <= leadingEmptyPageCount) emptyPageSentinel else firstDataPageStartIndex))
.filter(response => requestContinuationToken.isEmpty ||
requestContinuationToken.get < response.getContinuationToken)

Flux.fromArray(responses)
}

private val objectMapper = new ObjectMapper

@throws[JsonProcessingException]
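The shape of the test data built by the helpers above can be sketched in plain Java, without Reactor or the Cosmos types. This is an illustrative model only: rows are plain strings, `pageSize = 2` is an assumed value (the spec's actual `pageSize` is not shown in this hunk), the continuation-token filter is omitted, and list concatenation stands in for the sequential merge of the two producers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of "N empty pages, then real pages" test data for two producers. */
public final class EmptyPrefixPagesSketch {

    /** Builds pageCount pages; the first leadingEmptyPageCount pages hold no rows. */
    public static List<List<String>> pagesWithEmptyPrefix(
            String prefix, int pageCount, int leadingEmptyPageCount, int pageSize) {
        List<List<String>> pages = new ArrayList<>();
        for (int pageIndex = 1; pageIndex <= pageCount; pageIndex++) {
            if (pageIndex <= leadingEmptyPageCount) {
                pages.add(Collections.emptyList());
            } else {
                List<String> rows = new ArrayList<>();
                for (int row = 1; row <= pageSize; row++) {
                    rows.add(prefix + "-" + pageIndex + "-" + row);
                }
                pages.add(rows);
            }
        }
        return pages;
    }

    /** Concatenates the producers in order, standing in for a sequential merge. */
    @SafeVarargs
    public static List<List<String>> mergeSequential(List<List<String>>... producers) {
        List<List<String>> merged = new ArrayList<>();
        for (List<List<String>> producer : producers) {
            merged.addAll(producer);
        }
        return merged;
    }

    /** Total number of rows across all pages; empty pages contribute zero. */
    public static int countRows(List<List<String>> pages) {
        int total = 0;
        for (List<String> page : pages) {
            total += page.size();
        }
        return total;
    }

    public static void main(String[] args) {
        int emptyLeadingPages = 200;
        int realPages = 5;
        int pageSize = 2; // assumed value for illustration
        int totalPages = emptyLeadingPages + realPages;

        List<List<String>> merged = mergeSequential(
            pagesWithEmptyPrefix("Left", totalPages, emptyLeadingPages, pageSize),
            pagesWithEmptyPrefix("Right", totalPages, emptyLeadingPages, pageSize));

        // Mirrors the spec's expectation: realPages * pageSize rows per producer.
        System.out.println(countRows(merged) == realPages * pageSize * 2);
    }
}
```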
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_4-0_2-13/CHANGELOG.md
@@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)

#### Other Changes

1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_4-1_2-13/CHANGELOG.md
@@ -8,6 +8,7 @@

#### Bugs Fixed
* Improved partition planning performance for change feed with large number of feed ranges. - See [PR 49086](https://github.com/Azure/azure-sdk-for-java/pull/49086)
* Fixed `OperationCancelledException` ("End-to-end timeout hit") on sparse cross-partition queries / change feed by opting into the SDK's `emptyPagesAllowed` behavior, so the per-page timeout applies per page instead of being exceeded by serial empty-page drains. Note: this surfaces one iterator callback per empty page where previously a single callback could drain many. - See [PR 49276](https://github.com/Azure/azure-sdk-for-java/pull/49276)

#### Other Changes

@@ -332,8 +332,14 @@ public void asyncChangeFeed_fromBeginning_incremental_forLogicalPartition() thro
}
}

@Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider", timeOut = TIMEOUT)
@Test(groups = { "emulator" }, dataProvider = "changeFeedQueryPrefetchingDataProvider",
timeOut = TIMEOUT, retryAnalyzer = FlakyTestRetryAnalyzer.class)
public void asyncChangeFeedPrefetching(ChangeFeedMode changeFeedMode) throws Exception {
// Note on shape: this test verifies Reactor's prefetch behavior on the change-feed
// byPage stream. The two fire-and-forget `.subscribe()` calls + `Thread.sleep(3000)`
// are intentional — they exercise the prefetch path without backpressure-bounded
// collection. retryAnalyzer = FlakyTestRetryAnalyzer absorbs occasional slow-runner
// jitter (Windows EmulatorTcp Java 8 can take >3s to deliver the first 3 pages).
this.createContainer(
(cp) -> {
if (changeFeedMode.equals(ChangeFeedMode.INCREMENTAL)) {
@@ -1097,6 +1103,78 @@ public void changeFeedQueryCompleteAfterAvailableNow(
}
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT * 5)
public void changeFeedQuery_notModifiedPagesAllowed_surfacesNoChangesPagesAndTerminates() {
// End-to-end guard: when the SDK is opted into notModifiedPagesAllowed=true
// (via the friend-API bridge accessor — the same opt-in the Cosmos Spark connector uses),
// change-feed reads against a multi-partition container must:
// (a) surface 304/noChanges pages individually to the caller, AND
// (b) terminate via the FeedRangeCompositeContinuationImpl >4*(size+1) consecutive-304
// defense rather than poll indefinitely.
//
// This is the integration-level pin for the contract that ChangeFeedFetcher.nextPageInternal
// branch 2 explicitly calls disableShouldFetchMore() on NO_RETRY noChanges. Without that
// arm, a caller that drained the flux to completion would hang.
String testContainerId = UUID.randomUUID().toString();
try {
CosmosContainerProperties containerProperties = new CosmosContainerProperties(testContainerId, "/mypk");
CosmosAsyncContainer testContainer =
createCollection(
this.createdAsyncDatabase,
containerProperties,
new CosmosContainerRequestOptions(),
// throughput high enough to provision multiple physical partitions
11000);

// Sparse workload: a few docs spread across partitions; most physical partitions
// will return 304 / noChanges on read, exercising the empty-page surfacing path.
insertDocuments(/* partitionCount */ 3, /* documentCount */ 2, testContainer);

CosmosChangeFeedRequestOptions options =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());
ImplementationBridgeHelpers
.CosmosChangeFeedRequestOptionsHelper
.getCosmosChangeFeedRequestOptionsAccessor()
.setAllowNotModifiedPages(options, true);

AtomicInteger totalPagesObserved = new AtomicInteger(0);
AtomicInteger totalDocsObserved = new AtomicInteger(0);
AtomicInteger notModifiedPagesObserved = new AtomicInteger(0);

// Drain a bounded slice of the change feed - the iteration must terminate within
// a reasonable page count via the SDK's consecutive-304 defense.
testContainer.queryChangeFeed(options, JsonNode.class)
.byPage(1)
.take(100)
.doOnNext(response -> {
totalPagesObserved.incrementAndGet();
int pageSize = response.getResults().size();
totalDocsObserved.addAndGet(pageSize);
if (pageSize == 0) {
notModifiedPagesObserved.incrementAndGet();
}
})
.blockLast();

// (a) at least some empty pages must have surfaced - the whole point of the opt-in
assertThat(notModifiedPagesObserved.get())
.describedAs("notModifiedPagesAllowed=true must surface 304/noChanges pages individually")
.isGreaterThan(0);
// (b) all inserted docs must be observed - empty-page surfacing must not interfere
// with data-page emission
assertThat(totalDocsObserved.get())
.describedAs("all inserted documents must surface")
.isEqualTo(6);
// (c) iteration must have terminated (we didn't hit the take(100) cap, otherwise
// we'd be polling indefinitely - that's the regression the defense-in-depth arm prevents)
assertThat(totalPagesObserved.get())
.describedAs("iteration must terminate via consecutive-304 defense, not hit the take(100) cap")
.isLessThan(100);
} finally {
safeDeleteCollection(this.createdAsyncDatabase.getContainer(testContainerId));
}
}

void insertDocuments(
int partitionCount,
int documentCount) {
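The termination condition asserted by the new test can be modeled as a toy simulation. This is a sketch under loud assumptions and does not reproduce the SDK's `FeedRangeCompositeContinuationImpl`: only the `4 * (size + 1)` threshold comes from the test comment above; the round-robin order over feed ranges, one document per data page, and every name in the class are hypothetical.

```java
/** Toy model (not SDK code) of stopping after too many consecutive not-modified pages. */
public final class ConsecutiveNotModifiedSketch {

    /** Counters observed by a caller that drains the simulated feed. */
    public static final class Result {
        public final int totalPages;
        public final int totalDocs;
        public final int notModifiedPages;

        Result(int totalPages, int totalDocs, int notModifiedPages) {
            this.totalPages = totalPages;
            this.totalDocs = totalDocs;
            this.notModifiedPages = notModifiedPages;
        }
    }

    /**
     * Reads feed ranges round-robin (an assumption). A range with pending docs yields a
     * one-document page; otherwise it yields a not-modified page. Draining stops once the
     * run of consecutive not-modified pages exceeds 4 * (rangeCount + 1), or at maxPages.
     */
    public static Result drain(int[] pendingDocsPerRange, int maxPages) {
        int[] remaining = pendingDocsPerRange.clone(); // do not mutate the caller's array
        int rangeCount = remaining.length;
        int threshold = 4 * (rangeCount + 1);

        int pages = 0;
        int docs = 0;
        int notModified = 0;
        int consecutiveNotModified = 0;
        int next = 0;

        while (pages < maxPages) {
            pages++;
            if (remaining[next] > 0) {
                remaining[next]--;
                docs++;
                consecutiveNotModified = 0;
            } else {
                notModified++;
                consecutiveNotModified++;
                if (consecutiveNotModified > threshold) {
                    break; // the defense: stop polling instead of looping forever
                }
            }
            next = (next + 1) % rangeCount;
        }
        return new Result(pages, docs, notModified);
    }

    public static void main(String[] args) {
        // Three ranges with two documents each, capped at 100 pages like take(100).
        Result result = drain(new int[] {2, 2, 2}, 100);
        System.out.println("pages=" + result.totalPages
            + " docs=" + result.totalDocs
            + " notModified=" + result.notModifiedPages);
    }
}
```

Under these assumptions the simulated drain satisfies the same three properties the test asserts: some not-modified pages surface, all six documents are observed, and iteration ends below the 100-page cap. The real page counts against the emulator will differ.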