diff --git a/sdk-core/api/sdk-core.api b/sdk-core/api/sdk-core.api index 7ebcc312..4fab19ad 100644 --- a/sdk-core/api/sdk-core.api +++ b/sdk-core/api/sdk-core.api @@ -1977,6 +1977,14 @@ public abstract interface class org/dexpace/sdk/core/io/Source : java/io/Closeab public abstract fun read (Lorg/dexpace/sdk/core/io/Buffer;J)J } +public final class org/dexpace/sdk/core/pagination/AsyncPaginator { + public fun (Lorg/dexpace/sdk/core/client/AsyncHttpClient;Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/pagination/PaginationStrategy;)V + public fun (Lorg/dexpace/sdk/core/client/AsyncHttpClient;Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/pagination/PaginationStrategy;J)V + public synthetic fun (Lorg/dexpace/sdk/core/client/AsyncHttpClient;Lorg/dexpace/sdk/core/http/request/Request;Lorg/dexpace/sdk/core/pagination/PaginationStrategy;JILkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun collectAllAsync ()Ljava/util/concurrent/CompletableFuture; + public final fun forEachAsync (Ljava/util/function/Consumer;)Ljava/util/concurrent/CompletableFuture; +} + public final class org/dexpace/sdk/core/pagination/CursorPaginationStrategy : org/dexpace/sdk/core/pagination/PaginationStrategy { public fun (Lkotlin/jvm/functions/Function1;)V public fun (Lkotlin/jvm/functions/Function1;Ljava/lang/String;)V diff --git a/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt new file mode 100644 index 00000000..ef80d580 --- /dev/null +++ b/sdk-core/src/main/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginator.kt @@ -0,0 +1,289 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.pagination + +import org.dexpace.sdk.core.client.AsyncHttpClient +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import org.dexpace.sdk.core.util.Futures +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicBoolean +import java.util.function.Consumer + +/** + * Asynchronous, strategy-driven paginator over an [AsyncHttpClient] — the non-blocking + * counterpart of [Paginator]. + * + * `AsyncPaginator` executes [initialRequest] against [asyncHttpClient], delegates each + * response to [strategy], and drives the page-to-page walk through a chain of + * [CompletableFuture]s. No thread blocks waiting on a page: each page is fetched, parsed, + * drained to the caller's consumer, and the next page is re-armed inside the future's + * completion graph. The same wire conventions ([PaginationStrategy], [Page]) back both the + * sync and async paginators, so a strategy written for [Paginator] works here unchanged. + * + * ## Laziness and the fetch budget + * + * Iteration is page-lazy in the same sense as [Paginator]: exactly one HTTP exchange happens + * per page consumed. A new page is fetched only after the previous page has been drained to + * the consumer and reports `hasNext` with a non-null next request. Empty pages still count + * toward the [maxPages] budget. + * + * ## Termination + * + * The walk completes when any of: + * + * - the current page reports `hasNext == false`, or + * - the current page's `nextPageRequest()` returns `null`, or + * - [maxPages] pages have been fetched (the safety cap), or + * - the consumer throws, or a transport/parse failure occurs (the result future completes + * exceptionally). + * + * ## Safety cap + * + * [maxPages] defaults to `Long.MAX_VALUE` (effectively unbounded). A misbehaving server that + * never advances its paging cursor would otherwise drive an unbounded fetch loop. **Production + * callers should set a finite cap.** The cap counts pages fetched (HTTP exchanges), not items. + * + * ## Response lifecycle + * + * Each [Response] is closed by the paginator after the strategy has parsed it — including on + * the exceptional path, where the response is closed before the result future is completed + * exceptionally. Strategies MUST read everything they need synchronously inside `parse(...)` + * and MUST NOT retain the response or its body past the call. Items in the returned [Page] + * outlive the response. + * + * ## Consumer threading + * + * The [Consumer] passed to [forEachAsync] is invoked on whichever thread completes the page + * future — typically a transport callback thread. It runs inline in the completion graph, so + * a slow consumer holds up the walk. The consumer is never invoked concurrently for a single + * [forEachAsync] call, but it MUST NOT assume any particular thread. A consumer that throws + * aborts the walk and surfaces through the result future. + * + * ## Stack safety + * + * The driver is trampolined: synchronously completed page futures (e.g. from an in-memory + * fake transport, or a cache hit) are processed in a loop rather than via recursive + * `thenCompose`, so a long run of already-complete pages does not overflow the stack. + * + * ## Thread-safety + * + * `AsyncPaginator` itself holds only immutable fields and is safe to share. Each call to + * [forEachAsync] / [collectAllAsync] starts an independent walk with its own state. + * + * ## Java interop + * + * ``` + * AsyncPaginator paginator = new AsyncPaginator<>(client, request, strategy); + * paginator.forEachAsync(item -> handle(item)) + * .thenRun(() -> done()); + * paginator.collectAllAsync().thenAccept(items -> ...); + * ``` + * + * @param T Element type yielded by the paginator. + * @property asyncHttpClient Async transport used to execute each page request. + * @property initialRequest Request used to fetch the first page; also passed to the strategy + * as a template for building subsequent page requests. + * @property strategy Strategy that parses each response into a [Page]. + * @property maxPages Safety cap on the total number of pages (HTTP exchanges) the walk will + * fetch. Defaults to `Long.MAX_VALUE` (unbounded). Must be positive. + */ +public class AsyncPaginator + @JvmOverloads + constructor( + private val asyncHttpClient: AsyncHttpClient, + private val initialRequest: Request, + private val strategy: PaginationStrategy, + private val maxPages: Long = Long.MAX_VALUE, + ) { + init { + require(maxPages > 0L) { "maxPages must be positive, was $maxPages" } + } + + /** + * Walks every item across every page, invoking [consumer] for each item in + * server-defined order. The returned future completes (with `null`) once the walk has + * terminated normally, or completes exceptionally if the transport fails, a strategy + * `parse` throws, or [consumer] throws. + * + * Each call starts a fresh walk from [initialRequest]. + * + * @param consumer Invoked once per item. See the class-level "Consumer threading" KDoc. + * @return A future that completes when the walk finishes. + */ + public fun forEachAsync(consumer: Consumer): CompletableFuture { + val result = CompletableFuture() + Walk(consumer, result).start() + return result + } + + /** + * Collects every item across every page into a single list, in server-defined order. + * The returned future completes with the accumulated list, or completes exceptionally + * on transport/parse/consumer failure. + * + * Buffers all items in memory — prefer [forEachAsync] for large or unbounded result + * sets. + * + * @return A future that completes with all items. + */ + public fun collectAllAsync(): CompletableFuture> { + val items = ArrayList() + return forEachAsync { items.add(it) }.thenApply { items } + } + + /** + * Drives one independent walk. Single-call state; never reused across [forEachAsync] + * invocations. + */ + private inner class Walk( + private val consumer: Consumer, + private val result: CompletableFuture, + ) { + private var nextRequest: Request? = initialRequest + private var pagesFetched: Long = 0L + + // Guards against re-entrant driving: a page future that completes synchronously + // would otherwise recurse into the loop that scheduled it. We trampoline instead — + // whichever stack frame owns the loop picks up the staged work. + private val driving = AtomicBoolean(false) + private var pendingPage: Page? = null + + fun start() { + drive() + } + + /** + * Trampoline loop. Each [step] advances the walk by one unit of synchronously + * available work and reports back via [Step]: keep looping ([Step.CONTINUE]), + * terminate ([Step.DONE]), or yield the loop to an async completion callback + * ([Step.SUSPEND]). On `SUSPEND` the `driving` flag is left set — the callback owns + * it and clears it before re-entering [drive] — so this frame must not clear it. + */ + private fun drive() { + if (!driving.compareAndSet(false, true)) { + // Another stack frame owns the loop; it will observe the work we queued. + return + } + var suspended = false + try { + var outcome = step() + while (outcome == Step.CONTINUE) { + outcome = step() + } + suspended = outcome == Step.SUSPEND + } finally { + if (!suspended) { + driving.set(false) + } + } + } + + /** + * Performs one unit of synchronously available work: drain a staged page, finish the + * walk, or fetch the next page. A synchronously completed fetch yields [Step.CONTINUE]; + * a genuinely pending fetch arms a completion callback and yields [Step.SUSPEND]. + */ + private fun step(): Step { + val staged = pendingPage + if (staged != null) { + pendingPage = null + return if (drainPage(staged)) Step.CONTINUE else Step.DONE + } + val request = nextRequest + if (request == null || pagesFetched >= maxPages) { + // No next request, or the cap is reached before fetching a page we would + // otherwise yield: the walk is complete. + result.complete(null) + return Step.DONE + } + nextRequest = null + val pageFuture = fetchPage(request) + if (pageFuture.isDone) { + // Synchronous completion (fake transport, cache hit): handle inline to keep + // the stack flat instead of recursing through a callback. + return if (stagePage(pageFuture)) Step.CONTINUE else Step.DONE + } + // Genuinely async: hand the `driving` flag to the callback and suspend the loop. + pageFuture.whenComplete { _, _ -> + if (stagePage(pageFuture)) { + driving.set(false) + drive() + } + } + return Step.SUSPEND + } + + /** + * Stages a completed page future for draining. Returns `true` to continue driving, + * `false` if the future failed (the walk is then terminated exceptionally). + */ + private fun stagePage(future: CompletableFuture>): Boolean { + val page: Page = + try { + future.join() + } catch (t: Throwable) { + result.completeExceptionally(Futures.unwrap(t)) + return false + } + pendingPage = page + return true + } + + /** + * Emits a page's items to the consumer, then schedules the next request. Returns + * `true` to continue driving, `false` if the consumer threw (walk aborted). + */ + private fun drainPage(page: Page): Boolean { + try { + val items = page.items + var i = 0 + val size = items.size + while (i < size) { + consumer.accept(items[i]) + i++ + } + } catch (t: Throwable) { + result.completeExceptionally(t) + return false + } + nextRequest = if (page.hasNext) page.nextPageRequest() else null + return true + } + + /** + * Executes [request], parses the response into a [Page], and closes the response — + * mirroring [Paginator]'s per-page lifecycle. The returned future completes with the + * parsed page or exceptionally if the transport or strategy fails. + */ + private fun fetchPage(request: Request): CompletableFuture> { + pagesFetched++ + val transportFuture: CompletableFuture = + try { + asyncHttpClient.executeAsync(request) + } catch (t: Throwable) { + // An eager throw from executeAsync (a contract violation, but be + // defensive) becomes an exceptional future so the driver stays uniform. + return Futures.failed(t) + } + return transportFuture.handle { response, error -> + if (error != null) { + throw Futures.unwrap(error) + } + try { + strategy.parse(response, initialRequest) + } finally { + response.close() + } + } + } + } + } + +/** Outcome of a single step of [AsyncPaginator]'s trampoline loop. */ +private enum class Step { CONTINUE, DONE, SUSPEND } diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt new file mode 100644 index 00000000..025f2dff --- /dev/null +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/AsyncPaginatorTest.kt @@ -0,0 +1,338 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.pagination + +import org.dexpace.sdk.core.http.request.Method +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import java.io.IOException +import java.util.concurrent.CompletionException +import java.util.concurrent.ExecutionException +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue + +class AsyncPaginatorTest { + private val itemsExtractor: (Response) -> List = { resp -> + val body = resp.body!!.source().use { it.readUtf8() } + if (body.isEmpty()) emptyList() else body.split(",") + } + + @BeforeTest + fun setup() { + installIoProvider() + } + + private fun initialRequest(): Request = + Request.builder() + .url("https://api.example.com/items") + .method(Method.GET) + .build() + + private fun strategy(): LinkHeaderPaginationStrategy = LinkHeaderPaginationStrategy(itemsExtractor) + + /** A three-page Link-header stub: items -> ?page=2 -> ?page=3 (terminal). */ + private fun threePageClient(executor: java.util.concurrent.Executor? = null): StubAsyncHttpClient { + val client = StubAsyncHttpClient(executor) + client.on("https://api.example.com/items") { req -> + textResponse( + req, + "a,b", + extraHeaders = + mapOf("Link" to "; rel=\"next\""), + ) + } + client.on("https://api.example.com/items?page=2") { req -> + textResponse( + req, + "c,d", + extraHeaders = + mapOf("Link" to "; rel=\"next\""), + ) + } + client.on("https://api.example.com/items?page=3") { req -> + textResponse(req, "e") + } + return client + } + + @Test + fun `collectAllAsync walks every page synchronously-completed`() { + val client = threePageClient() + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val items = paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + + assertEquals(listOf("a", "b", "c", "d", "e"), items) + assertEquals(3, client.callCount) + assertEquals( + listOf( + "https://api.example.com/items", + "https://api.example.com/items?page=2", + "https://api.example.com/items?page=3", + ), + client.receivedUrls, + ) + } + + @Test + fun `forEachAsync visits each item in order`() { + val client = threePageClient() + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val seen = ArrayList() + paginator.forEachAsync { seen.add(it) }.get(5, TimeUnit.SECONDS) + + assertEquals(listOf("a", "b", "c", "d", "e"), seen) + } + + @Test + fun `walk completes when pages complete on a background executor`() { + val executor = Executors.newFixedThreadPool(2) + try { + val client = threePageClient(executor) + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val items = paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + + assertEquals(listOf("a", "b", "c", "d", "e"), items) + assertEquals(3, client.callCount) + } finally { + executor.shutdownNow() + } + } + + @Test + fun `maxPages caps a server that returns the same cursor forever`() { + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { req -> + textResponse( + req, + "x", + extraHeaders = mapOf("Link" to "; rel=\"next\""), + ) + } + val paginator = AsyncPaginator(client, initialRequest(), strategy(), maxPages = 3L) + + val items = paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + + assertEquals(listOf("x", "x", "x"), items) + assertEquals(3, client.callCount) + } + + @Test + fun `maxPages of one fetches only the first page`() { + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { req -> + textResponse( + req, + "only", + extraHeaders = mapOf("Link" to "; rel=\"next\""), + ) + } + val paginator = AsyncPaginator(client, initialRequest(), strategy(), maxPages = 1L) + + assertEquals(listOf("only"), paginator.collectAllAsync().get(5, TimeUnit.SECONDS)) + assertEquals(1, client.callCount) + } + + @Test + fun `non-positive maxPages is rejected`() { + val client = StubAsyncHttpClient() + assertFailsWith { + AsyncPaginator(client, initialRequest(), strategy(), maxPages = 0L) + } + } + + @Test + fun `transport failure completes the result future exceptionally with the original cause`() { + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { error("boom from transport") } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val ex = + assertFailsWith { + paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + } + // Futures.unwrap unwraps the CompletionException wrapper so callers see the cause. + assertTrue(ex.cause is IllegalStateException, "cause was ${ex.cause}") + assertEquals("boom from transport", ex.cause?.message) + } + + @Test + fun `strategy parse failure surfaces and still closes the response`() { + val closed = AtomicInteger(0) + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { req -> + countingResponse(req, "a,b", closed) + } + // Strategy that always throws while parsing. + val failing = + PaginationStrategy { _, _ -> throw IOException("parse exploded") } + val paginator = AsyncPaginator(client, initialRequest(), failing) + + val ex = + assertFailsWith { + paginator.collectAllAsync().get(5, TimeUnit.SECONDS) + } + assertTrue(ex.cause is IOException, "cause was ${ex.cause}") + assertEquals(1, closed.get(), "response must be closed even when parse throws") + } + + @Test + fun `consumer throwing aborts the walk and surfaces the exception`() { + val client = threePageClient() + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val ex = + assertFailsWith { + paginator + .forEachAsync { item -> + if (item == "c") error("stop at c") + // else: keep going + }.get(5, TimeUnit.SECONDS) + } + assertTrue(ex.cause is IllegalStateException, "cause was ${ex.cause}") + assertEquals("stop at c", ex.cause?.message) + // The walk fetched page 1 (a,b) and page 2 (c,d), then aborted on 'c'. Page 3 untouched. + assertEquals(2, client.callCount) + } + + @Test + fun `each response is closed exactly once after parsing`() { + val closes = AtomicInteger(0) + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { req -> + countingResponse( + req, + "a", + closes, + extraHeaders = + mapOf("Link" to "; rel=\"next\""), + ) + } + client.on("https://api.example.com/items?page=2") { req -> + countingResponse(req, "b", closes) + } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + assertEquals(listOf("a", "b"), paginator.collectAllAsync().get(5, TimeUnit.SECONDS)) + assertEquals(2, closes.get(), "both responses must be closed") + } + + @Test + fun `empty terminal page yields no extra items and stops`() { + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { req -> + textResponse( + req, + "a,b", + extraHeaders = + mapOf("Link" to "; rel=\"next\""), + ) + } + client.on("https://api.example.com/items?page=2") { req -> + // Empty body, no Link header → end of stream. + textResponse(req, "") + } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + assertEquals(listOf("a", "b"), paginator.collectAllAsync().get(5, TimeUnit.SECONDS)) + assertEquals(2, client.callCount) + } + + @Test + fun `deep run of synchronously-completed pages stays stack-safe`() { + // A server that advances its cursor every page for many pages. With synchronous + // completion, a naive recursive driver would overflow the stack; the trampoline must + // process these in a loop. + val pageCount = 5_000 + val client = StubAsyncHttpClient() + for (page in 0 until pageCount) { + val url = + if (page == 0) { + "https://api.example.com/items" + } else { + "https://api.example.com/items?page=$page" + } + val isLast = page == pageCount - 1 + client.on(url) { req -> + if (isLast) { + textResponse(req, "p$page") + } else { + textResponse( + req, + "p$page", + extraHeaders = + mapOf( + "Link" to + "; rel=\"next\"", + ), + ) + } + } + } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + val items = paginator.collectAllAsync().get(30, TimeUnit.SECONDS) + assertEquals(pageCount, items.size) + assertEquals("p0", items.first()) + assertEquals("p${pageCount - 1}", items.last()) + } + + @Test + fun `collectAllAsync wrapper rethrows as CompletionException on join`() { + val client = StubAsyncHttpClient() + client.on("https://api.example.com/items") { error("kaboom") } + val paginator = AsyncPaginator(client, initialRequest(), strategy()) + + // join() (vs get()) wraps in CompletionException — confirms the failure path is wired + // through the standard CompletableFuture contract. + assertFailsWith { + paginator.collectAllAsync().join() + } + } + + @AfterTest + fun teardown() { + // no-op + } +} + +/** + * A [textResponse] whose [Response.close] increments [closeCounter], letting tests assert the + * paginator closes each response exactly once. + */ +private fun countingResponse( + request: Request, + body: String, + closeCounter: AtomicInteger, + extraHeaders: Map = emptyMap(), +): Response { + val delegate = textResponse(request, body, extraHeaders) + val countingBody = + object : org.dexpace.sdk.core.http.response.ResponseBody() { + override fun mediaType(): org.dexpace.sdk.core.http.common.MediaType? = delegate.body?.mediaType() + + override fun contentLength(): Long = delegate.body?.contentLength() ?: -1L + + override fun source(): org.dexpace.sdk.core.io.BufferedSource = delegate.body!!.source() + + override fun close() { + closeCounter.incrementAndGet() + delegate.close() + } + } + return delegate.newBuilder().body(countingBody).build() +} diff --git a/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/StubAsyncHttpClient.kt b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/StubAsyncHttpClient.kt new file mode 100644 index 00000000..977d59ae --- /dev/null +++ b/sdk-core/src/test/kotlin/org/dexpace/sdk/core/pagination/StubAsyncHttpClient.kt @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2026 dexpace and Omar Aljarrah + * + * Licensed under the MIT License. See LICENSE in the project root. + * SPDX-License-Identifier: MIT + */ + +package org.dexpace.sdk.core.pagination + +import org.dexpace.sdk.core.client.AsyncHttpClient +import org.dexpace.sdk.core.http.request.Request +import org.dexpace.sdk.core.http.response.Response +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executor + +/** + * Test support: an async stub driven by canned responses keyed by request URL — the async + * mirror of [StubHttpClient]. + * + * Each registered responder produces a [Response] (or throws). The future returned by + * [executeAsync] is completed either inline (synchronous completion, exercising the + * trampoline's already-done fast path) or via [completionExecutor] when one is supplied + * (genuinely deferred completion, exercising the callback re-arm path). + */ +internal class StubAsyncHttpClient( + private val completionExecutor: Executor? = null, +) : AsyncHttpClient { + private val responders: MutableMap Response> = LinkedHashMap() + private val urls: MutableList = ArrayList() + + fun on( + url: String, + responseBuilder: (Request) -> Response, + ): StubAsyncHttpClient { + responders[url] = responseBuilder + return this + } + + /** All URLs received, in call order. */ + val receivedUrls: List get() = urls.toList() + + /** Number of HTTP calls executed. */ + val callCount: Int get() = urls.size + + override fun executeAsync(request: Request): CompletableFuture { + val url = request.url.toString() + urls.add(url) + val responder = + responders[url] + ?: error("StubAsyncHttpClient: no canned response for URL: $url\nKnown: ${responders.keys}") + val executor = completionExecutor + if (executor == null) { + return try { + CompletableFuture.completedFuture(responder(request)) + } catch (t: Throwable) { + val f = CompletableFuture() + f.completeExceptionally(t) + f + } + } + val future = CompletableFuture() + executor.execute { + try { + future.complete(responder(request)) + } catch (t: Throwable) { + future.completeExceptionally(t) + } + } + return future + } +}