From 86ff7e82be02a290d63c7d1ca9d2fb60193cdfad Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Mon, 1 Jun 2026 10:45:18 -0700 Subject: [PATCH 1/2] Make hotspots() lazy for Dask input (#2772) hotspots() used to call da.compute() on the global mean and std while building the graph, so calling it ran about 12 Dask tasks before you asked for any output. This keeps those reductions as lazy 0-d dask arrays that broadcast into the z-score. The convolution runs through map_overlap and the classification through map_blocks, so the Dask and Dask+CuPy paths build their graph without running a single task. test_dask_laziness.py now asserts zero tasks run on the call, via a dask task-count callback, instead of only checking the return type. --- xrspatial/focal.py | 84 ++++++++++----------------- xrspatial/tests/test_dask_laziness.py | 30 ++++++++++ 2 files changed, 62 insertions(+), 52 deletions(-) diff --git a/xrspatial/focal.py b/xrspatial/focal.py index 2cb276fbd..a47b816d1 100644 --- a/xrspatial/focal.py +++ b/xrspatial/focal.py @@ -1286,46 +1286,36 @@ def _hotspots_dask_numpy(raster, kernel, boundary='nan'): if not np.issubdtype(data.dtype, np.floating): data = data.astype(np.float32) - # Pass 1: eagerly compute global statistics (two scalars). - # This reads all chunks once, produces 16 bytes, then frees all - # intermediate state -- no barrier that would force materialization - # of the full convolution output. - global_mean, global_std = da.compute(da.nanmean(data), da.nanstd(data)) - global_mean = np.float32(global_mean) - global_std = np.float32(global_std) - - if global_std == 0: - raise ZeroDivisionError( - "Standard deviation of the input raster values is 0." - ) + # Global statistics stay lazy: 0-d dask arrays that broadcast into the + # z-score below. Nothing is computed during graph construction. + global_mean = da.nanmean(data).astype(np.float32) + global_std = da.nanstd(data).astype(np.float32) norm_kernel = (kernel / kernel.sum()).astype(np.float32) pad_h = norm_kernel.shape[0] // 2 pad_w = norm_kernel.shape[1] // 2 - # Pass 2: fuse convolution + z-score + classification into one - # map_overlap call. Each chunk reads source + halo, produces int8 - # output, and frees all intermediates immediately. No spill needed. - _func = partial( - _hotspots_chunk, - kernel=norm_kernel, - global_mean=global_mean, - global_std=global_std, - ) - out = data.map_overlap( - _func, + # Convolve each chunk (reading source + halo) and free the halo + # immediately. + convolved = data.map_overlap( + partial(_convolve_2d_numpy, kernel=norm_kernel), depth=(pad_h, pad_w), boundary=_boundary_to_dask(boundary), - meta=np.array((), dtype=np.int8), + meta=np.array((), dtype=np.float32), ) + + # z-score via broadcast of the lazy 0-d stats, then classify per block. + z_array = (convolved - global_mean) / global_std + out = z_array.map_blocks(_calc_hotspots_numpy, meta=np.array((), dtype=np.int8)) return out -def _hotspots_chunk(chunk, kernel, global_mean, global_std): - """Fused per-chunk: convolve -> z-score -> classify.""" - convolved = _convolve_2d_numpy(chunk, kernel) - z = (convolved - global_mean) / global_std - return _calc_hotspots_numpy(z) +def _calc_hotspots_cupy(z): + """Per-chunk GPU classification of a z-score array.""" + out = cupy.zeros_like(z, dtype=cupy.int8) + griddim, blockdim = cuda_args(z.shape) + _run_gpu_hotspots[griddim, blockdim](z, out) + return out def _hotspots_dask_cupy(raster, kernel, boundary='nan'): @@ -1333,37 +1323,27 @@ def _hotspots_dask_cupy(raster, kernel, boundary='nan'): if not cupy.issubdtype(data.dtype, cupy.floating): data = data.astype(cupy.float32) - # Pass 1: global statistics (two scalars, eager) - global_mean, global_std = da.compute(da.nanmean(data), da.nanstd(data)) - global_mean = np.float32(float(global_mean)) - global_std = np.float32(float(global_std)) - - if global_std == 0: - raise ZeroDivisionError( - "Standard deviation of the input raster values is 0." - ) + # Global statistics stay lazy: 0-d dask arrays that broadcast into the + # z-score below. Nothing is computed during graph construction. + global_mean = da.nanmean(data).astype(np.float32) + global_std = da.nanstd(data).astype(np.float32) norm_kernel = (kernel / kernel.sum()).astype(np.float32) pad_h = norm_kernel.shape[0] // 2 pad_w = norm_kernel.shape[1] // 2 - # Pass 2: fuse convolution + z-score + classification, all on the GPU. - # Reuse the _run_gpu_hotspots kernel (same as the single-GPU path) so - # each chunk stays on the device -- no host round trip per chunk. - def _chunk_fn(chunk): - convolved = _convolve_2d_cupy(chunk, norm_kernel) - z = (convolved - global_mean) / global_std - out = cupy.zeros_like(z, dtype=cupy.int8) - griddim, blockdim = cuda_args(z.shape) - _run_gpu_hotspots[griddim, blockdim](z, out) - return out - - out = data.map_overlap( - _chunk_fn, + # Convolve each chunk on the GPU (source + halo), free the halo, then + # classify the broadcast z-score per block -- each chunk stays on the + # device, no host round trip. + convolved = data.map_overlap( + partial(_convolve_2d_cupy, kernel=norm_kernel), depth=(pad_h, pad_w), boundary=_boundary_to_dask(boundary, is_cupy=True), - meta=cupy.array((), dtype=cupy.int8), + meta=cupy.array((), dtype=cupy.float32), ) + + z_array = (convolved - global_mean) / global_std + out = z_array.map_blocks(_calc_hotspots_cupy, meta=cupy.array((), dtype=cupy.int8)) return out diff --git a/xrspatial/tests/test_dask_laziness.py b/xrspatial/tests/test_dask_laziness.py index da4216f8a..5dff79db8 100644 --- a/xrspatial/tests/test_dask_laziness.py +++ b/xrspatial/tests/test_dask_laziness.py @@ -16,6 +16,7 @@ try: import dask import dask.array as da + import dask.callbacks # noqa: F401 (registers dask.callbacks.Callback) except ImportError: da = None @@ -32,6 +33,20 @@ def _is_lazy(result): return False +class _TaskCounter(dask.callbacks.Callback): + """Count dask tasks executed while the context is active. + + Used to assert that a function builds its graph lazily (zero tasks run + during the call) rather than eagerly triggering computation. + """ + + def __init__(self): + self.count = 0 + + def _posttask(self, key, result, dsk, state, worker_id): + self.count += 1 + + # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @@ -117,6 +132,21 @@ def test_hotspots(self, elev): kernel = np.ones((3, 3), dtype=np.float32) / 9 assert _is_lazy(hotspots(elev, kernel)) + def test_hotspots_no_eager_compute(self, elev): + # hotspots() must build the graph without running any dask tasks. + # Previously it eagerly computed global mean/std, executing ~12 + # tasks on the call itself (issue #2772). + from xrspatial.focal import hotspots + kernel = np.ones((3, 3), dtype=np.float32) / 9 + with _TaskCounter() as counter: + result = hotspots(elev, kernel) + assert counter.count == 0, ( + f"hotspots() ran {counter.count} dask tasks on call; expected 0 " + f"(should stay lazy)" + ) + # The graph must still compute to a real result. + assert result.compute().shape == elev.shape + # --------------------------------------------------------------------------- # Classification (partially lazy -- returns dask after computing small stats) From 29251b4d497cf25b69b918e6519161daba14bda4 Mon Sep 17 00:00:00 2001 From: Brendan Collins Date: Mon, 1 Jun 2026 10:47:47 -0700 Subject: [PATCH 2/2] Cast dask hotspots input to float32 to match convolve meta (#2772) Address review nit: the convolve map_overlap declared a float32 meta but float64 input stayed float64 through the intermediate. Cast to float32 up front in both dask paths, matching the single-array numpy path. --- xrspatial/focal.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/xrspatial/focal.py b/xrspatial/focal.py index a47b816d1..61f013832 100644 --- a/xrspatial/focal.py +++ b/xrspatial/focal.py @@ -1282,9 +1282,9 @@ def _hotspots_numpy(raster, kernel, boundary='nan'): def _hotspots_dask_numpy(raster, kernel, boundary='nan'): - data = raster.data - if not np.issubdtype(data.dtype, np.floating): - data = data.astype(np.float32) + # Match the numpy path: compute in float32 so the convolution and the + # float32 map_overlap meta agree regardless of input dtype. + data = raster.data.astype(np.float32) # Global statistics stay lazy: 0-d dask arrays that broadcast into the # z-score below. Nothing is computed during graph construction. @@ -1319,9 +1319,9 @@ def _calc_hotspots_cupy(z): def _hotspots_dask_cupy(raster, kernel, boundary='nan'): - data = raster.data - if not cupy.issubdtype(data.dtype, cupy.floating): - data = data.astype(cupy.float32) + # Match the numpy path: compute in float32 so the convolution and the + # float32 map_overlap meta agree regardless of input dtype. + data = raster.data.astype(cupy.float32) # Global statistics stay lazy: 0-d dask arrays that broadcast into the # z-score below. Nothing is computed during graph construction.