Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changes/4028.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Add three composable functions for efficiently reading sparse arrays, where most chunks are empty and resolve to the fill value:

- `zarr.shards_initialized` discovers which shards (or chunks, for unsharded arrays) of an array have actually been written to the store, via either a single prefix listing or concurrent per-key existence probes (selected by `strategy=`).
- `zarr.initialized_regions` expresses that discovery as array regions, suitable to pass straight to `read_regions`.
- `zarr.read_regions` concurrently reads and decodes a caller-supplied collection of array regions, producing `(region, data)` pairs so that each decoded block stays tied to its location in the array.

Together these let callers skip the per-empty-chunk store round-trips that dominate `arr[:]` on sparse arrays — e.g. `read_regions(arr, initialized_regions(arr))`. Asynchronous versions are available in `zarr.api.asynchronous`; the async `read_regions` streams each region as soon as its data is available.
7 changes: 7 additions & 0 deletions docs/api/zarr/read.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
title: read
---

::: zarr.shards_initialized
::: zarr.initialized_regions
::: zarr.read_regions
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ nav:
- api/zarr/dtype.md
- api/zarr/load.md
- api/zarr/open.md
- api/zarr/read.md
- api/zarr/save.md
- api/zarr/codecs.md
- api/zarr/codecs/numcodecs.md
Expand Down
6 changes: 6 additions & 0 deletions src/zarr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
full,
full_like,
group,
initialized_regions,
load,
ones,
ones_like,
Expand All @@ -27,9 +28,11 @@
open_consolidated,
open_group,
open_like,
read_regions,
save,
save_array,
save_group,
shards_initialized,
tree,
zeros,
zeros_like,
Expand Down Expand Up @@ -164,6 +167,7 @@ def set_format(log_format: str) -> None:
"full",
"full_like",
"group",
"initialized_regions",
"load",
"ones",
"ones_like",
Expand All @@ -173,9 +177,11 @@ def set_format(log_format: str) -> None:
"open_group",
"open_like",
"print_debug_info",
"read_regions",
"save",
"save_array",
"save_group",
"shards_initialized",
"tree",
"zeros",
"zeros_like",
Expand Down
6 changes: 6 additions & 0 deletions src/zarr/api/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
create_array,
from_array,
get_array_metadata,
initialized_regions,
read_regions,
shards_initialized,
)
from zarr.core.array_spec import ArrayConfigLike, parse_array_config
from zarr.core.buffer import NDArrayLike
Expand Down Expand Up @@ -79,6 +82,7 @@
"full",
"full_like",
"group",
"initialized_regions",
"load",
"ones",
"ones_like",
Expand All @@ -87,9 +91,11 @@
"open_consolidated",
"open_group",
"open_like",
"read_regions",
"save",
"save_array",
"save_group",
"shards_initialized",
"tree",
"zeros",
"zeros_like",
Expand Down
132 changes: 132 additions & 0 deletions src/zarr/api/synchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"full",
"full_like",
"group",
"initialized_regions",
"load",
"ones",
"ones_like",
Expand All @@ -66,9 +67,11 @@
"open_consolidated",
"open_group",
"open_like",
"read_regions",
"save",
"save_array",
"save_group",
"shards_initialized",
"tree",
"zeros",
"zeros_like",
Expand Down Expand Up @@ -1426,3 +1429,132 @@ def zeros_like(a: ArrayLike, **kwargs: Any) -> AnyArray:
The new array.
"""
return Array(sync(async_api.zeros_like(a, **kwargs)))


def _as_async_array(array: Array[Any] | AsyncArray[Any]) -> AsyncArray[Any]:
    """Return the ``AsyncArray`` for ``array``, unwrapping a synchronous ``Array`` if given one."""
    if isinstance(array, Array):
        # A synchronous Array holds its asynchronous counterpart in ``_async_array``.
        return array._async_array
    return array


def shards_initialized(
    array: Array[Any] | AsyncArray[Any],
    *,
    strategy: Literal["auto", "list", "probe"] = "auto",
) -> tuple[str, ...]:
    """
    Return the storage keys of the shards that have been persisted to the store.

    The result is expressed at the granularity of stored objects. For a sharded
    array the keys are shard keys (the objects that actually exist in the store);
    for an unsharded array they are chunk keys. Use
    [initialized_regions][zarr.initialized_regions] to obtain the same information
    as array regions.

    This is the blocking counterpart of the asynchronous function of the same name:
    the array is unwrapped to its ``AsyncArray`` and the coroutine is run to
    completion via ``sync``.

    Parameters
    ----------
    array : Array or AsyncArray
        The array to inspect.
    strategy : {"auto", "list", "probe"}, default "auto"
        How to discover which shards exist.

        - ``"list"`` issues a single ``store.list_prefix`` call and keeps the keys that
          belong to this array's shard grid (ignoring metadata and any other objects
          under the same prefix).
        - ``"probe"`` checks the existence of each possible shard key individually and
          concurrently. This avoids listing a prefix that may hold many unrelated
          objects, and is faster when the array has few possible shards.
        - ``"auto"`` uses ``"probe"`` when the array has at most a small number of
          possible shards and ``"list"`` otherwise.

    Returns
    -------
    tuple[str, ...]
        The storage keys of the populated shards (or chunks, when unsharded),
        in chunk-grid order.

    See Also
    --------
    initialized_regions : The array regions spanned by the populated shards.
    read_regions : Read and decode a collection of array regions.
    """
    async_array = _as_async_array(array)
    pending_keys = async_api.shards_initialized(async_array, strategy=strategy)
    return sync(pending_keys)


def initialized_regions(
    array: Array[Any] | AsyncArray[Any],
    *,
    strategy: Literal["auto", "list", "probe"] = "auto",
) -> list[tuple[slice, ...]]:
    """
    Return the array regions spanned by the shards that have been persisted to the store.

    This expresses [shards_initialized][zarr.shards_initialized] as regions: the
    array's shard regions are filtered down to those whose shard is populated. The
    result can be passed directly to [read_regions][zarr.read_regions].

    This is the blocking counterpart of the asynchronous function of the same name:
    the array is unwrapped to its ``AsyncArray`` and the coroutine is run to
    completion via ``sync``.

    Parameters
    ----------
    array : Array or AsyncArray
        The array to inspect.
    strategy : {"auto", "list", "probe"}, default "auto"
        How to discover which shards exist. See [shards_initialized][zarr.shards_initialized].

    Returns
    -------
    list[tuple[slice, ...]]
        The regions spanned by the populated shards, in chunk-grid order.

    See Also
    --------
    shards_initialized : The storage keys of the populated shards.
    read_regions : Read and decode a collection of array regions.
    """
    async_array = _as_async_array(array)
    pending_regions = async_api.initialized_regions(async_array, strategy=strategy)
    return sync(pending_regions)


def read_regions(
    array: Array[Any] | AsyncArray[Any],
    regions: Iterable[tuple[slice, ...]],
    *,
    concurrency: int | None = None,
) -> list[tuple[tuple[slice, ...], NDArrayLikeOrScalar]]:
    """
    Read and decode a collection of array regions, returning a list of ``(region, data)``
    pairs.

    Every pair couples a region (a tuple of slices into the array) with the decoded
    data for that region. The caller chooses which regions to read; passing the result
    of [initialized_regions][zarr.initialized_regions] reads only the populated parts
    of a sparse array without materializing the full array. This function collects
    every pair before returning. For lazy, streaming consumption use the asynchronous
    [zarr.api.asynchronous.read_regions][] instead, which yields each pair as soon as
    its data is available.

    Parameters
    ----------
    array : Array or AsyncArray
        The array to read from.
    regions : iterable of tuple of slice
        The regions to read. Each region is a tuple of slices, one per array dimension.
    concurrency : int, optional
        The maximum number of regions read concurrently. Defaults to the
        ``async.concurrency`` config value.

    Returns
    -------
    list[tuple[tuple[slice, ...], NDArrayLikeOrScalar]]
        Each region paired with its decoded data, in completion order (not necessarily
        the order of ``regions``).

    See Also
    --------
    initialized_regions : The array regions spanned by the populated shards.
    shards_initialized : The storage keys of the populated shards.
    """

    async def _drain_stream() -> list[tuple[tuple[slice, ...], NDArrayLikeOrScalar]]:
        # Exhaust the asynchronous stream, keeping pairs in the order they arrive.
        stream = async_api.read_regions(
            _as_async_array(array), regions, concurrency=concurrency
        )
        pairs = []
        async for pair in stream:
            pairs.append(pair)
        return pairs

    return sync(_drain_stream())
Loading
Loading