From d36485f33782f874d47ebadd3c004adab1972428 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 9 Jun 2026 12:57:43 +0100 Subject: [PATCH] Use ProactorEventLoop on Windows; support SubprocessCluster on Windows --- distributed/config.py | 8 +++---- distributed/deploy/subprocess.py | 7 ------ distributed/deploy/tests/test_subprocess.py | 26 +-------------------- distributed/tests/test_client.py | 1 - pyproject.toml | 2 +- 5 files changed, 5 insertions(+), 39 deletions(-) diff --git a/distributed/config.py b/distributed/config.py index 9ee3b3a1f54..379c398b19d 100644 --- a/distributed/config.py +++ b/distributed/config.py @@ -190,6 +190,7 @@ def initialize_logging(config: dict[Any, Any]) -> None: def get_loop_factory() -> Callable[[], asyncio.AbstractEventLoop] | None: event_loop = dask.config.get("distributed.admin.event-loop") + if event_loop == "uvloop": uvloop = import_required( "uvloop", @@ -201,13 +202,10 @@ def get_loop_factory() -> Callable[[], asyncio.AbstractEventLoop] | None: " pip install uvloop", ) return uvloop.new_event_loop + if event_loop in {"asyncio", "tornado"}: - if sys.platform == "win32": - # ProactorEventLoop is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - # https://github.com/tornadoweb/tornado/issues/2608 - return asyncio.SelectorEventLoop return None + raise ValueError( "Expected distributed.admin.event-loop to be in " f"('asyncio', 'tornado', 'uvloop'), got {event_loop}" diff --git a/distributed/deploy/subprocess.py b/distributed/deploy/subprocess.py index 1172272c2ba..22a154844ee 100644 --- a/distributed/deploy/subprocess.py +++ b/distributed/deploy/subprocess.py @@ -18,7 +18,6 @@ from dask.system import CPU_COUNT -from distributed.compatibility import WINDOWS from distributed.deploy.spec import ProcessInterface, SpecCluster from distributed.deploy.utils import nprocesses_nthreads from distributed.utils import Deadline @@ -31,9 +30,6 @@ class Subprocess(ProcessInterface, abc.ABC): process: asyncio.subprocess.Process | None def __init__(self): - if WINDOWS: - # FIXME: distributed#7434 - raise RuntimeError("Subprocess does not support Windows.") self.process = None super().__init__() @@ -233,9 +229,6 @@ def SubprocessCluster( >>> cluster.scale(3) # doctest: +SKIP """ - if WINDOWS: - # FIXME: distributed#7434 - raise RuntimeError("SubprocessCluster does not support Windows.") if not host: host = "127.0.0.1" worker_kwargs = worker_kwargs or {} diff --git a/distributed/deploy/tests/test_subprocess.py b/distributed/deploy/tests/test_subprocess.py index fa878f08c16..21f8f9c5b05 100644 --- a/distributed/deploy/tests/test_subprocess.py +++ b/distributed/deploy/tests/test_subprocess.py @@ -5,16 +5,10 @@ import pytest from distributed import Client -from distributed.compatibility import WINDOWS -from distributed.deploy.subprocess import ( - SubprocessCluster, - SubprocessScheduler, - SubprocessWorker, -) +from distributed.deploy.subprocess import SubprocessCluster from distributed.utils_test import gen_test, new_config_file -@pytest.mark.skipif(WINDOWS, reason="distributed#7434") @gen_test() async def test_basic(): async with SubprocessCluster( @@ -30,7 +24,6 @@ async def test_basic(): assert "Subprocess" in repr(cluster) -@pytest.mark.skipif(WINDOWS, reason="distributed#7434") @gen_test() async def test_n_workers(): async with SubprocessCluster( @@ -44,7 +37,6 @@ async def test_n_workers(): assert "Subprocess" in repr(cluster) -@pytest.mark.skipif(WINDOWS, reason="distributed#7434") @gen_test() async def test_scale_up_and_down(): async with SubprocessCluster( @@ -66,7 +58,6 @@ async def test_scale_up_and_down(): assert len(cluster.workers) == 1 -@pytest.mark.skipif(WINDOWS, reason="distributed#7434") @gen_test() async def test_raise_if_scheduler_fails_to_start(): with pytest.raises(RuntimeError, match="Scheduler failed to start"): @@ -74,7 +65,6 @@ async def test_raise_if_scheduler_fails_to_start(): pass -@pytest.mark.skipif(WINDOWS, reason="distributed#7434") @pytest.mark.slow @gen_test() async def test_subprocess_cluster_does_not_depend_on_logging(): @@ -87,17 +77,3 @@ async def test_subprocess_cluster_does_not_depend_on_logging(): ): result = await client.submit(lambda x: x + 1, 10) assert result == 11 - - -@pytest.mark.skipif( - not WINDOWS, reason="Windows-specific error testing (distributed#7434)" -) -def test_raise_on_windows(): - with pytest.raises(RuntimeError, match="not support Windows"): - SubprocessCluster() - - with pytest.raises(RuntimeError, match="not support Windows"): - SubprocessScheduler() - - with pytest.raises(RuntimeError, match="not support Windows"): - SubprocessWorker(scheduler="tcp://127.0.0.1:8786") diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index d2123d43ad4..59264f5f027 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -7339,7 +7339,6 @@ def test_upload_directory_invalid_mode(): UploadDirectory(".", mode="invalid") -@pytest.mark.skipif(WINDOWS, reason="distributed#7434") @pytest.mark.parametrize("mode", ["all", "scheduler"]) @gen_test() async def test_upload_directory_to_scheduler(mode, tmp_path): diff --git a/pyproject.toml b/pyproject.toml index aae96479d3b..1712c617ecd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -135,7 +135,7 @@ filterwarnings = [ '''ignore:elementwise comparison failed. this will raise an error in the future:DeprecationWarning''', '''ignore:unclosed