Prevent threaded Dask workers in DistRDF backend #22393
Test Results: 22 files, 22 suites, 3d 12h 10m 36s ⏱️. Results for commit 5bb11cb.
5bb11cb to edb506a
        self.client = (daskclient if daskclient is not None else
                       Client(LocalCluster(n_workers=os.cpu_count(), threads_per_worker=1, processes=True)))

        workers = self.client.scheduler_info()["workers"]
Suggested change:
-        workers = self.client.scheduler_info()["workers"]
+        workers = self.client.scheduler_info().get("workers", None)
+        if workers is None:
+            return
Thanks for the suggestion. I updated the code to safely handle the case where scheduler_info() does not contain worker information yet.
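For illustration, the difference between the two lookups can be sketched without a running cluster. The empty dictionary below is a hypothetical stand-in for a scheduler_info() result that has no worker entry yet, and lookup_workers is a name made up for this sketch, not part of DistRDF.

```python
def lookup_workers(info):
    """Return the worker mapping from a scheduler_info()-style dict, or None."""
    return info.get("workers", None)


# Hypothetical stand-in: a scheduler response without a "workers" entry.
empty_info = {}

# Direct indexing raises KeyError when the entry is missing.
try:
    empty_info["workers"]
    direct_lookup_failed = False
except KeyError:
    direct_lookup_failed = True

# The guarded lookup returns None instead, so the caller can return early.
guarded_result = lookup_workers(empty_info)
```

With the guarded form, initialization can skip worker validation when no worker information is available instead of failing on the missing key.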
e938571 to 2eda199
2eda199 to 0c91ac5
vepadulano left a comment
Thank you @JAGANNATHANJP! Before proceeding with the CI, I would like to ask you to add a test for this new behaviour, perhaps in roottest/python/distrdf/backends/check_backend.py.
Added a test covering the case where scheduler_info() does not provide worker information during Dask backend initialization.
    """
    Check that DaskBackend initialization succeeds when scheduler_info
    does not provide worker information.
    """
    connection, backend = payload

    if backend != "dask":
        return

    from ROOT._distrdf.Backends.Dask import Backend

    original_scheduler_info = connection.scheduler_info

    try:
        connection.scheduler_info = lambda: {}
        backend = Backend.DaskBackend(daskclient=connection)
        assert backend.client is connection
    finally:
        connection.scheduler_info = original_scheduler_info
This is an interesting test, but I would appreciate it if you also added some actual computation, so we can check that, on top of having a valid connection, RDataFrame still works with it.
Afterwards, there needs to be another test, namely one that checks the new RuntimeError introduced with this PR.
Added a distributed RDataFrame computation to the missing-workers test to verify that execution still works when scheduler_info() does not provide worker information. Also added a new test that checks the expected RuntimeError is raised when using threaded Dask workers.
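The shape of the RuntimeError test can be sketched without ROOT or a Dask cluster. Everything below is hypothetical: StubClient and StubBackend are stand-ins invented for this sketch, and the "nthreads" field is assumed to be how each worker entry reports its thread count. The real test in check_backend.py may be structured differently.

```python
import unittest


class StubClient:
    """Hypothetical stand-in for a Dask client; only scheduler_info() is modelled."""

    def __init__(self, threads_per_worker):
        self._threads = threads_per_worker

    def scheduler_info(self):
        # One fake worker entry per requested thread count.
        return {
            "workers": {
                f"tcp://127.0.0.1:{4000 + i}": {"nthreads": n}
                for i, n in enumerate(self._threads)
            }
        }


class StubBackend:
    """Hypothetical backend whose constructor rejects threaded workers."""

    def __init__(self, daskclient):
        self.client = daskclient
        workers = daskclient.scheduler_info().get("workers")
        if workers is None:
            return
        if any(w.get("nthreads", 1) > 1 for w in workers.values()):
            raise RuntimeError("Threaded Dask workers are not supported")


class ThreadedWorkersTest(unittest.TestCase):
    def test_threaded_workers_raise(self):
        # One worker reports four threads, so construction should fail.
        with self.assertRaises(RuntimeError):
            StubBackend(StubClient([1, 4]))

    def test_single_threaded_workers_accepted(self):
        client = StubClient([1, 1])
        self.assertIs(StubBackend(client).client, client)
```

Using a stub keeps the check fast and independent of cluster startup; a test against a real threaded LocalCluster would exercise the same code path end to end.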
This PR prevents unsupported threaded Dask workers in DistRDF.
Running distributed RDataFrame on threaded Dask workers may lead to crashes and brings no advantage because of the Python GIL. This change validates the worker configuration at backend initialization and raises a RuntimeError when threaded workers are detected.
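A minimal sketch of such a validation step, written against plain dictionaries so it runs without a cluster. The function names are made up for illustration, and the sketch assumes each worker entry in the scheduler information carries an "nthreads" field; the actual check in the backend may differ.

```python
def find_threaded_workers(workers):
    """Return the sorted addresses of workers reporting more than one thread."""
    return sorted(
        address for address, info in workers.items()
        if info.get("nthreads", 1) > 1
    )


def validate_workers(scheduler_info):
    """Raise RuntimeError if any known worker is multi-threaded."""
    workers = scheduler_info.get("workers")
    if not workers:
        # No worker information available yet: nothing to validate.
        return
    threaded = find_threaded_workers(workers)
    if threaded:
        raise RuntimeError(
            "Threaded Dask workers are not supported: " + ", ".join(threaded)
        )


# Hypothetical scheduler_info()-style payloads.
info_ok = {"workers": {"tcp://10.0.0.1:4001": {"nthreads": 1}}}
info_bad = {
    "workers": {
        "tcp://10.0.0.1:4001": {"nthreads": 1},
        "tcp://10.0.0.2:4001": {"nthreads": 4},
    }
}

validate_workers({})        # no worker info: accepted
validate_workers(info_ok)   # single-threaded workers: accepted

try:
    validate_workers(info_bad)
    message = None
except RuntimeError as err:
    message = str(err)
```

Listing the offending worker addresses in the error message is a design choice of this sketch; it makes it easier to see which part of a mixed cluster needs reconfiguring.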
Suggested configuration: