diff --git a/changes/4028.feature.md b/changes/4028.feature.md new file mode 100644 index 0000000000..1ff1882cab --- /dev/null +++ b/changes/4028.feature.md @@ -0,0 +1,7 @@ +Add three composable functions for efficiently reading sparse arrays, where most chunks are empty and resolve to the fill value: + +- `zarr.shards_initialized` discovers which shards (or chunks, for unsharded arrays) of an array have actually been written to the store, via either a single prefix listing or concurrent per-key existence probes (selected by `strategy=`). +- `zarr.initialized_regions` expresses that discovery as array regions, suitable to pass straight to `read_regions`. +- `zarr.read_regions` concurrently reads and decodes a caller-supplied collection of array regions, yielding each `(region, data)` pair spatially resolved to its location in the array. + +Together these let callers skip the per-empty-chunk store round-trips that dominate `arr[:]` on sparse arrays — e.g. `read_regions(arr, initialized_regions(arr))`. Asynchronous versions are available in `zarr.api.asynchronous`; the async `read_regions` streams each region as soon as its data is available. diff --git a/docs/api/zarr/read.md b/docs/api/zarr/read.md new file mode 100644 index 0000000000..87e84a1244 --- /dev/null +++ b/docs/api/zarr/read.md @@ -0,0 +1,7 @@ +--- +title: read +--- + +::: zarr.shards_initialized +::: zarr.initialized_regions +::: zarr.read_regions diff --git a/mkdocs.yml b/mkdocs.yml index 7a4bfa35ef..7d9eeffc5a 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -39,6 +39,7 @@ nav: - api/zarr/dtype.md - api/zarr/load.md - api/zarr/open.md + - api/zarr/read.md - api/zarr/save.md - api/zarr/codecs.md - api/zarr/codecs/numcodecs.md diff --git a/src/zarr/__init__.py b/src/zarr/__init__.py index cdf3840c3b..ec33511ddf 100644 --- a/src/zarr/__init__.py +++ b/src/zarr/__init__.py @@ -19,6 +19,7 @@ full, full_like, group, + initialized_regions, load, ones, ones_like, @@ -27,9 +28,11 @@ open_consolidated, open_group, open_like, + read_regions, save, save_array, save_group, + shards_initialized, tree, zeros, zeros_like, @@ -164,6 +167,7 @@ def set_format(log_format: str) -> None: "full", "full_like", "group", + "initialized_regions", "load", "ones", "ones_like", @@ -173,9 +177,11 @@ def set_format(log_format: str) -> None: "open_group", "open_like", "print_debug_info", + "read_regions", "save", "save_array", "save_group", + "shards_initialized", "tree", "zeros", "zeros_like", diff --git a/src/zarr/api/asynchronous.py b/src/zarr/api/asynchronous.py index 7f185535df..5a069ebd34 100644 --- a/src/zarr/api/asynchronous.py +++ b/src/zarr/api/asynchronous.py @@ -18,6 +18,9 @@ create_array, from_array, get_array_metadata, + initialized_regions, + read_regions, + shards_initialized, ) from zarr.core.array_spec import ArrayConfigLike, parse_array_config from zarr.core.buffer import NDArrayLike @@ -79,6 +82,7 @@ "full", "full_like", "group", + "initialized_regions", "load", "ones", "ones_like", @@ -87,9 +91,11 @@ "open_consolidated", "open_group", "open_like", + "read_regions", "save", "save_array", "save_group", + "shards_initialized", "tree", "zeros", "zeros_like", diff --git a/src/zarr/api/synchronous.py b/src/zarr/api/synchronous.py index 8386427b3f..780129f34c 100644 --- a/src/zarr/api/synchronous.py +++ b/src/zarr/api/synchronous.py @@ -58,6 +58,7 @@ "full", "full_like", "group", + "initialized_regions", "load", "ones", "ones_like", @@ -66,9 +67,11 @@ "open_consolidated", "open_group", "open_like", + "read_regions", "save", "save_array", "save_group", + "shards_initialized", "tree", "zeros", "zeros_like", @@ -1426,3 +1429,132 @@ def zeros_like(a: ArrayLike, **kwargs: Any) -> AnyArray: The new array. """ return Array(sync(async_api.zeros_like(a, **kwargs))) + + +def _as_async_array(array: Array[Any] | AsyncArray[Any]) -> AsyncArray[Any]: + return array._async_array if isinstance(array, Array) else array + + +def shards_initialized( + array: Array[Any] | AsyncArray[Any], + *, + strategy: Literal["auto", "list", "probe"] = "auto", +) -> tuple[str, ...]: + """ + Return the storage keys of the shards that have been persisted to the store. + + This reports storage at the granularity of stored objects: for sharded arrays it + returns shard keys (the objects that actually exist in the store), and for unsharded + arrays it returns chunk keys. To turn these into array regions, use + [initialized_regions][zarr.initialized_regions]. + + Parameters + ---------- + array : Array or AsyncArray + The array to inspect. + strategy : {"auto", "list", "probe"}, default "auto" + How to discover which shards exist. + + - ``"list"`` issues a single ``store.list_prefix`` call and keeps the keys that + belong to this array's shard grid (ignoring metadata and any other objects + under the same prefix). + - ``"probe"`` checks the existence of each possible shard key individually and + concurrently. This avoids listing a prefix that may hold many unrelated + objects, and is faster when the array has few possible shards. + - ``"auto"`` uses ``"probe"`` when the array has at most a small number of + possible shards and ``"list"`` otherwise. + + Returns + ------- + tuple[str, ...] + The storage keys of the populated shards (or chunks, when unsharded), + in chunk-grid order. + + See Also + -------- + initialized_regions : The array regions spanned by the populated shards. + read_regions : Read and decode a collection of array regions. + """ + return sync(async_api.shards_initialized(_as_async_array(array), strategy=strategy)) + + +def initialized_regions( + array: Array[Any] | AsyncArray[Any], + *, + strategy: Literal["auto", "list", "probe"] = "auto", +) -> list[tuple[slice, ...]]: + """ + Return the array regions spanned by the shards that have been persisted to the store. + + This is [shards_initialized][zarr.shards_initialized] expressed as regions: it filters + the array's shard regions down to those whose shard is populated. The result is + suitable to pass directly to [read_regions][zarr.read_regions]. + + Parameters + ---------- + array : Array or AsyncArray + The array to inspect. + strategy : {"auto", "list", "probe"}, default "auto" + How to discover which shards exist. See [shards_initialized][zarr.shards_initialized]. + + Returns + ------- + list[tuple[slice, ...]] + The regions spanned by the populated shards, in chunk-grid order. + + See Also + -------- + shards_initialized : The storage keys of the populated shards. + read_regions : Read and decode a collection of array regions. + """ + return sync(async_api.initialized_regions(_as_async_array(array), strategy=strategy)) + + +def read_regions( + array: Array[Any] | AsyncArray[Any], + regions: Iterable[tuple[slice, ...]], + *, + concurrency: int | None = None, +) -> list[tuple[tuple[slice, ...], NDArrayLikeOrScalar]]: + """ + Read and decode a collection of array regions, returning a list of ``(region, data)`` + pairs. + + Each pair associates a region (a tuple of slices into the array) with the decoded data + for that region. The regions to read are supplied by the caller; pass the result of + [initialized_regions][zarr.initialized_regions] to read only the populated parts of a + sparse array without materializing the full array. For lazy, streaming consumption use + the asynchronous [zarr.api.asynchronous.read_regions][] instead, which yields each pair + as soon as its data is available. + + Parameters + ---------- + array : Array or AsyncArray + The array to read from. + regions : iterable of tuple of slice + The regions to read. Each region is a tuple of slices, one per array dimension. + concurrency : int, optional + The maximum number of regions read concurrently. Defaults to the + ``async.concurrency`` config value. + + Returns + ------- + list[tuple[tuple[slice, ...], NDArrayLikeOrScalar]] + Each region paired with its decoded data, in completion order (not necessarily + the order of ``regions``). + + See Also + -------- + initialized_regions : The array regions spanned by the populated shards. + shards_initialized : The storage keys of the populated shards. + """ + + async def _collect() -> list[tuple[tuple[slice, ...], NDArrayLikeOrScalar]]: + return [ + item + async for item in async_api.read_regions( + _as_async_array(array), regions, concurrency=concurrency + ) + ] + + return sync(_collect()) diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 366c19bb0c..f2fae9b50f 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -2,7 +2,7 @@ import json import warnings -from asyncio import gather +from asyncio import Semaphore, as_completed, ensure_future, gather from collections.abc import Iterable, Mapping, Sequence from dataclasses import dataclass, field, replace from itertools import starmap @@ -146,7 +146,7 @@ from zarr.storage._utils import _relativize_path if TYPE_CHECKING: - from collections.abc import Iterator + from collections.abc import AsyncIterator, Iterator from typing import Self import numpy.typing as npt @@ -3975,15 +3975,182 @@ async def _shards_initialized( [nchunks_initialized][zarr.Array.nchunks_initialized] """ - store_contents = [ - x async for x in array.store_path.store.list_prefix(prefix=array.store_path.path) - ] - store_contents_relative = [ - _relativize_path(path=key, prefix=array.store_path.path) for key in store_contents + # Thin wrapper over the public, strategy-aware entry point. The "list" strategy + # preserves this function's historical behavior (a single prefix listing). + return await shards_initialized(array, strategy="list") + + +# When the array has at most this many possible shards, ``shards_initialized`` +# probes each key individually rather than listing the prefix. Probing avoids +# paying for a prefix listing that may contain many unrelated objects, and is +# cheap when there are few keys to check. +_PROBE_THRESHOLD = 64 + + +async def shards_initialized( + array: AnyArray | AnyAsyncArray, + *, + strategy: Literal["auto", "list", "probe"] = "auto", +) -> tuple[str, ...]: + """ + Return the storage keys of the shards that have been persisted to the store. + + This reports storage at the granularity of stored objects: for sharded arrays it + returns shard keys (the objects that actually exist in the store), and for unsharded + arrays it returns chunk keys. To turn these into array regions, use + [initialized_regions][zarr.initialized_regions]. + + Parameters + ---------- + array : Array or AsyncArray + The array to inspect. + strategy : {"auto", "list", "probe"}, default "auto" + How to discover which shards exist. + + - ``"list"`` issues a single ``store.list_prefix`` call and keeps the keys that + belong to this array's shard grid (ignoring metadata and any other objects + under the same prefix). + - ``"probe"`` checks the existence of each possible shard key individually and + concurrently. This avoids listing a prefix that may hold many unrelated + objects, and is faster when the array has few possible shards. + - ``"auto"`` uses ``"probe"`` when the array has at most a small number of + possible shards and ``"list"`` otherwise. + + Returns + ------- + tuple[str, ...] + The storage keys of the populated shards (or chunks, when unsharded), + in chunk-grid order. + + See Also + -------- + initialized_regions : The array regions spanned by the populated shards. + read_regions : Read and decode a collection of array regions. + """ + if isinstance(array, Array): + array = array._async_array + + keys = list(_iter_shard_keys(array)) + + if strategy == "auto": + strategy = "probe" if len(keys) <= _PROBE_THRESHOLD else "list" + + if strategy == "list": + # A single prefix listing, filtered to keys that belong to this array's shard + # grid. Non-chunk objects under the same prefix (metadata, etc.) are excluded by + # the intersection, handling prefixes that also hold unrelated objects. + contents = { + _relativize_path(path=key, prefix=array.store_path.path) + async for key in array.store_path.store.list_prefix(prefix=array.store_path.path) + } + return tuple(key for key in keys if key in contents) + elif strategy == "probe": + # Per-key existence checks, concurrently. Preferable when the prefix may + # contain many unrelated objects, or when there are few keys to check. + present = await concurrent_map( + [(array.store_path / key,) for key in keys], + lambda store_path: store_path.exists(), + zarr_config.get("async.concurrency"), + ) + return tuple(key for key, is_present in zip(keys, present, strict=True) if is_present) + else: + raise ValueError( + f"Unknown strategy {strategy!r}. Expected one of 'auto', 'list', or 'probe'." + ) + + +async def initialized_regions( + array: AnyArray | AnyAsyncArray, + *, + strategy: Literal["auto", "list", "probe"] = "auto", +) -> list[tuple[slice, ...]]: + """ + Return the array regions spanned by the shards that have been persisted to the store. + + This is [shards_initialized][zarr.shards_initialized] expressed as regions: it filters + the array's shard regions down to those whose shard is populated. The result is + suitable to pass directly to [read_regions][zarr.read_regions]. + + Parameters + ---------- + array : Array or AsyncArray + The array to inspect. + strategy : {"auto", "list", "probe"}, default "auto" + How to discover which shards exist. See [shards_initialized][zarr.shards_initialized]. + + Returns + ------- + list[tuple[slice, ...]] + The regions spanned by the populated shards, in chunk-grid order. + + See Also + -------- + shards_initialized : The storage keys of the populated shards. + read_regions : Read and decode a collection of array regions. + """ + if isinstance(array, Array): + array = array._async_array + initialized = frozenset(await shards_initialized(array, strategy=strategy)) + # "regions that are initialized" is a filter over the array's shard regions, keyed on + # whether the corresponding shard key is populated. + return [ + region + for region, key in zip(_iter_shard_regions(array), _iter_shard_keys(array), strict=True) + if key in initialized ] - return tuple( - chunk_key for chunk_key in array._iter_shard_keys() if chunk_key in store_contents_relative - ) + + +async def read_regions( + array: AnyArray | AnyAsyncArray, + regions: Iterable[tuple[slice, ...]], + *, + concurrency: int | None = None, +) -> AsyncIterator[tuple[tuple[slice, ...], NDArrayLikeOrScalar]]: + """ + Concurrently read and decode a collection of array regions, yielding each + ``(region, data)`` pair as soon as its data is available. + + Each yielded value pairs a region (a tuple of slices into the array) with the decoded + data for that region. The regions to read are supplied by the caller; pass the result + of [initialized_regions][zarr.initialized_regions] to read only the populated parts of + a sparse array without materializing the full array. + + Parameters + ---------- + array : Array or AsyncArray + The array to read from. + regions : iterable of tuple of slice + The regions to read. Each region is a tuple of slices, one per array dimension. + concurrency : int, optional + The maximum number of regions read concurrently. Defaults to the + ``async.concurrency`` config value. + + Yields + ------ + tuple[tuple[slice, ...], NDArrayLikeOrScalar] + A region and its decoded data, in completion order (not necessarily the order + of ``regions``). + + See Also + -------- + initialized_regions : The array regions spanned by the populated shards. + shards_initialized : The storage keys of the populated shards. + """ + if isinstance(array, Array): + array = array._async_array + if concurrency is None: + concurrency = zarr_config.get("async.concurrency") + + semaphore = Semaphore(concurrency) + + async def _read( + region: tuple[slice, ...], + ) -> tuple[tuple[slice, ...], NDArrayLikeOrScalar]: + async with semaphore: + return region, await array.getitem(region) + + for future in as_completed([ensure_future(_read(region)) for region in regions]): + yield await future type FiltersLike = ( diff --git a/tests/benchmarks/test_indexing.py b/tests/benchmarks/test_indexing.py index 385a85b5b5..021c0b1e04 100644 --- a/tests/benchmarks/test_indexing.py +++ b/tests/benchmarks/test_indexing.py @@ -11,7 +11,7 @@ import pytest -from zarr import create_array +from zarr import create_array, initialized_regions, read_regions indexers = ( (0,) * 3, @@ -277,3 +277,48 @@ def write_with_cache_clear() -> None: data[indexer] = write_data benchmark(write_with_cache_clear) + + +# Sparse-read benchmark: most chunks empty (resolve to the fill value). +sparse_shards = ( + None, + (64,), +) + + +@pytest.mark.parametrize("store", ["memory", "memory_get_latency"], indirect=["store"]) +@pytest.mark.parametrize("shards", sparse_shards, ids=str) +@pytest.mark.parametrize("reader", ["full", "read_regions"], ids=str) +def test_sparse_read( + store: Store, + shards: tuple[int, ...] | None, + reader: str, + benchmark: BenchmarkFixture, +) -> None: + """Benchmark reading a sparse array (most chunks empty) two ways. + + ``full`` is the stock ``arr[:]`` read, which issues a store request for every chunk + including the empty ones; ``read_regions`` discovers the populated regions and reads + only those. The gap is largest on ``memory_get_latency``, where each skipped + empty-chunk request avoids a round-trip. + """ + n_chunks = 256 + chunk = 16 + data = create_array( + store=store, + shape=(n_chunks * chunk,), + dtype="uint8", + chunks=(chunk,), + shards=shards, + compressors=None, + filters=None, + fill_value=0, + ) + # populate ~3% of chunks, spread across the array + for ci in range(0, n_chunks, 32): + data[ci * chunk : ci * chunk + chunk] = 1 + + if reader == "full": + benchmark(getitem, data, slice(None)) + else: + benchmark(lambda: read_regions(data, initialized_regions(data))) diff --git a/tests/test_array.py b/tests/test_array.py index 0d6d2d5906..b3c7046572 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -2374,3 +2374,199 @@ async def test_create_array_chunks_3d( shape = (10, 12, 15) arr = await create_array(store={}, shape=shape, chunks=chunk_input, dtype="float64") assert arr.write_chunk_sizes == expected + + +# --- shards_initialized / initialized_regions / read_regions --------------------------- + + +def _ca_sparse_1d(store: Store) -> tuple[Array[Any], npt.NDArray[Any]]: + arr = zarr.create_array(store=store, shape=(64,), chunks=(8,), dtype="int32", fill_value=42) + # populate two non-adjacent chunks (chunks 1 and 5) + arr[8:16] = np.arange(8, dtype="int32") + arr[40:48] = np.arange(100, 108, dtype="int32") + return arr, np.asarray(arr[:]) + + +def _ca_dense_1d(store: Store) -> tuple[Array[Any], npt.NDArray[Any]]: + arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=0) + arr[:] = np.arange(32, dtype="int32") + return arr, np.asarray(arr[:]) + + +def _ca_sparse_2d(store: Store) -> tuple[Array[Any], npt.NDArray[Any]]: + arr = zarr.create_array(store=store, shape=(8, 8), chunks=(2, 2), dtype="int32", fill_value=-1) + arr[0:2, 0:2] = np.ones((2, 2), dtype="int32") + arr[4:6, 4:6] = np.full((2, 2), 7, dtype="int32") + return arr, np.asarray(arr[:]) + + +def _ca_sharded_sparse(store: Store) -> tuple[Array[Any], npt.NDArray[Any]]: + # chunks (2, 2) within shards (4, 4): the shard grid is 2x2 over the 8x8 array. + arr = zarr.create_array( + store=store, shape=(8, 8), chunks=(2, 2), shards=(4, 4), dtype="int32", fill_value=42 + ) + arr[0:2, 0:2] = np.ones((2, 2), dtype="int32") # shard (0, 0) + arr[4:6, 4:6] = np.full((2, 2), 7, dtype="int32") # shard (1, 1) + return arr, np.asarray(arr[:]) + + +def _ca_all_empty(store: Store) -> tuple[Array[Any], npt.NDArray[Any]]: + arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=7) + return arr, np.asarray(arr[:]) + + +def _ca_all_populated(store: Store) -> tuple[Array[Any], npt.NDArray[Any]]: + arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=0) + arr[:] = np.arange(32, dtype="int32") + return arr, np.asarray(arr[:]) + + +_CA_SETUPS = { + "sparse_1d": _ca_sparse_1d, + "dense_1d": _ca_dense_1d, + "sparse_2d": _ca_sparse_2d, + "sharded_sparse": _ca_sharded_sparse, + "all_empty": _ca_all_empty, + "all_populated": _ca_all_populated, +} +_CA_STRATEGIES = ["auto", "list", "probe"] + + +def _ca_pack( + arr: Array[Any], + results: list[tuple[tuple[slice, ...], Any]], + baseline: npt.NDArray[Any], +) -> npt.NDArray[Any]: + """Scatter ``(region, data)`` pairs onto a fill-valued array.""" + out = np.full(baseline.shape, arr.fill_value, dtype=baseline.dtype) + for region, data in results: + out[region] = np.asarray(data) + return out + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("setup_name", list(_CA_SETUPS)) +@pytest.mark.parametrize("strategy", _CA_STRATEGIES) +def test_shards_initialized_strategies_agree( + store: Store, setup_name: str, strategy: Literal["auto", "list", "probe"] +) -> None: + """Every strategy reports the same set of populated keys.""" + arr, _ = _CA_SETUPS[setup_name](store) + keys = set(zarr.shards_initialized(arr, strategy=strategy)) + assert keys == set(zarr.shards_initialized(arr, strategy="auto")) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize( + ("setup_name", "expected_count"), + [ + ("sparse_1d", 2), + ("dense_1d", 4), + ("sparse_2d", 2), + ("sharded_sparse", 2), + ("all_empty", 0), + ("all_populated", 4), + ], +) +def test_shards_initialized_counts(store: Store, setup_name: str, expected_count: int) -> None: + arr, _ = _CA_SETUPS[setup_name](store) + assert len(zarr.shards_initialized(arr)) == expected_count + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_shards_initialized_unknown_strategy(store: Store) -> None: + arr, _ = _ca_sparse_1d(store) + with pytest.raises(ValueError, match="Unknown strategy"): + zarr.shards_initialized(arr, strategy="nonsense") # type: ignore[arg-type] + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_list_strategy_ignores_non_chunk_objects(store: Store) -> None: + """The ``list`` strategy must not mistake unrelated objects sharing the array's + prefix (e.g. metadata) for populated chunks.""" + arr, _ = _ca_sparse_1d(store) + keys = set(zarr.shards_initialized(arr, strategy="list")) + assert keys == {"c/1", "c/5"} + assert all(k.startswith("c/") for k in keys) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("setup_name", list(_CA_SETUPS)) +def test_initialized_regions_count_matches_keys(store: Store, setup_name: str) -> None: + """There is one initialized region per populated shard key.""" + arr, _ = _CA_SETUPS[setup_name](store) + assert len(zarr.initialized_regions(arr)) == len(zarr.shards_initialized(arr)) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_initialized_regions_values(store: Store) -> None: + arr, _ = _ca_sparse_1d(store) + assert set(zarr.initialized_regions(arr)) == {(slice(8, 16, 1),), (slice(40, 48, 1),)} + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("setup_name", list(_CA_SETUPS)) +def test_read_initialized_regions_reconstructs_baseline(store: Store, setup_name: str) -> None: + """Reading the initialized regions and scattering them onto a fill-valued array + reproduces the full ``arr[:]`` read exactly.""" + arr, baseline = _CA_SETUPS[setup_name](store) + results = zarr.read_regions(arr, zarr.initialized_regions(arr)) + result = _ca_pack(arr, results, baseline) + assert np.array_equal(result, baseline) + assert result.dtype == baseline.dtype + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_read_regions_explicit_regions(store: Store) -> None: + """Explicit regions are read and returned with their decoded data.""" + arr, baseline = _ca_sparse_1d(store) + explicit = [(slice(8, 16),), (slice(40, 48),)] + regions = dict(zarr.read_regions(arr, explicit)) + assert set(regions) == set(explicit) + assert np.array_equal(np.asarray(regions[(slice(8, 16),)]), baseline[8:16]) + assert np.array_equal(np.asarray(regions[(slice(40, 48),)]), baseline[40:48]) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_read_regions_concurrency_one(store: Store) -> None: + """A concurrency limit of 1 produces the same result as the default.""" + arr, baseline = _ca_sparse_2d(store) + results = zarr.read_regions(arr, zarr.initialized_regions(arr), concurrency=1) + assert np.array_equal(_ca_pack(arr, results, baseline), baseline) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("setup_name", list(_CA_SETUPS)) +async def test_read_regions_async_matches_sync(store: Store, setup_name: str) -> None: + """The async streaming generator yields the same ``(region, data)`` set as the + synchronous wrapper.""" + arr, _ = _CA_SETUPS[setup_name](store) + regions = zarr.initialized_regions(arr) + async_pairs = { + region: np.asarray(data).tobytes() + async for region, data in zarr.api.asynchronous.read_regions(arr._async_array, regions) + } + sync_pairs = { + region: np.asarray(data).tobytes() for region, data in zarr.read_regions(arr, regions) + } + assert async_pairs == sync_pairs + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +async def test_shards_initialized_async(store: Store) -> None: + arr, _ = _ca_sparse_1d(store) + keys = await zarr.api.asynchronous.shards_initialized(arr._async_array) + assert set(keys) == {"c/1", "c/5"} + + +@pytest.mark.parametrize("store", ["memory"], indirect=["store"]) +async def test_async_chunk_access_accepts_sync_array(store: Store) -> None: + """The async API also accepts a synchronous ``Array``, unwrapping it to the + underlying async array.""" + arr, _ = _ca_sparse_1d(store) + keys = await zarr.api.asynchronous.shards_initialized(arr) + regions = await zarr.api.asynchronous.initialized_regions(arr) + results = [pair async for pair in zarr.api.asynchronous.read_regions(arr, regions)] + assert set(keys) == {"c/1", "c/5"} + assert set(regions) == {(slice(8, 16, 1),), (slice(40, 48, 1),)} + assert len(results) == 2