Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions distributed/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def initialize_logging(config: dict[Any, Any]) -> None:

def get_loop_factory() -> Callable[[], asyncio.AbstractEventLoop] | None:
event_loop = dask.config.get("distributed.admin.event-loop")

if event_loop == "uvloop":
uvloop = import_required(
"uvloop",
Expand All @@ -201,13 +202,10 @@ def get_loop_factory() -> Callable[[], asyncio.AbstractEventLoop] | None:
" pip install uvloop",
)
return uvloop.new_event_loop

if event_loop in {"asyncio", "tornado"}:
if sys.platform == "win32":
# ProactorEventLoop is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
# https://github.com/tornadoweb/tornado/issues/2608
return asyncio.SelectorEventLoop
return None

raise ValueError(
"Expected distributed.admin.event-loop to be in "
f"('asyncio', 'tornado', 'uvloop'), got {event_loop}"
Expand Down
7 changes: 0 additions & 7 deletions distributed/deploy/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

from dask.system import CPU_COUNT

from distributed.compatibility import WINDOWS
from distributed.deploy.spec import ProcessInterface, SpecCluster
from distributed.deploy.utils import nprocesses_nthreads
from distributed.utils import Deadline
Expand All @@ -31,9 +30,6 @@ class Subprocess(ProcessInterface, abc.ABC):
process: asyncio.subprocess.Process | None

def __init__(self):
if WINDOWS:
# FIXME: distributed#7434
raise RuntimeError("Subprocess does not support Windows.")
self.process = None
super().__init__()

Expand Down Expand Up @@ -233,9 +229,6 @@ def SubprocessCluster(

>>> cluster.scale(3) # doctest: +SKIP
"""
if WINDOWS:
# FIXME: distributed#7434
raise RuntimeError("SubprocessCluster does not support Windows.")
if not host:
host = "127.0.0.1"
worker_kwargs = worker_kwargs or {}
Expand Down
26 changes: 1 addition & 25 deletions distributed/deploy/tests/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,10 @@
import pytest

from distributed import Client
from distributed.compatibility import WINDOWS
from distributed.deploy.subprocess import (
SubprocessCluster,
SubprocessScheduler,
SubprocessWorker,
)
from distributed.deploy.subprocess import SubprocessCluster
from distributed.utils_test import gen_test, new_config_file


@pytest.mark.skipif(WINDOWS, reason="distributed#7434")
@gen_test()
async def test_basic():
async with SubprocessCluster(
Expand All @@ -30,7 +24,6 @@ async def test_basic():
assert "Subprocess" in repr(cluster)


@pytest.mark.skipif(WINDOWS, reason="distributed#7434")
@gen_test()
async def test_n_workers():
async with SubprocessCluster(
Expand All @@ -44,7 +37,6 @@ async def test_n_workers():
assert "Subprocess" in repr(cluster)


@pytest.mark.skipif(WINDOWS, reason="distributed#7434")
@gen_test()
async def test_scale_up_and_down():
async with SubprocessCluster(
Expand All @@ -66,15 +58,13 @@ async def test_scale_up_and_down():
assert len(cluster.workers) == 1


@pytest.mark.skipif(WINDOWS, reason="distributed#7434")
@gen_test()
async def test_raise_if_scheduler_fails_to_start():
with pytest.raises(RuntimeError, match="Scheduler failed to start"):
async with SubprocessCluster(scheduler_port=-1, asynchronous=True):
pass


@pytest.mark.skipif(WINDOWS, reason="distributed#7434")
@pytest.mark.slow
@gen_test()
async def test_subprocess_cluster_does_not_depend_on_logging():
Expand All @@ -87,17 +77,3 @@ async def test_subprocess_cluster_does_not_depend_on_logging():
):
result = await client.submit(lambda x: x + 1, 10)
assert result == 11


@pytest.mark.skipif(
not WINDOWS, reason="Windows-specific error testing (distributed#7434)"
)
def test_raise_on_windows():
with pytest.raises(RuntimeError, match="not support Windows"):
SubprocessCluster()

with pytest.raises(RuntimeError, match="not support Windows"):
SubprocessScheduler()

with pytest.raises(RuntimeError, match="not support Windows"):
SubprocessWorker(scheduler="tcp://127.0.0.1:8786")
1 change: 0 additions & 1 deletion distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7339,7 +7339,6 @@ def test_upload_directory_invalid_mode():
UploadDirectory(".", mode="invalid")


@pytest.mark.skipif(WINDOWS, reason="distributed#7434")
@pytest.mark.parametrize("mode", ["all", "scheduler"])
@gen_test()
async def test_upload_directory_to_scheduler(mode, tmp_path):
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ filterwarnings = [
'''ignore:elementwise comparison failed. this will raise an error in the future:DeprecationWarning''',
'''ignore:unclosed <socket\.socket.*:ResourceWarning''',
'''ignore:unclosed context <zmq\.asyncio\.Context\(\).*:ResourceWarning''',
'''ignore:unclosed event loop <_(Unix|Windows)SelectorEventLoop.*:ResourceWarning''',
'''ignore:unclosed event loop <(_UnixSelector|Proactor)EventLoop.*:ResourceWarning''',
'''ignore:unclosed file <_io.BufferedWriter.*:ResourceWarning''',
'''ignore:unclosed file <_io.TextIOWrapper.*:ResourceWarning''',
'''ignore:unclosed transport <_SelectorSocketTransport.*:ResourceWarning''',
Expand Down
Loading