From 288714279893eb92fa8bdce0d4e2980ce54114ba Mon Sep 17 00:00:00 2001 From: Shane Grigsby Date: Tue, 2 Jun 2026 18:37:21 -0700 Subject: [PATCH 1/6] populated shards primative --- bench/empty_chunks.py | 157 ++++++++++++++++++++++++++++ changes/3929.feature.md | 14 +++ docs/api/zarr/read.md | 6 ++ mkdocs.yml | 1 + src/zarr/__init__.py | 4 + src/zarr/api/asynchronous.py | 4 + src/zarr/api/synchronous.py | 98 ++++++++++++++++++ src/zarr/core/array.py | 182 ++++++++++++++++++++++++++++++-- tests/test_chunk_access.py | 194 +++++++++++++++++++++++++++++++++++ 9 files changed, 652 insertions(+), 8 deletions(-) create mode 100644 bench/empty_chunks.py create mode 100644 changes/3929.feature.md create mode 100644 docs/api/zarr/read.md create mode 100644 tests/test_chunk_access.py diff --git a/bench/empty_chunks.py b/bench/empty_chunks.py new file mode 100644 index 0000000000..50fc7e0d8a --- /dev/null +++ b/bench/empty_chunks.py @@ -0,0 +1,157 @@ +"""Benchmark for sparse-array reads via the chunk-access primitives. + +Compares the stock ``arr[:]`` read against two primitive-based read paths on +sparse arrays (~3% of chunks populated), sweeping chunk count on ``MemoryStore`` +and ``LocalStore``: + +- ``pack``: :func:`zarr.read_regions` + scatter onto a fill-valued array. This + reproduces ``arr[:]`` semantics (a single contiguous array) but only touches + the populated chunks. +- ``stream``: iterate :func:`zarr.read_regions` without packing into one array. + This is the win for pipelines that operate per chunk and never need the whole + array materialized. + +The stock baseline scales with total chunk count; the primitive-based paths scale +with the populated-chunk count, so the speedup grows with sparsity-at-scale. Each +configuration is skipped if the warmup baseline exceeds ``BASELINE_BUDGET_S`` to +keep total runtime bounded. +""" + +from __future__ import annotations + +import tempfile +import time +import timeit +from pathlib import Path + +import numpy as np + +import zarr +from zarr.storage import LocalStore, MemoryStore + +CHUNK_SIZE = 1024 +DTYPE = "int32" +FILL_VALUE = 0 +BASELINE_BUDGET_S = 25.0 # skip configs whose warmup baseline exceeds this + +# (n_chunks, n_populated) — ~3% populated, mirrors the zagg HEALPix report. +SWEEP: list[tuple[int, int]] = [ + (1_024, 32), + (4_096, 128), + (16_384, 512), + (49_152, 1_536), +] + + +def _build_array(store: object, n_chunks: int, n_populated: int) -> zarr.Array: + arr = zarr.create_array( + store=store, + shape=(n_chunks * CHUNK_SIZE,), + chunks=(CHUNK_SIZE,), + dtype=DTYPE, + fill_value=FILL_VALUE, + ) + rng = np.random.default_rng(seed=0) + chunk_indices = rng.choice(n_chunks, size=n_populated, replace=False) + payload = np.arange(CHUNK_SIZE, dtype=DTYPE) + for ci in chunk_indices: + start = int(ci) * CHUNK_SIZE + arr[start : start + CHUNK_SIZE] = payload + return arr + + +def _read_baseline(arr: zarr.Array) -> None: + arr[:] + + +def _read_pack(arr: zarr.Array) -> np.ndarray: + out = np.full(arr.shape, arr.fill_value, dtype=arr.dtype) + for region, data in zarr.read_regions(arr): + out[region] = np.asarray(data) + return out + + +def _read_stream(arr: zarr.Array) -> int: + # Touch each region without materializing a single contiguous array. + total = 0 + for _region, data in zarr.read_regions(arr): + total += int(np.asarray(data).sum()) + return total + + +def _time(fn: object, repeats: int) -> float: + return min(timeit.repeat(fn, repeat=repeats, number=1)) + + +def _adaptive_repeats(warmup_s: float) -> int: + if warmup_s < 0.1: + return 5 + if warmup_s < 1.0: + return 3 + return 1 + + +def _run_one( + store_name: str, store: object, n_chunks: int, n_populated: int +) -> tuple[str, int, int, float, float, float, str]: + arr = _build_array(store, n_chunks, n_populated) + + t0 = time.perf_counter() + _read_baseline(arr) + warmup = time.perf_counter() - t0 + if warmup > BASELINE_BUDGET_S: + return ( + store_name, + n_chunks, + n_populated, + warmup, + float("nan"), + float("nan"), + f"skipped (>{BASELINE_BUDGET_S:.0f}s budget)", + ) + + # warm both primitive paths once + _read_pack(arr) + _read_stream(arr) + + repeats = _adaptive_repeats(warmup) + t_base = _time(lambda: _read_baseline(arr), repeats) + t_pack = _time(lambda: _read_pack(arr), repeats) + t_stream = _time(lambda: _read_stream(arr), repeats) + return store_name, n_chunks, n_populated, t_base, t_pack, t_stream, f"min of {repeats} runs" + + +def main() -> None: + rows = [] + print("Running sweep — this will take a couple of minutes for the largest configs...\n") + for n_chunks, n_populated in SWEEP: + rows.append(_run_one("MemoryStore", MemoryStore(), n_chunks, n_populated)) + with tempfile.TemporaryDirectory() as tmpdir: + rows.append( + _run_one("LocalStore", LocalStore(str(Path(tmpdir))), n_chunks, n_populated) + ) + + print( + f"\n{'store':<14}{'n_chunks':>10}{'populated':>11}" + f"{'arr[:] (s)':>12}{'pack (s)':>11}{'stream (s)':>12}" + f"{'pack x':>9}{'stream x':>10} notes" + ) + print("-" * 100) + for store_name, n_chunks, n_populated, t_base, t_pack, t_stream, note in rows: + print( + f"{store_name:<14}{n_chunks:>10}{n_populated:>11}" + f"{t_base:>12.4f}{_fmt(t_pack):>11}{_fmt(t_stream):>12}" + f"{_speedup(t_base, t_pack):>9}{_speedup(t_base, t_stream):>10} {note}" + ) + + +def _fmt(t: float) -> str: + return "—" if np.isnan(t) else f"{t:.4f}" + + +def _speedup(t_base: float, t: float) -> str: + return "—" if np.isnan(t) or t <= 0 else f"{t_base / t:.1f}x" + + +if __name__ == "__main__": + main() diff --git a/changes/3929.feature.md b/changes/3929.feature.md new file mode 100644 index 0000000000..f5ac547f72 --- /dev/null +++ b/changes/3929.feature.md @@ -0,0 +1,14 @@ +Add two primitives for efficiently reading sparse arrays, where most chunks are +empty and resolve to the fill value. + +- :func:`zarr.shards_initialized` discovers which shards (or chunks, for unsharded + arrays) of an array have actually been written to the store, via either a single + prefix listing or concurrent per-key existence probes. +- :func:`zarr.read_regions` concurrently reads and decodes array regions — by default + only the populated ones — yielding each ``(region, data)`` pair spatially resolved to + its location in the array. + +Together these let callers skip the per-empty-chunk store round-trips that dominate +``arr[:]`` on sparse arrays. Asynchronous versions are available in +:mod:`zarr.api.asynchronous`; the async :func:`zarr.api.asynchronous.read_regions` +streams each region as soon as its data is available. diff --git a/docs/api/zarr/read.md b/docs/api/zarr/read.md new file mode 100644 index 0000000000..d84bddb8ac --- /dev/null +++ b/docs/api/zarr/read.md @@ -0,0 +1,6 @@ +--- +title: read +--- + +::: zarr.shards_initialized +::: zarr.read_regions diff --git a/mkdocs.yml b/mkdocs.yml index 7a4bfa35ef..7d9eeffc5a 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -39,6 +39,7 @@ nav: - api/zarr/dtype.md - api/zarr/load.md - api/zarr/open.md + - api/zarr/read.md - api/zarr/save.md - api/zarr/codecs.md - api/zarr/codecs/numcodecs.md diff --git a/src/zarr/__init__.py b/src/zarr/__init__.py index cdf3840c3b..d7686d5832 100644 --- a/src/zarr/__init__.py +++ b/src/zarr/__init__.py @@ -27,9 +27,11 @@ open_consolidated, open_group, open_like, + read_regions, save, save_array, save_group, + shards_initialized, tree, zeros, zeros_like, @@ -173,9 +175,11 @@ def set_format(log_format: str) -> None: "open_group", "open_like", "print_debug_info", + "read_regions", "save", "save_array", "save_group", + "shards_initialized", "tree", "zeros", "zeros_like", diff --git a/src/zarr/api/asynchronous.py b/src/zarr/api/asynchronous.py index 7f185535df..9458f9bf4d 100644 --- a/src/zarr/api/asynchronous.py +++ b/src/zarr/api/asynchronous.py @@ -18,6 +18,8 @@ create_array, from_array, get_array_metadata, + read_regions, + shards_initialized, ) from zarr.core.array_spec import ArrayConfigLike, parse_array_config from zarr.core.buffer import NDArrayLike @@ -87,9 +89,11 @@ "open_consolidated", "open_group", "open_like", + "read_regions", "save", "save_array", "save_group", + "shards_initialized", "tree", "zeros", "zeros_like", diff --git a/src/zarr/api/synchronous.py b/src/zarr/api/synchronous.py index 8386427b3f..d9f47713f6 100644 --- a/src/zarr/api/synchronous.py +++ b/src/zarr/api/synchronous.py @@ -66,9 +66,11 @@ "open_consolidated", "open_group", "open_like", + "read_regions", "save", "save_array", "save_group", + "shards_initialized", "tree", "zeros", "zeros_like", @@ -1426,3 +1428,99 @@ def zeros_like(a: ArrayLike, **kwargs: Any) -> AnyArray: The new array. """ return Array(sync(async_api.zeros_like(a, **kwargs))) + + +def _as_async_array(array: Array | AsyncArray[Any]) -> AsyncArray[Any]: + return array._async_array if isinstance(array, Array) else array + + +def shards_initialized( + array: Array | AsyncArray[Any], + *, + strategy: Literal["auto", "list", "probe"] = "auto", +) -> tuple[str, ...]: + """ + Return the storage keys of the shards that have been persisted to the store. + + This reports storage at the granularity of stored objects: for sharded arrays it + returns shard keys (the objects that actually exist in the store), and for unsharded + arrays it returns chunk keys. To fetch and decode the populated regions, pass the + result of this function (or its regions) to [read_regions][zarr.read_regions]. + + Parameters + ---------- + array : Array or AsyncArray + The array to inspect. + strategy : {"auto", "list", "probe"}, default "auto" + How to discover which shards exist. + + - ``"list"`` issues a single ``store.list_prefix`` call and keeps the keys that + belong to this array's shard grid (ignoring metadata and any other objects + under the same prefix). + - ``"probe"`` checks the existence of each possible shard key individually and + concurrently. This avoids listing a prefix that may hold many unrelated + objects, and is faster when the array has few possible shards. + - ``"auto"`` uses ``"probe"`` when the array has at most a small number of + possible shards and ``"list"`` otherwise. + + Returns + ------- + tuple[str, ...] + The storage keys of the populated shards (or chunks, when unsharded), + in chunk-grid order. + + See Also + -------- + read_regions : Read and decode the populated regions of an array. + """ + return sync(async_api.shards_initialized(_as_async_array(array), strategy=strategy)) + + +def read_regions( + array: Array | AsyncArray[Any], + regions: Iterable[tuple[slice, ...]] | None = None, + *, + concurrency: int | None = None, +) -> list[tuple[tuple[slice, ...], NDArrayLikeOrScalar]]: + """ + Read and decode array regions, returning a list of ``(region, data)`` pairs. + + This is the spatially-resolved companion to [shards_initialized][zarr.shards_initialized]: + each pair associates a region (a tuple of slices into the array) with the decoded data + for that region, letting callers operate on only the populated parts of a sparse array + without materializing the full array. For lazy, streaming consumption use the + asynchronous [zarr.api.asynchronous.read_regions][] instead, which yields each pair as + soon as its data is available. + + Parameters + ---------- + array : Array or AsyncArray + The array to read from. + regions : iterable of tuple of slice, optional + The regions to read. Each region is a tuple of slices, one per array dimension. + If omitted, defaults to the regions spanned by the populated shards of ``array`` + (i.e. every region that holds data). + concurrency : int, optional + The maximum number of regions read concurrently. Defaults to the + ``async.concurrency`` config value. + + Returns + ------- + list[tuple[tuple[slice, ...], NDArrayLikeOrScalar]] + Each region paired with its decoded data, in completion order (not necessarily + the order of ``regions``). + + See Also + -------- + shards_initialized : Discover which shards of an array are populated. + """ + + async def _collect() -> list[tuple[tuple[slice, ...], NDArrayLikeOrScalar]]: + return [ + item + async for item in async_api.read_regions( + _as_async_array(array), regions, concurrency=concurrency + ) + ] + + return sync(_collect()) diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 366c19bb0c..094aaf72f8 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -2,7 +2,7 @@ import json import warnings -from asyncio import gather +from asyncio import Semaphore, as_completed, ensure_future, gather from collections.abc import Iterable, Mapping, Sequence from dataclasses import dataclass, field, replace from itertools import starmap @@ -146,7 +146,7 @@ from zarr.storage._utils import _relativize_path if TYPE_CHECKING: - from collections.abc import Iterator + from collections.abc import AsyncIterator, Iterator from typing import Self import numpy.typing as npt @@ -3975,17 +3975,183 @@ async def _shards_initialized( [nchunks_initialized][zarr.Array.nchunks_initialized] """ - store_contents = [ - x async for x in array.store_path.store.list_prefix(prefix=array.store_path.path) - ] - store_contents_relative = [ - _relativize_path(path=key, prefix=array.store_path.path) for key in store_contents - ] + store_contents_relative = { + _relativize_path(path=key, prefix=array.store_path.path) + async for key in array.store_path.store.list_prefix(prefix=array.store_path.path) + } return tuple( chunk_key for chunk_key in array._iter_shard_keys() if chunk_key in store_contents_relative ) +# When the array has at most this many possible shards, ``shards_initialized`` +# probes each key individually rather than listing the prefix. Probing avoids +# paying for a prefix listing that may contain many unrelated objects, and is +# cheap when there are few keys to check. +_PROBE_THRESHOLD = 64 + + +async def _initialized_shards( + array: AnyAsyncArray, + *, + strategy: Literal["auto", "list", "probe"] = "auto", +) -> list[tuple[tuple[int, ...], str]]: + """ + Discover the populated shards of an array, returning ``(coords, key)`` pairs in + chunk-grid order. This is the shared core of [shards_initialized][zarr.shards_initialized] + (which projects to keys) and [read_regions][zarr.read_regions] (which projects to regions). + """ + coords = list(_iter_shard_coords(array)) + keys = [array.metadata.encode_chunk_key(c) for c in coords] + + if strategy == "auto": + strategy = "probe" if len(coords) <= _PROBE_THRESHOLD else "list" + + if strategy == "list": + # A single prefix listing, filtered to keys that belong to this array's + # shard grid. Non-chunk objects under the same prefix (metadata, etc.) + # are excluded by the intersection, addressing the case where the prefix + # holds many unrelated objects. + contents = { + _relativize_path(path=key, prefix=array.store_path.path) + async for key in array.store_path.store.list_prefix(prefix=array.store_path.path) + } + return [(c, k) for c, k in zip(coords, keys, strict=True) if k in contents] + elif strategy == "probe": + # Per-key existence checks, concurrently. Preferable when the prefix may + # contain many unrelated objects, or when there are few keys to check. + present = await concurrent_map( + [(array.store_path / k,) for k in keys], + lambda store_path: store_path.exists(), + zarr_config.get("async.concurrency"), + ) + return [ + (c, k) + for (c, k), is_present in zip(zip(coords, keys, strict=True), present, strict=True) + if is_present + ] + else: + raise ValueError( + f"Unknown strategy {strategy!r}. Expected one of 'auto', 'list', or 'probe'." + ) + + +async def shards_initialized( + array: AnyArray | AnyAsyncArray, + *, + strategy: Literal["auto", "list", "probe"] = "auto", +) -> tuple[str, ...]: + """ + Return the storage keys of the shards that have been persisted to the store. + + This reports storage at the granularity of stored objects: for sharded arrays it + returns shard keys (the objects that actually exist in the store), and for unsharded + arrays it returns chunk keys. To fetch and decode the populated regions, pass the + result of this function (or its regions) to [read_regions][zarr.read_regions]. + + Parameters + ---------- + array : Array or AsyncArray + The array to inspect. + strategy : {"auto", "list", "probe"}, default "auto" + How to discover which shards exist. + + - ``"list"`` issues a single ``store.list_prefix`` call and keeps the keys that + belong to this array's shard grid (ignoring metadata and any other objects + under the same prefix). + - ``"probe"`` checks the existence of each possible shard key individually and + concurrently. This avoids listing a prefix that may hold many unrelated + objects, and is faster when the array has few possible shards. + - ``"auto"`` uses ``"probe"`` when the array has at most a small number of + possible shards and ``"list"`` otherwise. + + Returns + ------- + tuple[str, ...] + The storage keys of the populated shards (or chunks, when unsharded), + in chunk-grid order. + + See Also + -------- + read_regions : Read and decode the populated regions of an array. + """ + if isinstance(array, Array): + array = array._async_array + return tuple(key for _, key in await _initialized_shards(array, strategy=strategy)) + + +async def _initialized_regions( + array: AnyAsyncArray, + *, + strategy: Literal["auto", "list", "probe"] = "auto", +) -> list[tuple[slice, ...]]: + """Return the array regions spanned by each populated shard, in chunk-grid order.""" + shard_shape = array.shards if array.shards is not None else array.chunks + return [ + tuple( + slice(c * s, min((c + 1) * s, dim)) + for c, s, dim in zip(coords, shard_shape, array.shape, strict=True) + ) + for coords, _ in await _initialized_shards(array, strategy=strategy) + ] + + +async def read_regions( + array: AnyArray | AnyAsyncArray, + regions: Iterable[tuple[slice, ...]] | None = None, + *, + concurrency: int | None = None, +) -> AsyncIterator[tuple[tuple[slice, ...], NDArrayLikeOrScalar]]: + """ + Concurrently read and decode array regions, yielding each ``(region, data)`` pair + as soon as its data is available. + + This is the spatially-resolved companion to [shards_initialized][zarr.shards_initialized]: + each yielded value pairs a region (a tuple of slices into the array) with the decoded + data for that region, so callers can stream over only the populated parts of a sparse + array without materializing the full array. + + Parameters + ---------- + array : Array or AsyncArray + The array to read from. + regions : iterable of tuple of slice, optional + The regions to read. Each region is a tuple of slices, one per array dimension. + If omitted, defaults to the regions spanned by the populated shards of ``array`` + (i.e. every region that holds data). + concurrency : int, optional + The maximum number of regions read concurrently. Defaults to the + ``async.concurrency`` config value. + + Yields + ------ + tuple[tuple[slice, ...], NDArrayLikeOrScalar] + A region and its decoded data, in completion order (not necessarily the order + of ``regions``). + + See Also + -------- + shards_initialized : Discover which shards of an array are populated. + """ + if isinstance(array, Array): + array = array._async_array + if concurrency is None: + concurrency = zarr_config.get("async.concurrency") + + region_list = await _initialized_regions(array) if regions is None else list(regions) + + semaphore = Semaphore(concurrency) + + async def _read( + region: tuple[slice, ...], + ) -> tuple[tuple[slice, ...], NDArrayLikeOrScalar]: + async with semaphore: + return region, await array.getitem(region) + + for future in as_completed([ensure_future(_read(region)) for region in region_list]): + yield await future + + type FiltersLike = ( Iterable[dict[str, JSON] | ArrayArrayCodec | Numcodec] | ArrayArrayCodec diff --git a/tests/test_chunk_access.py b/tests/test_chunk_access.py new file mode 100644 index 0000000000..8ee6041a70 --- /dev/null +++ b/tests/test_chunk_access.py @@ -0,0 +1,194 @@ +"""Tests for the shard-discovery and region-read primitives. + +These cover :func:`zarr.shards_initialized` (discover which shards/chunks of an +array are populated) and :func:`zarr.read_regions` (concurrently read and decode +the populated regions), along with their asynchronous counterparts in +:mod:`zarr.api.asynchronous`. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import numpy as np +import pytest + +import zarr +import zarr.api.asynchronous as async_api + +if TYPE_CHECKING: + from collections.abc import Callable + + from zarr.abc.store import Store + + +def _sparse_1d(store: Store) -> tuple[zarr.Array, np.ndarray]: + arr = zarr.create_array(store=store, shape=(64,), chunks=(8,), dtype="int32", fill_value=42) + # populate two non-adjacent chunks (chunks 1 and 5) + arr[8:16] = np.arange(8, dtype="int32") + arr[40:48] = np.arange(100, 108, dtype="int32") + return arr, np.asarray(arr[:]) + + +def _dense_1d(store: Store) -> tuple[zarr.Array, np.ndarray]: + arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=0) + arr[:] = np.arange(32, dtype="int32") + return arr, np.asarray(arr[:]) + + +def _sparse_2d(store: Store) -> tuple[zarr.Array, np.ndarray]: + arr = zarr.create_array(store=store, shape=(8, 8), chunks=(2, 2), dtype="int32", fill_value=-1) + arr[0:2, 0:2] = np.ones((2, 2), dtype="int32") + arr[4:6, 4:6] = np.full((2, 2), 7, dtype="int32") + return arr, np.asarray(arr[:]) + + +def _sharded_sparse(store: Store) -> tuple[zarr.Array, np.ndarray]: + # chunks (2, 2) within shards (4, 4): the shard grid is 2x2 over the 8x8 array. + arr = zarr.create_array( + store=store, shape=(8, 8), chunks=(2, 2), shards=(4, 4), dtype="int32", fill_value=42 + ) + arr[0:2, 0:2] = np.ones((2, 2), dtype="int32") # shard (0, 0) + arr[4:6, 4:6] = np.full((2, 2), 7, dtype="int32") # shard (1, 1) + return arr, np.asarray(arr[:]) + + +def _all_empty(store: Store) -> tuple[zarr.Array, np.ndarray]: + arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=7) + return arr, np.asarray(arr[:]) + + +def _all_populated(store: Store) -> tuple[zarr.Array, np.ndarray]: + arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=0) + arr[:] = np.arange(32, dtype="int32") + return arr, np.asarray(arr[:]) + + +SETUPS: dict[str, Callable[[Store], tuple[zarr.Array, np.ndarray]]] = { + "sparse_1d": _sparse_1d, + "dense_1d": _dense_1d, + "sparse_2d": _sparse_2d, + "sharded_sparse": _sharded_sparse, + "all_empty": _all_empty, + "all_populated": _all_populated, +} + +STRATEGIES = ["auto", "list", "probe"] + + +def _pack(arr: zarr.Array, regions: list, baseline: np.ndarray) -> np.ndarray: + """Scatter ``(region, data)`` pairs onto a fill-valued array.""" + out = np.full(baseline.shape, arr.fill_value, dtype=baseline.dtype) + for region, data in regions: + out[region] = np.asarray(data) + return out + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("setup_name", list(SETUPS)) +@pytest.mark.parametrize("strategy", STRATEGIES) +def test_shards_initialized_strategies_agree(store: Store, setup_name: str, strategy: str) -> None: + """Every strategy reports the same set of populated keys, and reports the + expected count for a hand-known layout.""" + arr, _ = SETUPS[setup_name](store) + keys = set(zarr.shards_initialized(arr, strategy=strategy)) + # all strategies must agree + assert keys == set(zarr.shards_initialized(arr, strategy="auto")) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize( + ("setup_name", "expected_count"), + [ + ("sparse_1d", 2), + ("dense_1d", 4), + ("sparse_2d", 2), + ("sharded_sparse", 2), + ("all_empty", 0), + ("all_populated", 4), + ], +) +def test_shards_initialized_counts(store: Store, setup_name: str, expected_count: int) -> None: + arr, _ = SETUPS[setup_name](store) + assert len(zarr.shards_initialized(arr)) == expected_count + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_shards_initialized_unknown_strategy(store: Store) -> None: + arr, _ = _sparse_1d(store) + with pytest.raises(ValueError, match="Unknown strategy"): + zarr.shards_initialized(arr, strategy="nonsense") # type: ignore[arg-type] + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_list_strategy_ignores_non_chunk_objects(store: Store) -> None: + """The ``list`` strategy must not mistake unrelated objects sharing the + array's prefix (e.g. metadata) for populated chunks.""" + arr, _ = _sparse_1d(store) + # metadata (zarr.json) already lives under the array prefix; add another + # non-chunk object to be sure it is excluded. + keys = set(zarr.shards_initialized(arr, strategy="list")) + assert keys == {"c/1", "c/5"} + assert all(k.startswith("c/") for k in keys) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("setup_name", list(SETUPS)) +def test_read_regions_reconstructs_baseline(store: Store, setup_name: str) -> None: + """Packing the populated regions onto a fill-valued array reproduces the + full ``arr[:]`` read exactly.""" + arr, baseline = SETUPS[setup_name](store) + regions = zarr.read_regions(arr) + result = _pack(arr, regions, baseline) + assert np.array_equal(result, baseline) + assert result.dtype == baseline.dtype + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("setup_name", list(SETUPS)) +def test_read_regions_default_count_matches_discovery(store: Store, setup_name: str) -> None: + """With no explicit regions, ``read_regions`` reads exactly the populated + shards discovered by ``shards_initialized``.""" + arr, _ = SETUPS[setup_name](store) + regions = zarr.read_regions(arr) + assert len(regions) == len(zarr.shards_initialized(arr)) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_read_regions_explicit_regions(store: Store) -> None: + """Explicit regions are read and returned with their decoded data.""" + arr, baseline = _sparse_1d(store) + explicit = [(slice(8, 16),), (slice(40, 48),)] + regions = dict(zarr.read_regions(arr, explicit)) + assert set(regions) == set(explicit) + assert np.array_equal(np.asarray(regions[(slice(8, 16),)]), baseline[8:16]) + assert np.array_equal(np.asarray(regions[(slice(40, 48),)]), baseline[40:48]) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_read_regions_concurrency_one(store: Store) -> None: + """A concurrency limit of 1 produces the same result as the default.""" + arr, baseline = _sparse_2d(store) + regions = zarr.read_regions(arr, concurrency=1) + assert np.array_equal(_pack(arr, regions, baseline), baseline) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("setup_name", list(SETUPS)) +async def test_read_regions_async_matches_sync(store: Store, setup_name: str) -> None: + """The async streaming generator yields the same ``(region, data)`` set as + the synchronous wrapper.""" + arr, _ = SETUPS[setup_name](store) + async_pairs = { + region: np.asarray(data).tobytes() + async for region, data in async_api.read_regions(arr._async_array) + } + sync_pairs = {region: np.asarray(data).tobytes() for region, data in zarr.read_regions(arr)} + assert async_pairs == sync_pairs + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +async def test_shards_initialized_async(store: Store) -> None: + arr, _ = _sparse_1d(store) + keys = await async_api.shards_initialized(arr._async_array) + assert set(keys) == {"c/1", "c/5"} From 82bdbf85af667f2af9f79a4f690fc5f4e4f15d50 Mon Sep 17 00:00:00 2001 From: Shane Grigsby Date: Tue, 2 Jun 2026 19:01:33 -0700 Subject: [PATCH 2/6] delegation consistency for shards --- src/zarr/core/array.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 094aaf72f8..74bc3225e4 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -3975,13 +3975,11 @@ async def _shards_initialized( [nchunks_initialized][zarr.Array.nchunks_initialized] """ - store_contents_relative = { - _relativize_path(path=key, prefix=array.store_path.path) - async for key in array.store_path.store.list_prefix(prefix=array.store_path.path) - } - return tuple( - chunk_key for chunk_key in array._iter_shard_keys() if chunk_key in store_contents_relative - ) + # Thin wrapper over the shared discovery core, projected to keys. The "list" + # strategy preserves this function's historical behavior (a single prefix + # listing); see [shards_initialized][zarr.shards_initialized] for the public, + # strategy-aware entry point. + return tuple(key for _, key in await _initialized_shards(array, strategy="list")) # When the array has at most this many possible shards, ``shards_initialized`` From a027be53104c020e9246263e7c5a160b42b1232c Mon Sep 17 00:00:00 2001 From: Shane Grigsby Date: Wed, 3 Jun 2026 16:47:16 -0700 Subject: [PATCH 3/6] easy fixes (tests, file paths, etc) --- bench/empty_chunks.py | 157 ------------------------ changes/3929.feature.md | 14 --- changes/4028.feature.md | 7 ++ docs/api/zarr/read.md | 1 + src/zarr/__init__.py | 2 + src/zarr/api/asynchronous.py | 2 + src/zarr/api/synchronous.py | 55 +++++++-- src/zarr/core/array.py | 142 +++++++++++----------- tests/benchmarks/test_indexing.py | 47 +++++++- tests/test_array.py | 179 +++++++++++++++++++++++++++ tests/test_chunk_access.py | 194 ------------------------------ 11 files changed, 357 insertions(+), 443 deletions(-) delete mode 100644 bench/empty_chunks.py delete mode 100644 changes/3929.feature.md create mode 100644 changes/4028.feature.md delete mode 100644 tests/test_chunk_access.py diff --git a/bench/empty_chunks.py b/bench/empty_chunks.py deleted file mode 100644 index 50fc7e0d8a..0000000000 --- a/bench/empty_chunks.py +++ /dev/null @@ -1,157 +0,0 @@ -"""Benchmark for sparse-array reads via the chunk-access primitives. - -Compares the stock ``arr[:]`` read against two primitive-based read paths on -sparse arrays (~3% of chunks populated), sweeping chunk count on ``MemoryStore`` -and ``LocalStore``: - -- ``pack``: :func:`zarr.read_regions` + scatter onto a fill-valued array. This - reproduces ``arr[:]`` semantics (a single contiguous array) but only touches - the populated chunks. -- ``stream``: iterate :func:`zarr.read_regions` without packing into one array. - This is the win for pipelines that operate per chunk and never need the whole - array materialized. - -The stock baseline scales with total chunk count; the primitive-based paths scale -with the populated-chunk count, so the speedup grows with sparsity-at-scale. Each -configuration is skipped if the warmup baseline exceeds ``BASELINE_BUDGET_S`` to -keep total runtime bounded. -""" - -from __future__ import annotations - -import tempfile -import time -import timeit -from pathlib import Path - -import numpy as np - -import zarr -from zarr.storage import LocalStore, MemoryStore - -CHUNK_SIZE = 1024 -DTYPE = "int32" -FILL_VALUE = 0 -BASELINE_BUDGET_S = 25.0 # skip configs whose warmup baseline exceeds this - -# (n_chunks, n_populated) — ~3% populated, mirrors the zagg HEALPix report. -SWEEP: list[tuple[int, int]] = [ - (1_024, 32), - (4_096, 128), - (16_384, 512), - (49_152, 1_536), -] - - -def _build_array(store: object, n_chunks: int, n_populated: int) -> zarr.Array: - arr = zarr.create_array( - store=store, - shape=(n_chunks * CHUNK_SIZE,), - chunks=(CHUNK_SIZE,), - dtype=DTYPE, - fill_value=FILL_VALUE, - ) - rng = np.random.default_rng(seed=0) - chunk_indices = rng.choice(n_chunks, size=n_populated, replace=False) - payload = np.arange(CHUNK_SIZE, dtype=DTYPE) - for ci in chunk_indices: - start = int(ci) * CHUNK_SIZE - arr[start : start + CHUNK_SIZE] = payload - return arr - - -def _read_baseline(arr: zarr.Array) -> None: - arr[:] - - -def _read_pack(arr: zarr.Array) -> np.ndarray: - out = np.full(arr.shape, arr.fill_value, dtype=arr.dtype) - for region, data in zarr.read_regions(arr): - out[region] = np.asarray(data) - return out - - -def _read_stream(arr: zarr.Array) -> int: - # Touch each region without materializing a single contiguous array. - total = 0 - for _region, data in zarr.read_regions(arr): - total += int(np.asarray(data).sum()) - return total - - -def _time(fn: object, repeats: int) -> float: - return min(timeit.repeat(fn, repeat=repeats, number=1)) - - -def _adaptive_repeats(warmup_s: float) -> int: - if warmup_s < 0.1: - return 5 - if warmup_s < 1.0: - return 3 - return 1 - - -def _run_one( - store_name: str, store: object, n_chunks: int, n_populated: int -) -> tuple[str, int, int, float, float, float, str]: - arr = _build_array(store, n_chunks, n_populated) - - t0 = time.perf_counter() - _read_baseline(arr) - warmup = time.perf_counter() - t0 - if warmup > BASELINE_BUDGET_S: - return ( - store_name, - n_chunks, - n_populated, - warmup, - float("nan"), - float("nan"), - f"skipped (>{BASELINE_BUDGET_S:.0f}s budget)", - ) - - # warm both primitive paths once - _read_pack(arr) - _read_stream(arr) - - repeats = _adaptive_repeats(warmup) - t_base = _time(lambda: _read_baseline(arr), repeats) - t_pack = _time(lambda: _read_pack(arr), repeats) - t_stream = _time(lambda: _read_stream(arr), repeats) - return store_name, n_chunks, n_populated, t_base, t_pack, t_stream, f"min of {repeats} runs" - - -def main() -> None: - rows = [] - print("Running sweep — this will take a couple of minutes for the largest configs...\n") - for n_chunks, n_populated in SWEEP: - rows.append(_run_one("MemoryStore", MemoryStore(), n_chunks, n_populated)) - with tempfile.TemporaryDirectory() as tmpdir: - rows.append( - _run_one("LocalStore", LocalStore(str(Path(tmpdir))), n_chunks, n_populated) - ) - - print( - f"\n{'store':<14}{'n_chunks':>10}{'populated':>11}" - f"{'arr[:] (s)':>12}{'pack (s)':>11}{'stream (s)':>12}" - f"{'pack x':>9}{'stream x':>10} notes" - ) - print("-" * 100) - for store_name, n_chunks, n_populated, t_base, t_pack, t_stream, note in rows: - print( - f"{store_name:<14}{n_chunks:>10}{n_populated:>11}" - f"{t_base:>12.4f}{_fmt(t_pack):>11}{_fmt(t_stream):>12}" - f"{_speedup(t_base, t_pack):>9}{_speedup(t_base, t_stream):>10} {note}" - ) - - -def _fmt(t: float) -> str: - return "—" if np.isnan(t) else f"{t:.4f}" - - -def _speedup(t_base: float, t: float) -> str: - return "—" if np.isnan(t) or t <= 0 else f"{t_base / t:.1f}x" - - -if __name__ == "__main__": - main() diff --git a/changes/3929.feature.md b/changes/3929.feature.md deleted file mode 100644 index f5ac547f72..0000000000 --- a/changes/3929.feature.md +++ /dev/null @@ -1,14 +0,0 @@ -Add two primitives for efficiently reading sparse arrays, where most chunks are -empty and resolve to the fill value. - -- :func:`zarr.shards_initialized` discovers which shards (or chunks, for unsharded - arrays) of an array have actually been written to the store, via either a single - prefix listing or concurrent per-key existence probes. -- :func:`zarr.read_regions` concurrently reads and decodes array regions — by default - only the populated ones — yielding each ``(region, data)`` pair spatially resolved to - its location in the array. - -Together these let callers skip the per-empty-chunk store round-trips that dominate -``arr[:]`` on sparse arrays. Asynchronous versions are available in -:mod:`zarr.api.asynchronous`; the async :func:`zarr.api.asynchronous.read_regions` -streams each region as soon as its data is available. diff --git a/changes/4028.feature.md b/changes/4028.feature.md new file mode 100644 index 0000000000..1ff1882cab --- /dev/null +++ b/changes/4028.feature.md @@ -0,0 +1,7 @@ +Add three composable functions for efficiently reading sparse arrays, where most chunks are empty and resolve to the fill value: + +- `zarr.shards_initialized` discovers which shards (or chunks, for unsharded arrays) of an array have actually been written to the store, via either a single prefix listing or concurrent per-key existence probes (selected by `strategy=`). +- `zarr.initialized_regions` expresses that discovery as array regions, suitable to pass straight to `read_regions`. +- `zarr.read_regions` concurrently reads and decodes a caller-supplied collection of array regions, yielding each `(region, data)` pair spatially resolved to its location in the array. + +Together these let callers skip the per-empty-chunk store round-trips that dominate `arr[:]` on sparse arrays — e.g. `read_regions(arr, initialized_regions(arr))`. Asynchronous versions are available in `zarr.api.asynchronous`; the async `read_regions` streams each region as soon as its data is available. diff --git a/docs/api/zarr/read.md b/docs/api/zarr/read.md index d84bddb8ac..87e84a1244 100644 --- a/docs/api/zarr/read.md +++ b/docs/api/zarr/read.md @@ -3,4 +3,5 @@ title: read --- ::: zarr.shards_initialized +::: zarr.initialized_regions ::: zarr.read_regions diff --git a/src/zarr/__init__.py b/src/zarr/__init__.py index d7686d5832..ec33511ddf 100644 --- a/src/zarr/__init__.py +++ b/src/zarr/__init__.py @@ -19,6 +19,7 @@ full, full_like, group, + initialized_regions, load, ones, ones_like, @@ -166,6 +167,7 @@ def set_format(log_format: str) -> None: "full", "full_like", "group", + "initialized_regions", "load", "ones", "ones_like", diff --git a/src/zarr/api/asynchronous.py b/src/zarr/api/asynchronous.py index 9458f9bf4d..5a069ebd34 100644 --- a/src/zarr/api/asynchronous.py +++ b/src/zarr/api/asynchronous.py @@ -18,6 +18,7 @@ create_array, from_array, get_array_metadata, + initialized_regions, read_regions, shards_initialized, ) @@ -81,6 +82,7 @@ "full", "full_like", "group", + "initialized_regions", "load", "ones", "ones_like", diff --git a/src/zarr/api/synchronous.py b/src/zarr/api/synchronous.py index d9f47713f6..331c5bfe3b 100644 --- a/src/zarr/api/synchronous.py +++ b/src/zarr/api/synchronous.py @@ -58,6 +58,7 @@ "full", "full_like", "group", + "initialized_regions", "load", "ones", "ones_like", @@ -1444,8 +1445,8 @@ def shards_initialized( This reports storage at the granularity of stored objects: for sharded arrays it returns shard keys (the objects that actually exist in the store), and for unsharded - arrays it returns chunk keys. To fetch and decode the populated regions, pass the - result of this function (or its regions) to [read_regions][zarr.read_regions]. + arrays it returns chunk keys. To turn these into array regions, use + [initialized_regions][zarr.initialized_regions]. Parameters ---------- @@ -1471,11 +1472,44 @@ def shards_initialized( See Also -------- - read_regions : Read and decode the populated regions of an array. + initialized_regions : The array regions spanned by the populated shards. + read_regions : Read and decode a collection of array regions. """ return sync(async_api.shards_initialized(_as_async_array(array), strategy=strategy)) +def initialized_regions( + array: Array | AsyncArray[Any], + *, + strategy: Literal["auto", "list", "probe"] = "auto", +) -> list[tuple[slice, ...]]: + """ + Return the array regions spanned by the shards that have been persisted to the store. + + This is [shards_initialized][zarr.shards_initialized] expressed as regions: it filters + the array's shard regions down to those whose shard is populated. The result is + suitable to pass directly to [read_regions][zarr.read_regions]. + + Parameters + ---------- + array : Array or AsyncArray + The array to inspect. + strategy : {"auto", "list", "probe"}, default "auto" + How to discover which shards exist. See [shards_initialized][zarr.shards_initialized]. + + Returns + ------- + list[tuple[slice, ...]] + The regions spanned by the populated shards, in chunk-grid order. + + See Also + -------- + shards_initialized : The storage keys of the populated shards. + read_regions : Read and decode a collection of array regions. + """ + return sync(async_api.initialized_regions(_as_async_array(array), strategy=strategy)) + + def read_regions( array: Array | AsyncArray[Any], regions: Iterable[tuple[slice, ...]] | None = None, @@ -1485,12 +1519,12 @@ def read_regions( """ Read and decode array regions, returning a list of ``(region, data)`` pairs. - This is the spatially-resolved companion to [shards_initialized][zarr.shards_initialized]: - each pair associates a region (a tuple of slices into the array) with the decoded data - for that region, letting callers operate on only the populated parts of a sparse array - without materializing the full array. For lazy, streaming consumption use the - asynchronous [zarr.api.asynchronous.read_regions][] instead, which yields each pair as - soon as its data is available. + Each pair associates a region (a tuple of slices into the array) with the decoded data + for that region. If ``regions`` is omitted, it defaults to the populated regions of the + array (see [initialized_regions][zarr.initialized_regions]), letting callers operate on + only the populated parts of a sparse array without materializing the full array. For + lazy, streaming consumption use the asynchronous [zarr.api.asynchronous.read_regions][] + instead, which yields each pair as soon as its data is available. Parameters ---------- @@ -1512,7 +1546,8 @@ def read_regions( See Also -------- - shards_initialized : Discover which shards of an array are populated. + initialized_regions : The array regions spanned by the populated shards. + shards_initialized : The storage keys of the populated shards. """ async def _collect() -> list[tuple[tuple[slice, ...], NDArrayLikeOrScalar]]: diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 74bc3225e4..2138b728a9 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -3975,11 +3975,9 @@ async def _shards_initialized( [nchunks_initialized][zarr.Array.nchunks_initialized] """ - # Thin wrapper over the shared discovery core, projected to keys. The "list" - # strategy preserves this function's historical behavior (a single prefix - # listing); see [shards_initialized][zarr.shards_initialized] for the public, - # strategy-aware entry point. - return tuple(key for _, key in await _initialized_shards(array, strategy="list")) + # Thin wrapper over the public, strategy-aware entry point. The "list" strategy + # preserves this function's historical behavior (a single prefix listing). + return await shards_initialized(array, strategy="list") # When the array has at most this many possible shards, ``shards_initialized`` @@ -3989,108 +3987,116 @@ async def _shards_initialized( _PROBE_THRESHOLD = 64 -async def _initialized_shards( - array: AnyAsyncArray, +async def shards_initialized( + array: AnyArray | AnyAsyncArray, *, strategy: Literal["auto", "list", "probe"] = "auto", -) -> list[tuple[tuple[int, ...], str]]: +) -> tuple[str, ...]: """ - Discover the populated shards of an array, returning ``(coords, key)`` pairs in - chunk-grid order. This is the shared core of [shards_initialized][zarr.shards_initialized] - (which projects to keys) and [read_regions][zarr.read_regions] (which projects to regions). + Return the storage keys of the shards that have been persisted to the store. + + This reports storage at the granularity of stored objects: for sharded arrays it + returns shard keys (the objects that actually exist in the store), and for unsharded + arrays it returns chunk keys. To turn these into array regions, use + [initialized_regions][zarr.initialized_regions]. + + Parameters + ---------- + array : Array or AsyncArray + The array to inspect. + strategy : {"auto", "list", "probe"}, default "auto" + How to discover which shards exist. + + - ``"list"`` issues a single ``store.list_prefix`` call and keeps the keys that + belong to this array's shard grid (ignoring metadata and any other objects + under the same prefix). + - ``"probe"`` checks the existence of each possible shard key individually and + concurrently. This avoids listing a prefix that may hold many unrelated + objects, and is faster when the array has few possible shards. + - ``"auto"`` uses ``"probe"`` when the array has at most a small number of + possible shards and ``"list"`` otherwise. + + Returns + ------- + tuple[str, ...] + The storage keys of the populated shards (or chunks, when unsharded), + in chunk-grid order. + + See Also + -------- + initialized_regions : The array regions spanned by the populated shards. + read_regions : Read and decode a collection of array regions. """ - coords = list(_iter_shard_coords(array)) - keys = [array.metadata.encode_chunk_key(c) for c in coords] + if isinstance(array, Array): + array = array._async_array + + keys = list(_iter_shard_keys(array)) if strategy == "auto": - strategy = "probe" if len(coords) <= _PROBE_THRESHOLD else "list" + strategy = "probe" if len(keys) <= _PROBE_THRESHOLD else "list" if strategy == "list": - # A single prefix listing, filtered to keys that belong to this array's - # shard grid. Non-chunk objects under the same prefix (metadata, etc.) - # are excluded by the intersection, addressing the case where the prefix - # holds many unrelated objects. + # A single prefix listing, filtered to keys that belong to this array's shard + # grid. Non-chunk objects under the same prefix (metadata, etc.) are excluded by + # the intersection, handling prefixes that also hold unrelated objects. contents = { _relativize_path(path=key, prefix=array.store_path.path) async for key in array.store_path.store.list_prefix(prefix=array.store_path.path) } - return [(c, k) for c, k in zip(coords, keys, strict=True) if k in contents] + return tuple(key for key in keys if key in contents) elif strategy == "probe": # Per-key existence checks, concurrently. Preferable when the prefix may # contain many unrelated objects, or when there are few keys to check. present = await concurrent_map( - [(array.store_path / k,) for k in keys], + [(array.store_path / key,) for key in keys], lambda store_path: store_path.exists(), zarr_config.get("async.concurrency"), ) - return [ - (c, k) - for (c, k), is_present in zip(zip(coords, keys, strict=True), present, strict=True) - if is_present - ] + return tuple(key for key, is_present in zip(keys, present, strict=True) if is_present) else: raise ValueError( f"Unknown strategy {strategy!r}. Expected one of 'auto', 'list', or 'probe'." ) -async def shards_initialized( +async def initialized_regions( array: AnyArray | AnyAsyncArray, *, strategy: Literal["auto", "list", "probe"] = "auto", -) -> tuple[str, ...]: +) -> list[tuple[slice, ...]]: """ - Return the storage keys of the shards that have been persisted to the store. + Return the array regions spanned by the shards that have been persisted to the store. - This reports storage at the granularity of stored objects: for sharded arrays it - returns shard keys (the objects that actually exist in the store), and for unsharded - arrays it returns chunk keys. To fetch and decode the populated regions, pass the - result of this function (or its regions) to [read_regions][zarr.read_regions]. + This is [shards_initialized][zarr.shards_initialized] expressed as regions: it filters + the array's shard regions down to those whose shard is populated. The result is + suitable to pass directly to [read_regions][zarr.read_regions]. Parameters ---------- array : Array or AsyncArray The array to inspect. strategy : {"auto", "list", "probe"}, default "auto" - How to discover which shards exist. - - - ``"list"`` issues a single ``store.list_prefix`` call and keeps the keys that - belong to this array's shard grid (ignoring metadata and any other objects - under the same prefix). - - ``"probe"`` checks the existence of each possible shard key individually and - concurrently. This avoids listing a prefix that may hold many unrelated - objects, and is faster when the array has few possible shards. - - ``"auto"`` uses ``"probe"`` when the array has at most a small number of - possible shards and ``"list"`` otherwise. + How to discover which shards exist. See [shards_initialized][zarr.shards_initialized]. Returns ------- - tuple[str, ...] - The storage keys of the populated shards (or chunks, when unsharded), - in chunk-grid order. + list[tuple[slice, ...]] + The regions spanned by the populated shards, in chunk-grid order. See Also -------- - read_regions : Read and decode the populated regions of an array. + shards_initialized : The storage keys of the populated shards. + read_regions : Read and decode a collection of array regions. """ if isinstance(array, Array): array = array._async_array - return tuple(key for _, key in await _initialized_shards(array, strategy=strategy)) - - -async def _initialized_regions( - array: AnyAsyncArray, - *, - strategy: Literal["auto", "list", "probe"] = "auto", -) -> list[tuple[slice, ...]]: - """Return the array regions spanned by each populated shard, in chunk-grid order.""" - shard_shape = array.shards if array.shards is not None else array.chunks + initialized = frozenset(await shards_initialized(array, strategy=strategy)) + # "regions that are initialized" is a filter over the array's shard regions, keyed on + # whether the corresponding shard key is populated. return [ - tuple( - slice(c * s, min((c + 1) * s, dim)) - for c, s, dim in zip(coords, shard_shape, array.shape, strict=True) - ) - for coords, _ in await _initialized_shards(array, strategy=strategy) + region + for region, key in zip(_iter_shard_regions(array), _iter_shard_keys(array), strict=True) + if key in initialized ] @@ -4104,10 +4110,11 @@ async def read_regions( Concurrently read and decode array regions, yielding each ``(region, data)`` pair as soon as its data is available. - This is the spatially-resolved companion to [shards_initialized][zarr.shards_initialized]: - each yielded value pairs a region (a tuple of slices into the array) with the decoded - data for that region, so callers can stream over only the populated parts of a sparse - array without materializing the full array. + Each yielded value pairs a region (a tuple of slices into the array) with the decoded + data for that region. If ``regions`` is omitted, it defaults to the populated regions + of the array (see [initialized_regions][zarr.initialized_regions]), so callers can + stream over only the populated parts of a sparse array without materializing the full + array. Parameters ---------- @@ -4129,14 +4136,15 @@ async def read_regions( See Also -------- - shards_initialized : Discover which shards of an array are populated. + initialized_regions : The array regions spanned by the populated shards. + shards_initialized : The storage keys of the populated shards. """ if isinstance(array, Array): array = array._async_array if concurrency is None: concurrency = zarr_config.get("async.concurrency") - region_list = await _initialized_regions(array) if regions is None else list(regions) + region_list = await initialized_regions(array) if regions is None else list(regions) semaphore = Semaphore(concurrency) diff --git a/tests/benchmarks/test_indexing.py b/tests/benchmarks/test_indexing.py index 385a85b5b5..0090b4304a 100644 --- a/tests/benchmarks/test_indexing.py +++ b/tests/benchmarks/test_indexing.py @@ -11,7 +11,7 @@ import pytest -from zarr import create_array +from zarr import create_array, read_regions indexers = ( (0,) * 3, @@ -277,3 +277,48 @@ def write_with_cache_clear() -> None: data[indexer] = write_data benchmark(write_with_cache_clear) + + +# Sparse-read benchmark: most chunks empty (resolve to the fill value). +sparse_shards = ( + None, + (64,), +) + + +@pytest.mark.parametrize("store", ["memory", "memory_get_latency"], indirect=["store"]) +@pytest.mark.parametrize("shards", sparse_shards, ids=str) +@pytest.mark.parametrize("reader", ["full", "read_regions"], ids=str) +def test_sparse_read( + store: Store, + shards: tuple[int, ...] | None, + reader: str, + benchmark: BenchmarkFixture, +) -> None: + """Benchmark reading a sparse array (most chunks empty) two ways. + + ``full`` is the stock ``arr[:]`` read, which issues a store request for every chunk + including the empty ones; ``read_regions`` touches only the populated chunks. The gap + is largest on ``memory_get_latency``, where each skipped empty-chunk request avoids a + round-trip. + """ + n_chunks = 256 + chunk = 16 + data = create_array( + store=store, + shape=(n_chunks * chunk,), + dtype="uint8", + chunks=(chunk,), + shards=shards, + compressors=None, + filters=None, + fill_value=0, + ) + # populate ~3% of chunks, spread across the array + for ci in range(0, n_chunks, 32): + data[ci * chunk : ci * chunk + chunk] = 1 + + if reader == "full": + benchmark(getitem, data, slice(None)) + else: + benchmark(read_regions, data) diff --git a/tests/test_array.py b/tests/test_array.py index 0d6d2d5906..971533f394 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -2374,3 +2374,182 @@ async def test_create_array_chunks_3d( shape = (10, 12, 15) arr = await create_array(store={}, shape=shape, chunks=chunk_input, dtype="float64") assert arr.write_chunk_sizes == expected + + +# --- shards_initialized / initialized_regions / read_regions --------------------------- + + +def _ca_sparse_1d(store: Store) -> tuple[Array, np.ndarray]: + arr = zarr.create_array(store=store, shape=(64,), chunks=(8,), dtype="int32", fill_value=42) + # populate two non-adjacent chunks (chunks 1 and 5) + arr[8:16] = np.arange(8, dtype="int32") + arr[40:48] = np.arange(100, 108, dtype="int32") + return arr, np.asarray(arr[:]) + + +def _ca_dense_1d(store: Store) -> tuple[Array, np.ndarray]: + arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=0) + arr[:] = np.arange(32, dtype="int32") + return arr, np.asarray(arr[:]) + + +def _ca_sparse_2d(store: Store) -> tuple[Array, np.ndarray]: + arr = zarr.create_array(store=store, shape=(8, 8), chunks=(2, 2), dtype="int32", fill_value=-1) + arr[0:2, 0:2] = np.ones((2, 2), dtype="int32") + arr[4:6, 4:6] = np.full((2, 2), 7, dtype="int32") + return arr, np.asarray(arr[:]) + + +def _ca_sharded_sparse(store: Store) -> tuple[Array, np.ndarray]: + # chunks (2, 2) within shards (4, 4): the shard grid is 2x2 over the 8x8 array. + arr = zarr.create_array( + store=store, shape=(8, 8), chunks=(2, 2), shards=(4, 4), dtype="int32", fill_value=42 + ) + arr[0:2, 0:2] = np.ones((2, 2), dtype="int32") # shard (0, 0) + arr[4:6, 4:6] = np.full((2, 2), 7, dtype="int32") # shard (1, 1) + return arr, np.asarray(arr[:]) + + +def _ca_all_empty(store: Store) -> tuple[Array, np.ndarray]: + arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=7) + return arr, np.asarray(arr[:]) + + +def _ca_all_populated(store: Store) -> tuple[Array, np.ndarray]: + arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=0) + arr[:] = np.arange(32, dtype="int32") + return arr, np.asarray(arr[:]) + + +_CA_SETUPS = { + "sparse_1d": _ca_sparse_1d, + "dense_1d": _ca_dense_1d, + "sparse_2d": _ca_sparse_2d, + "sharded_sparse": _ca_sharded_sparse, + "all_empty": _ca_all_empty, + "all_populated": _ca_all_populated, +} +_CA_STRATEGIES = ["auto", "list", "probe"] + + +def _ca_pack(arr: Array, results: list, baseline: np.ndarray) -> np.ndarray: + """Scatter ``(region, data)`` pairs onto a fill-valued array.""" + out = np.full(baseline.shape, arr.fill_value, dtype=baseline.dtype) + for region, data in results: + out[region] = np.asarray(data) + return out + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("setup_name", list(_CA_SETUPS)) +@pytest.mark.parametrize("strategy", _CA_STRATEGIES) +def test_shards_initialized_strategies_agree(store: Store, setup_name: str, strategy: str) -> None: + """Every strategy reports the same set of populated keys.""" + arr, _ = _CA_SETUPS[setup_name](store) + keys = set(zarr.shards_initialized(arr, strategy=strategy)) + assert keys == set(zarr.shards_initialized(arr, strategy="auto")) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize( + ("setup_name", "expected_count"), + [ + ("sparse_1d", 2), + ("dense_1d", 4), + ("sparse_2d", 2), + ("sharded_sparse", 2), + ("all_empty", 0), + ("all_populated", 4), + ], +) +def test_shards_initialized_counts(store: Store, setup_name: str, expected_count: int) -> None: + arr, _ = _CA_SETUPS[setup_name](store) + assert len(zarr.shards_initialized(arr)) == expected_count + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_shards_initialized_unknown_strategy(store: Store) -> None: + arr, _ = _ca_sparse_1d(store) + with pytest.raises(ValueError, match="Unknown strategy"): + zarr.shards_initialized(arr, strategy="nonsense") # type: ignore[arg-type] + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_list_strategy_ignores_non_chunk_objects(store: Store) -> None: + """The ``list`` strategy must not mistake unrelated objects sharing the array's + prefix (e.g. metadata) for populated chunks.""" + arr, _ = _ca_sparse_1d(store) + keys = set(zarr.shards_initialized(arr, strategy="list")) + assert keys == {"c/1", "c/5"} + assert all(k.startswith("c/") for k in keys) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("setup_name", list(_CA_SETUPS)) +def test_initialized_regions_count_matches_keys(store: Store, setup_name: str) -> None: + """There is one initialized region per populated shard key.""" + arr, _ = _CA_SETUPS[setup_name](store) + assert len(zarr.initialized_regions(arr)) == len(zarr.shards_initialized(arr)) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_initialized_regions_values(store: Store) -> None: + arr, _ = _ca_sparse_1d(store) + assert set(zarr.initialized_regions(arr)) == {(slice(8, 16, 1),), (slice(40, 48, 1),)} + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("setup_name", list(_CA_SETUPS)) +def test_read_regions_default_reconstructs_baseline(store: Store, setup_name: str) -> None: + """With no explicit regions, ``read_regions`` reads the populated regions; scattering + them onto a fill-valued array reproduces the full ``arr[:]`` read exactly.""" + arr, baseline = _CA_SETUPS[setup_name](store) + results = zarr.read_regions(arr) + result = _ca_pack(arr, results, baseline) + assert np.array_equal(result, baseline) + assert result.dtype == baseline.dtype + # the default region set is exactly the initialized regions + assert len(results) == len(zarr.initialized_regions(arr)) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_read_regions_explicit_regions(store: Store) -> None: + """Explicit regions are read and returned with their decoded data.""" + arr, baseline = _ca_sparse_1d(store) + explicit = [(slice(8, 16),), (slice(40, 48),)] + regions = dict(zarr.read_regions(arr, explicit)) + assert set(regions) == set(explicit) + assert np.array_equal(np.asarray(regions[(slice(8, 16),)]), baseline[8:16]) + assert np.array_equal(np.asarray(regions[(slice(40, 48),)]), baseline[40:48]) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +def test_read_regions_concurrency_one(store: Store) -> None: + """A concurrency limit of 1 produces the same result as the default.""" + arr, baseline = _ca_sparse_2d(store) + results = zarr.read_regions(arr, zarr.initialized_regions(arr), concurrency=1) + assert np.array_equal(_ca_pack(arr, results, baseline), baseline) + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +@pytest.mark.parametrize("setup_name", list(_CA_SETUPS)) +async def test_read_regions_async_matches_sync(store: Store, setup_name: str) -> None: + """The async streaming generator yields the same ``(region, data)`` set as the + synchronous wrapper.""" + arr, _ = _CA_SETUPS[setup_name](store) + regions = zarr.initialized_regions(arr) + async_pairs = { + region: np.asarray(data).tobytes() + async for region, data in zarr.api.asynchronous.read_regions(arr._async_array, regions) + } + sync_pairs = { + region: np.asarray(data).tobytes() for region, data in zarr.read_regions(arr, regions) + } + assert async_pairs == sync_pairs + + +@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) +async def test_shards_initialized_async(store: Store) -> None: + arr, _ = _ca_sparse_1d(store) + keys = await zarr.api.asynchronous.shards_initialized(arr._async_array) + assert set(keys) == {"c/1", "c/5"} diff --git a/tests/test_chunk_access.py b/tests/test_chunk_access.py deleted file mode 100644 index 8ee6041a70..0000000000 --- a/tests/test_chunk_access.py +++ /dev/null @@ -1,194 +0,0 @@ -"""Tests for the shard-discovery and region-read primitives. - -These cover :func:`zarr.shards_initialized` (discover which shards/chunks of an -array are populated) and :func:`zarr.read_regions` (concurrently read and decode -the populated regions), along with their asynchronous counterparts in -:mod:`zarr.api.asynchronous`. -""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -import numpy as np -import pytest - -import zarr -import zarr.api.asynchronous as async_api - -if TYPE_CHECKING: - from collections.abc import Callable - - from zarr.abc.store import Store - - -def _sparse_1d(store: Store) -> tuple[zarr.Array, np.ndarray]: - arr = zarr.create_array(store=store, shape=(64,), chunks=(8,), dtype="int32", fill_value=42) - # populate two non-adjacent chunks (chunks 1 and 5) - arr[8:16] = np.arange(8, dtype="int32") - arr[40:48] = np.arange(100, 108, dtype="int32") - return arr, np.asarray(arr[:]) - - -def _dense_1d(store: Store) -> tuple[zarr.Array, np.ndarray]: - arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=0) - arr[:] = np.arange(32, dtype="int32") - return arr, np.asarray(arr[:]) - - -def _sparse_2d(store: Store) -> tuple[zarr.Array, np.ndarray]: - arr = zarr.create_array(store=store, shape=(8, 8), chunks=(2, 2), dtype="int32", fill_value=-1) - arr[0:2, 0:2] = np.ones((2, 2), dtype="int32") - arr[4:6, 4:6] = np.full((2, 2), 7, dtype="int32") - return arr, np.asarray(arr[:]) - - -def _sharded_sparse(store: Store) -> tuple[zarr.Array, np.ndarray]: - # chunks (2, 2) within shards (4, 4): the shard grid is 2x2 over the 8x8 array. - arr = zarr.create_array( - store=store, shape=(8, 8), chunks=(2, 2), shards=(4, 4), dtype="int32", fill_value=42 - ) - arr[0:2, 0:2] = np.ones((2, 2), dtype="int32") # shard (0, 0) - arr[4:6, 4:6] = np.full((2, 2), 7, dtype="int32") # shard (1, 1) - return arr, np.asarray(arr[:]) - - -def _all_empty(store: Store) -> tuple[zarr.Array, np.ndarray]: - arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=7) - return arr, np.asarray(arr[:]) - - -def _all_populated(store: Store) -> tuple[zarr.Array, np.ndarray]: - arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=0) - arr[:] = np.arange(32, dtype="int32") - return arr, np.asarray(arr[:]) - - -SETUPS: dict[str, Callable[[Store], tuple[zarr.Array, np.ndarray]]] = { - "sparse_1d": _sparse_1d, - "dense_1d": _dense_1d, - "sparse_2d": _sparse_2d, - "sharded_sparse": _sharded_sparse, - "all_empty": _all_empty, - "all_populated": _all_populated, -} - -STRATEGIES = ["auto", "list", "probe"] - - -def _pack(arr: zarr.Array, regions: list, baseline: np.ndarray) -> np.ndarray: - """Scatter ``(region, data)`` pairs onto a fill-valued array.""" - out = np.full(baseline.shape, arr.fill_value, dtype=baseline.dtype) - for region, data in regions: - out[region] = np.asarray(data) - return out - - -@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) -@pytest.mark.parametrize("setup_name", list(SETUPS)) -@pytest.mark.parametrize("strategy", STRATEGIES) -def test_shards_initialized_strategies_agree(store: Store, setup_name: str, strategy: str) -> None: - """Every strategy reports the same set of populated keys, and reports the - expected count for a hand-known layout.""" - arr, _ = SETUPS[setup_name](store) - keys = set(zarr.shards_initialized(arr, strategy=strategy)) - # all strategies must agree - assert keys == set(zarr.shards_initialized(arr, strategy="auto")) - - -@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) -@pytest.mark.parametrize( - ("setup_name", "expected_count"), - [ - ("sparse_1d", 2), - ("dense_1d", 4), - ("sparse_2d", 2), - ("sharded_sparse", 2), - ("all_empty", 0), - ("all_populated", 4), - ], -) -def test_shards_initialized_counts(store: Store, setup_name: str, expected_count: int) -> None: - arr, _ = SETUPS[setup_name](store) - assert len(zarr.shards_initialized(arr)) == expected_count - - -@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) -def test_shards_initialized_unknown_strategy(store: Store) -> None: - arr, _ = _sparse_1d(store) - with pytest.raises(ValueError, match="Unknown strategy"): - zarr.shards_initialized(arr, strategy="nonsense") # type: ignore[arg-type] - - -@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) -def test_list_strategy_ignores_non_chunk_objects(store: Store) -> None: - """The ``list`` strategy must not mistake unrelated objects sharing the - array's prefix (e.g. metadata) for populated chunks.""" - arr, _ = _sparse_1d(store) - # metadata (zarr.json) already lives under the array prefix; add another - # non-chunk object to be sure it is excluded. - keys = set(zarr.shards_initialized(arr, strategy="list")) - assert keys == {"c/1", "c/5"} - assert all(k.startswith("c/") for k in keys) - - -@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) -@pytest.mark.parametrize("setup_name", list(SETUPS)) -def test_read_regions_reconstructs_baseline(store: Store, setup_name: str) -> None: - """Packing the populated regions onto a fill-valued array reproduces the - full ``arr[:]`` read exactly.""" - arr, baseline = SETUPS[setup_name](store) - regions = zarr.read_regions(arr) - result = _pack(arr, regions, baseline) - assert np.array_equal(result, baseline) - assert result.dtype == baseline.dtype - - -@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) -@pytest.mark.parametrize("setup_name", list(SETUPS)) -def test_read_regions_default_count_matches_discovery(store: Store, setup_name: str) -> None: - """With no explicit regions, ``read_regions`` reads exactly the populated - shards discovered by ``shards_initialized``.""" - arr, _ = SETUPS[setup_name](store) - regions = zarr.read_regions(arr) - assert len(regions) == len(zarr.shards_initialized(arr)) - - -@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) -def test_read_regions_explicit_regions(store: Store) -> None: - """Explicit regions are read and returned with their decoded data.""" - arr, baseline = _sparse_1d(store) - explicit = [(slice(8, 16),), (slice(40, 48),)] - regions = dict(zarr.read_regions(arr, explicit)) - assert set(regions) == set(explicit) - assert np.array_equal(np.asarray(regions[(slice(8, 16),)]), baseline[8:16]) - assert np.array_equal(np.asarray(regions[(slice(40, 48),)]), baseline[40:48]) - - -@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) -def test_read_regions_concurrency_one(store: Store) -> None: - """A concurrency limit of 1 produces the same result as the default.""" - arr, baseline = _sparse_2d(store) - regions = zarr.read_regions(arr, concurrency=1) - assert np.array_equal(_pack(arr, regions, baseline), baseline) - - -@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) -@pytest.mark.parametrize("setup_name", list(SETUPS)) -async def test_read_regions_async_matches_sync(store: Store, setup_name: str) -> None: - """The async streaming generator yields the same ``(region, data)`` set as - the synchronous wrapper.""" - arr, _ = SETUPS[setup_name](store) - async_pairs = { - region: np.asarray(data).tobytes() - async for region, data in async_api.read_regions(arr._async_array) - } - sync_pairs = {region: np.asarray(data).tobytes() for region, data in zarr.read_regions(arr)} - assert async_pairs == sync_pairs - - -@pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) -async def test_shards_initialized_async(store: Store) -> None: - arr, _ = _sparse_1d(store) - keys = await async_api.shards_initialized(arr._async_array) - assert set(keys) == {"c/1", "c/5"} From aec3260a7893809bfda6fae90b66bb9920698f7c Mon Sep 17 00:00:00 2001 From: Shane Grigsby Date: Wed, 3 Jun 2026 17:10:56 -0700 Subject: [PATCH 4/6] switching to caller to provide regions --- src/zarr/api/synchronous.py | 19 +++++++++---------- src/zarr/core/array.py | 21 ++++++++------------- tests/benchmarks/test_indexing.py | 10 +++++----- tests/test_array.py | 10 ++++------ 4 files changed, 26 insertions(+), 34 deletions(-) diff --git a/src/zarr/api/synchronous.py b/src/zarr/api/synchronous.py index 331c5bfe3b..89457ddcf1 100644 --- a/src/zarr/api/synchronous.py +++ b/src/zarr/api/synchronous.py @@ -1512,28 +1512,27 @@ def initialized_regions( def read_regions( array: Array | AsyncArray[Any], - regions: Iterable[tuple[slice, ...]] | None = None, + regions: Iterable[tuple[slice, ...]], *, concurrency: int | None = None, ) -> list[tuple[tuple[slice, ...], NDArrayLikeOrScalar]]: """ - Read and decode array regions, returning a list of ``(region, data)`` pairs. + Read and decode a collection of array regions, returning a list of ``(region, data)`` + pairs. Each pair associates a region (a tuple of slices into the array) with the decoded data - for that region. If ``regions`` is omitted, it defaults to the populated regions of the - array (see [initialized_regions][zarr.initialized_regions]), letting callers operate on - only the populated parts of a sparse array without materializing the full array. For - lazy, streaming consumption use the asynchronous [zarr.api.asynchronous.read_regions][] - instead, which yields each pair as soon as its data is available. + for that region. The regions to read are supplied by the caller; pass the result of + [initialized_regions][zarr.initialized_regions] to read only the populated parts of a + sparse array without materializing the full array. For lazy, streaming consumption use + the asynchronous [zarr.api.asynchronous.read_regions][] instead, which yields each pair + as soon as its data is available. Parameters ---------- array : Array or AsyncArray The array to read from. - regions : iterable of tuple of slice, optional + regions : iterable of tuple of slice The regions to read. Each region is a tuple of slices, one per array dimension. - If omitted, defaults to the regions spanned by the populated shards of ``array`` - (i.e. every region that holds data). concurrency : int, optional The maximum number of regions read concurrently. Defaults to the ``async.concurrency`` config value. diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 2138b728a9..f2fae9b50f 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -4102,28 +4102,25 @@ async def initialized_regions( async def read_regions( array: AnyArray | AnyAsyncArray, - regions: Iterable[tuple[slice, ...]] | None = None, + regions: Iterable[tuple[slice, ...]], *, concurrency: int | None = None, ) -> AsyncIterator[tuple[tuple[slice, ...], NDArrayLikeOrScalar]]: """ - Concurrently read and decode array regions, yielding each ``(region, data)`` pair - as soon as its data is available. + Concurrently read and decode a collection of array regions, yielding each + ``(region, data)`` pair as soon as its data is available. Each yielded value pairs a region (a tuple of slices into the array) with the decoded - data for that region. If ``regions`` is omitted, it defaults to the populated regions - of the array (see [initialized_regions][zarr.initialized_regions]), so callers can - stream over only the populated parts of a sparse array without materializing the full - array. + data for that region. The regions to read are supplied by the caller; pass the result + of [initialized_regions][zarr.initialized_regions] to read only the populated parts of + a sparse array without materializing the full array. Parameters ---------- array : Array or AsyncArray The array to read from. - regions : iterable of tuple of slice, optional + regions : iterable of tuple of slice The regions to read. Each region is a tuple of slices, one per array dimension. - If omitted, defaults to the regions spanned by the populated shards of ``array`` - (i.e. every region that holds data). concurrency : int, optional The maximum number of regions read concurrently. Defaults to the ``async.concurrency`` config value. @@ -4144,8 +4141,6 @@ async def read_regions( if concurrency is None: concurrency = zarr_config.get("async.concurrency") - region_list = await initialized_regions(array) if regions is None else list(regions) - semaphore = Semaphore(concurrency) async def _read( @@ -4154,7 +4149,7 @@ async def _read( async with semaphore: return region, await array.getitem(region) - for future in as_completed([ensure_future(_read(region)) for region in region_list]): + for future in as_completed([ensure_future(_read(region)) for region in regions]): yield await future diff --git a/tests/benchmarks/test_indexing.py b/tests/benchmarks/test_indexing.py index 0090b4304a..021c0b1e04 100644 --- a/tests/benchmarks/test_indexing.py +++ b/tests/benchmarks/test_indexing.py @@ -11,7 +11,7 @@ import pytest -from zarr import create_array, read_regions +from zarr import create_array, initialized_regions, read_regions indexers = ( (0,) * 3, @@ -298,9 +298,9 @@ def test_sparse_read( """Benchmark reading a sparse array (most chunks empty) two ways. ``full`` is the stock ``arr[:]`` read, which issues a store request for every chunk - including the empty ones; ``read_regions`` touches only the populated chunks. The gap - is largest on ``memory_get_latency``, where each skipped empty-chunk request avoids a - round-trip. + including the empty ones; ``read_regions`` discovers the populated regions and reads + only those. The gap is largest on ``memory_get_latency``, where each skipped + empty-chunk request avoids a round-trip. """ n_chunks = 256 chunk = 16 @@ -321,4 +321,4 @@ def test_sparse_read( if reader == "full": benchmark(getitem, data, slice(None)) else: - benchmark(read_regions, data) + benchmark(lambda: read_regions(data, initialized_regions(data))) diff --git a/tests/test_array.py b/tests/test_array.py index 971533f394..938a9a4dbe 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -2500,16 +2500,14 @@ def test_initialized_regions_values(store: Store) -> None: @pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) @pytest.mark.parametrize("setup_name", list(_CA_SETUPS)) -def test_read_regions_default_reconstructs_baseline(store: Store, setup_name: str) -> None: - """With no explicit regions, ``read_regions`` reads the populated regions; scattering - them onto a fill-valued array reproduces the full ``arr[:]`` read exactly.""" +def test_read_initialized_regions_reconstructs_baseline(store: Store, setup_name: str) -> None: + """Reading the initialized regions and scattering them onto a fill-valued array + reproduces the full ``arr[:]`` read exactly.""" arr, baseline = _CA_SETUPS[setup_name](store) - results = zarr.read_regions(arr) + results = zarr.read_regions(arr, zarr.initialized_regions(arr)) result = _ca_pack(arr, results, baseline) assert np.array_equal(result, baseline) assert result.dtype == baseline.dtype - # the default region set is exactly the initialized regions - assert len(results) == len(zarr.initialized_regions(arr)) @pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) From cc94ab8203aa5c19828c4f6bd760c6ba8add149c Mon Sep 17 00:00:00 2001 From: Shane Grigsby Date: Wed, 3 Jun 2026 17:22:43 -0700 Subject: [PATCH 5/6] linting fixes --- src/zarr/api/synchronous.py | 8 ++++---- tests/test_array.py | 22 ++++++++++++++-------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/zarr/api/synchronous.py b/src/zarr/api/synchronous.py index 89457ddcf1..780129f34c 100644 --- a/src/zarr/api/synchronous.py +++ b/src/zarr/api/synchronous.py @@ -1431,12 +1431,12 @@ def zeros_like(a: ArrayLike, **kwargs: Any) -> AnyArray: return Array(sync(async_api.zeros_like(a, **kwargs))) -def _as_async_array(array: Array | AsyncArray[Any]) -> AsyncArray[Any]: +def _as_async_array(array: Array[Any] | AsyncArray[Any]) -> AsyncArray[Any]: return array._async_array if isinstance(array, Array) else array def shards_initialized( - array: Array | AsyncArray[Any], + array: Array[Any] | AsyncArray[Any], *, strategy: Literal["auto", "list", "probe"] = "auto", ) -> tuple[str, ...]: @@ -1479,7 +1479,7 @@ def shards_initialized( def initialized_regions( - array: Array | AsyncArray[Any], + array: Array[Any] | AsyncArray[Any], *, strategy: Literal["auto", "list", "probe"] = "auto", ) -> list[tuple[slice, ...]]: @@ -1511,7 +1511,7 @@ def initialized_regions( def read_regions( - array: Array | AsyncArray[Any], + array: Array[Any] | AsyncArray[Any], regions: Iterable[tuple[slice, ...]], *, concurrency: int | None = None, diff --git a/tests/test_array.py b/tests/test_array.py index 938a9a4dbe..ca29195c4b 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -2379,7 +2379,7 @@ async def test_create_array_chunks_3d( # --- shards_initialized / initialized_regions / read_regions --------------------------- -def _ca_sparse_1d(store: Store) -> tuple[Array, np.ndarray]: +def _ca_sparse_1d(store: Store) -> tuple[Array[Any], npt.NDArray[Any]]: arr = zarr.create_array(store=store, shape=(64,), chunks=(8,), dtype="int32", fill_value=42) # populate two non-adjacent chunks (chunks 1 and 5) arr[8:16] = np.arange(8, dtype="int32") @@ -2387,20 +2387,20 @@ def _ca_sparse_1d(store: Store) -> tuple[Array, np.ndarray]: return arr, np.asarray(arr[:]) -def _ca_dense_1d(store: Store) -> tuple[Array, np.ndarray]: +def _ca_dense_1d(store: Store) -> tuple[Array[Any], npt.NDArray[Any]]: arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=0) arr[:] = np.arange(32, dtype="int32") return arr, np.asarray(arr[:]) -def _ca_sparse_2d(store: Store) -> tuple[Array, np.ndarray]: +def _ca_sparse_2d(store: Store) -> tuple[Array[Any], npt.NDArray[Any]]: arr = zarr.create_array(store=store, shape=(8, 8), chunks=(2, 2), dtype="int32", fill_value=-1) arr[0:2, 0:2] = np.ones((2, 2), dtype="int32") arr[4:6, 4:6] = np.full((2, 2), 7, dtype="int32") return arr, np.asarray(arr[:]) -def _ca_sharded_sparse(store: Store) -> tuple[Array, np.ndarray]: +def _ca_sharded_sparse(store: Store) -> tuple[Array[Any], npt.NDArray[Any]]: # chunks (2, 2) within shards (4, 4): the shard grid is 2x2 over the 8x8 array. arr = zarr.create_array( store=store, shape=(8, 8), chunks=(2, 2), shards=(4, 4), dtype="int32", fill_value=42 @@ -2410,12 +2410,12 @@ def _ca_sharded_sparse(store: Store) -> tuple[Array, np.ndarray]: return arr, np.asarray(arr[:]) -def _ca_all_empty(store: Store) -> tuple[Array, np.ndarray]: +def _ca_all_empty(store: Store) -> tuple[Array[Any], npt.NDArray[Any]]: arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=7) return arr, np.asarray(arr[:]) -def _ca_all_populated(store: Store) -> tuple[Array, np.ndarray]: +def _ca_all_populated(store: Store) -> tuple[Array[Any], npt.NDArray[Any]]: arr = zarr.create_array(store=store, shape=(32,), chunks=(8,), dtype="int32", fill_value=0) arr[:] = np.arange(32, dtype="int32") return arr, np.asarray(arr[:]) @@ -2432,7 +2432,11 @@ def _ca_all_populated(store: Store) -> tuple[Array, np.ndarray]: _CA_STRATEGIES = ["auto", "list", "probe"] -def _ca_pack(arr: Array, results: list, baseline: np.ndarray) -> np.ndarray: +def _ca_pack( + arr: Array[Any], + results: list[tuple[tuple[slice, ...], Any]], + baseline: npt.NDArray[Any], +) -> npt.NDArray[Any]: """Scatter ``(region, data)`` pairs onto a fill-valued array.""" out = np.full(baseline.shape, arr.fill_value, dtype=baseline.dtype) for region, data in results: @@ -2443,7 +2447,9 @@ def _ca_pack(arr: Array, results: list, baseline: np.ndarray) -> np.ndarray: @pytest.mark.parametrize("store", ["local", "memory"], indirect=["store"]) @pytest.mark.parametrize("setup_name", list(_CA_SETUPS)) @pytest.mark.parametrize("strategy", _CA_STRATEGIES) -def test_shards_initialized_strategies_agree(store: Store, setup_name: str, strategy: str) -> None: +def test_shards_initialized_strategies_agree( + store: Store, setup_name: str, strategy: Literal["auto", "list", "probe"] +) -> None: """Every strategy reports the same set of populated keys.""" arr, _ = _CA_SETUPS[setup_name](store) keys = set(zarr.shards_initialized(arr, strategy=strategy)) From b8d1d8c093925aa6838753ad4a9a2bf2a05a1d42 Mon Sep 17 00:00:00 2001 From: Shane Grigsby Date: Wed, 3 Jun 2026 17:33:01 -0700 Subject: [PATCH 6/6] bumping codecov to include final three lines... --- tests/test_array.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/tests/test_array.py b/tests/test_array.py index ca29195c4b..b3c7046572 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -2557,3 +2557,16 @@ async def test_shards_initialized_async(store: Store) -> None: arr, _ = _ca_sparse_1d(store) keys = await zarr.api.asynchronous.shards_initialized(arr._async_array) assert set(keys) == {"c/1", "c/5"} + + +@pytest.mark.parametrize("store", ["memory"], indirect=["store"]) +async def test_async_chunk_access_accepts_sync_array(store: Store) -> None: + """The async API also accepts a synchronous ``Array``, unwrapping it to the + underlying async array.""" + arr, _ = _ca_sparse_1d(store) + keys = await zarr.api.asynchronous.shards_initialized(arr) + regions = await zarr.api.asynchronous.initialized_regions(arr) + results = [pair async for pair in zarr.api.asynchronous.read_regions(arr, regions)] + assert set(keys) == {"c/1", "c/5"} + assert set(regions) == {(slice(8, 16, 1),), (slice(40, 48, 1),)} + assert len(results) == 2