Skip to content

Commit a268d3f

Browse files
miss-islington, gpshead, itamaro
authored
[3.13] gh-146313: Fix multiprocessing ResourceTracker deadlock after os.fork() (GH-146316) (#148426)
gh-146313: Fix multiprocessing ResourceTracker deadlock after os.fork() (GH-146316) `ResourceTracker.__del__` (added in gh-88887 circa Python 3.12) calls os.waitpid(pid, 0) which blocks indefinitely if a process created via os.fork() still holds the tracker pipe's write end. The tracker never sees EOF, never exits, and the parent hangs at interpreter shutdown. Fix with two layers: - **At-fork handler.** An os.register_at_fork(after_in_child=...) handler closes the inherited pipe fd in the child unless a preserve flag is set. popen_fork.Popen._launch() sets the flag before its fork so mp.Process(fork) children keep the fd and reuse the parent's tracker (preserving gh-80849). Raw os.fork() children close the fd, letting the parent reap promptly. - **Timeout safety-net.** _stop_locked() gains a wait_timeout parameter. When called from `__del__`, it polls with WNOHANG using exponential backoff for up to 1 second instead of blocking indefinitely. The at-fork handler makes this unreachable in well-behaved paths; it remains for abnormal shutdowns. (cherry picked from commit 3a7df63) Co-authored-by: Gregory P. Smith <68491+gpshead@users.noreply.github.com> Co-authored-by: Itamar Oren <itamarost@gmail.com>
1 parent 4830d29 commit a268d3f

File tree

4 files changed

+279
-8
lines changed

4 files changed

+279
-8
lines changed

Lib/multiprocessing/popen_fork.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,17 @@ def _launch(self, process_obj):
6464
code = 1
6565
parent_r, child_w = os.pipe()
6666
child_r, parent_w = os.pipe()
67-
self.pid = os.fork()
67+
# gh-146313: Tell the resource tracker's at-fork handler to keep
68+
# the inherited pipe fd so this child reuses the parent's tracker
69+
# (gh-80849) rather than closing it and launching its own.
70+
from .resource_tracker import _fork_intent
71+
_fork_intent.preserve_fd = True
72+
try:
73+
self.pid = os.fork()
74+
finally:
75+
# Reset in both parent and child so the flag does not leak
76+
# into a subsequent raw os.fork() or nested Process launch.
77+
_fork_intent.preserve_fd = False
6878
if self.pid == 0:
6979
try:
7080
atexit._clear()

Lib/multiprocessing/resource_tracker.py

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import signal
2121
import sys
2222
import threading
23+
import time
2324
import warnings
2425
from collections import deque
2526

@@ -79,6 +80,10 @@ def __init__(self):
7980
# The reader should understand all formats.
8081
self._use_simple_format = True
8182

83+
# Set to True by _stop_locked() if the waitpid polling loop ran to
84+
# its timeout without reaping the tracker. Exposed for tests.
85+
self._waitpid_timed_out = False
86+
8287
def _reentrant_call_error(self):
8388
# gh-109629: this happens if an explicit call to the ResourceTracker
8489
# gets interrupted by a garbage collection, invoking a finalizer (*)
@@ -91,16 +96,51 @@ def __del__(self):
9196
# making sure child processes are cleaned before ResourceTracker
9297
# gets destructed.
9398
# see https://github.com/python/cpython/issues/88887
94-
self._stop(use_blocking_lock=False)
99+
# gh-146313: use a timeout to avoid deadlocking if a forked child
100+
# still holds the pipe's write end open.
101+
self._stop(use_blocking_lock=False, wait_timeout=1.0)
102+
103+
def _after_fork_in_child(self):
104+
# gh-146313: Called in the child right after os.fork().
105+
#
106+
# The tracker process is a child of the *parent*, not of us, so we
107+
# could never waitpid() it anyway. Clearing _pid means our __del__
108+
# becomes a no-op (the early return for _pid is None).
109+
#
110+
# Whether we keep the inherited _fd depends on who forked us:
111+
#
112+
# - multiprocessing.Process with the 'fork' start method sets
113+
# _fork_intent.preserve_fd before forking. The child keeps the
114+
# fd and reuses the parent's tracker (gh-80849). This is safe
115+
# because multiprocessing's atexit handler joins all children
116+
# before the parent's __del__ runs, so by then the fd copies
117+
# are gone and the parent can reap the tracker promptly.
118+
#
119+
# - A raw os.fork() leaves the flag unset. We close the fd in the child after forking so
120+
# the parent's __del__ can reap the tracker without waiting
121+
# for the child to exit. If we later need a tracker, ensure_running()
122+
# will launch a fresh one.
123+
self._lock._at_fork_reinit()
124+
self._reentrant_messages.clear()
125+
self._pid = None
126+
self._exitcode = None
127+
if (self._fd is not None and
128+
not getattr(_fork_intent, 'preserve_fd', False)):
129+
fd = self._fd
130+
self._fd = None
131+
try:
132+
os.close(fd)
133+
except OSError:
134+
pass
95135

96-
def _stop(self, use_blocking_lock=True):
136+
def _stop(self, use_blocking_lock=True, wait_timeout=None):
97137
if use_blocking_lock:
98138
with self._lock:
99-
self._stop_locked()
139+
self._stop_locked(wait_timeout=wait_timeout)
100140
else:
101141
acquired = self._lock.acquire(blocking=False)
102142
try:
103-
self._stop_locked()
143+
self._stop_locked(wait_timeout=wait_timeout)
104144
finally:
105145
if acquired:
106146
self._lock.release()
@@ -110,6 +150,10 @@ def _stop_locked(
110150
close=os.close,
111151
waitpid=os.waitpid,
112152
waitstatus_to_exitcode=os.waitstatus_to_exitcode,
153+
monotonic=time.monotonic,
154+
sleep=time.sleep,
155+
WNOHANG=getattr(os, 'WNOHANG', None),
156+
wait_timeout=None,
113157
):
114158
# This shouldn't happen (it might when called by a finalizer)
115159
# so we check for it anyway.
@@ -126,7 +170,30 @@ def _stop_locked(
126170
self._fd = None
127171

128172
try:
129-
_, status = waitpid(self._pid, 0)
173+
if wait_timeout is None:
174+
_, status = waitpid(self._pid, 0)
175+
else:
176+
# gh-146313: A forked child may still hold the pipe's write
177+
# end open, preventing the tracker from seeing EOF and
178+
# exiting. Poll with WNOHANG to avoid blocking forever.
179+
deadline = monotonic() + wait_timeout
180+
delay = 0.001
181+
while True:
182+
result_pid, status = waitpid(self._pid, WNOHANG)
183+
if result_pid != 0:
184+
break
185+
remaining = deadline - monotonic()
186+
if remaining <= 0:
187+
# The tracker is still running; it will be
188+
# reparented to PID 1 (or the nearest subreaper)
189+
# when we exit, and reaped there once all pipe
190+
# holders release their fd.
191+
self._pid = None
192+
self._exitcode = None
193+
self._waitpid_timed_out = True
194+
return
195+
delay = min(delay * 2, remaining, 0.1)
196+
sleep(delay)
130197
except ChildProcessError:
131198
self._pid = None
132199
self._exitcode = None
@@ -312,12 +379,24 @@ def _send(self, cmd, name, rtype):
312379

313380
self._ensure_running_and_write(msg)
314381

382+
# gh-146313: Per-thread flag set by .popen_fork.Popen._launch() just before
383+
# os.fork(), telling _after_fork_in_child() to keep the inherited pipe fd so
384+
# the child can reuse this tracker (gh-80849). Unset for raw os.fork() calls,
385+
# where the child instead closes the fd so the parent's __del__ can reap the
386+
# tracker. Using threading.local() keeps multiple threads calling
387+
# popen_fork.Popen._launch() at once from clobbering each other's intent.
388+
_fork_intent = threading.local()
389+
315390
_resource_tracker = ResourceTracker()
316391
ensure_running = _resource_tracker.ensure_running
317392
register = _resource_tracker.register
318393
unregister = _resource_tracker.unregister
319394
getfd = _resource_tracker.getfd
320395

396+
# gh-146313: See the comments in _after_fork_in_child().
397+
if hasattr(os, 'register_at_fork'):
398+
os.register_at_fork(after_in_child=_resource_tracker._after_fork_in_child)
399+
321400

322401
def _decode_message(line):
323402
if line.startswith(b'{'):

Lib/test/_test_multiprocessing.py

Lines changed: 180 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6007,8 +6007,9 @@ def test_resource_tracker_sigkill(self):
60076007
def _is_resource_tracker_reused(conn, pid):
60086008
from multiprocessing.resource_tracker import _resource_tracker
60096009
_resource_tracker.ensure_running()
6010-
# The pid should be None in the child process, expect for the fork
6011-
# context. It should not be a new value.
6010+
# The pid should be None in the child (the at-fork handler clears
6011+
# it for fork; spawn/forkserver children never had it set). It
6012+
# should not be a new value.
60126013
reused = _resource_tracker._pid in (None, pid)
60136014
reused &= _resource_tracker._check_alive()
60146015
conn.send(reused)
@@ -6093,6 +6094,183 @@ def test_resource_tracker_blocked_signals(self):
60936094
# restore sigmask to what it was before executing test
60946095
signal.pthread_sigmask(signal.SIG_SETMASK, orig_sigmask)
60956096

6097+
@only_run_in_forkserver_testsuite("avoids redundant testing.")
6098+
def test_resource_tracker_fork_deadlock(self):
6099+
# gh-146313: ResourceTracker.__del__ used to deadlock if a forked
6100+
# child still held the pipe's write end open when the parent
6101+
# exited, because the parent would block in waitpid() waiting for
6102+
# the tracker to exit, but the tracker would never see EOF.
6103+
cmd = '''if 1:
6104+
import os, signal
6105+
from multiprocessing.resource_tracker import ensure_running
6106+
ensure_running()
6107+
if os.fork() == 0:
6108+
signal.pause()
6109+
os._exit(0)
6110+
# parent falls through and exits, triggering __del__
6111+
'''
6112+
proc = subprocess.Popen([sys.executable, '-c', cmd],
6113+
start_new_session=True)
6114+
try:
6115+
try:
6116+
proc.wait(timeout=support.SHORT_TIMEOUT)
6117+
except subprocess.TimeoutExpired:
6118+
self.fail(
6119+
"Parent process deadlocked in ResourceTracker.__del__"
6120+
)
6121+
self.assertEqual(proc.returncode, 0)
6122+
finally:
6123+
try:
6124+
os.killpg(proc.pid, signal.SIGKILL)
6125+
except ProcessLookupError:
6126+
pass
6127+
proc.wait()
6128+
6129+
@only_run_in_forkserver_testsuite("avoids redundant testing.")
6130+
def test_resource_tracker_mp_fork_reuse_and_prompt_reap(self):
6131+
# gh-146313 / gh-80849: A child started via multiprocessing.Process
6132+
# with the 'fork' start method should reuse the parent's resource
6133+
# tracker (the at-fork handler preserves the inherited pipe fd),
6134+
# *and* the parent should be able to reap the tracker promptly
6135+
# after joining the child, without hitting the waitpid timeout.
6136+
cmd = textwrap.dedent('''
6137+
import multiprocessing as mp
6138+
from multiprocessing.resource_tracker import _resource_tracker
6139+
6140+
def child(conn):
6141+
# Prove we can talk to the parent's tracker by registering
6142+
# and unregistering a dummy resource over the inherited fd.
6143+
# If the fd were closed, ensure_running would launch a new
6144+
# tracker and _pid would be non-None.
6145+
_resource_tracker.register("x", "dummy")
6146+
_resource_tracker.unregister("x", "dummy")
6147+
conn.send((_resource_tracker._fd is not None,
6148+
_resource_tracker._pid is None,
6149+
_resource_tracker._check_alive()))
6150+
6151+
if __name__ == "__main__":
6152+
mp.set_start_method("fork")
6153+
_resource_tracker.ensure_running()
6154+
r, w = mp.Pipe(duplex=False)
6155+
p = mp.Process(target=child, args=(w,))
6156+
p.start()
6157+
child_has_fd, child_pid_none, child_alive = r.recv()
6158+
p.join()
6159+
w.close(); r.close()
6160+
6161+
# Now simulate __del__: the child has exited and released
6162+
# its fd copy, so the tracker should see EOF and exit
6163+
# promptly -- no timeout.
6164+
_resource_tracker._stop(wait_timeout=5.0)
6165+
print(child_has_fd, child_pid_none, child_alive,
6166+
_resource_tracker._waitpid_timed_out,
6167+
_resource_tracker._exitcode)
6168+
''')
6169+
rc, out, err = script_helper.assert_python_ok('-c', cmd)
6170+
parts = out.decode().split()
6171+
self.assertEqual(parts, ['True', 'True', 'True', 'False', '0'],
6172+
f"unexpected: {parts!r} stderr={err!r}")
6173+
6174+
@only_run_in_forkserver_testsuite("avoids redundant testing.")
6175+
def test_resource_tracker_raw_fork_prompt_reap(self):
6176+
# gh-146313: After a raw os.fork() the at-fork handler closes the
6177+
# child's inherited fd, so the parent can reap the tracker
6178+
# immediately -- even while the child is still alive -- rather
6179+
# than waiting out the 1s timeout.
6180+
cmd = textwrap.dedent('''
6181+
import os, signal
6182+
from multiprocessing.resource_tracker import _resource_tracker
6183+
6184+
_resource_tracker.ensure_running()
6185+
r, w = os.pipe()
6186+
pid = os.fork()
6187+
if pid == 0:
6188+
os.close(r)
6189+
# Report whether our fd was closed by the at-fork handler.
6190+
os.write(w, b"1" if _resource_tracker._fd is None else b"0")
6191+
os.close(w)
6192+
signal.pause() # stay alive so parent's reap is meaningful
6193+
os._exit(0)
6194+
os.close(w)
6195+
child_fd_closed = os.read(r, 1) == b"1"
6196+
os.close(r)
6197+
6198+
# Child is still alive and paused. Because it closed its fd
6199+
# copy, our close below is the last one and the tracker exits.
6200+
_resource_tracker._stop(wait_timeout=5.0)
6201+
6202+
os.kill(pid, signal.SIGKILL)
6203+
os.waitpid(pid, 0)
6204+
print(child_fd_closed,
6205+
_resource_tracker._waitpid_timed_out,
6206+
_resource_tracker._exitcode)
6207+
''')
6208+
rc, out, err = script_helper.assert_python_ok('-c', cmd)
6209+
parts = out.decode().split()
6210+
self.assertEqual(parts, ['True', 'False', '0'],
6211+
f"unexpected: {parts!r} stderr={err!r}")
6212+
6213+
@only_run_in_forkserver_testsuite("avoids redundant testing.")
6214+
def test_resource_tracker_lock_reinit_after_fork(self):
6215+
# gh-146313: If a parent thread held the tracker's lock at fork
6216+
# time, the child would inherit the held lock and deadlock on
6217+
# its next ensure_running(). The at-fork handler reinits it.
6218+
cmd = textwrap.dedent('''
6219+
import os, threading
6220+
from multiprocessing.resource_tracker import _resource_tracker
6221+
6222+
held = threading.Event()
6223+
release = threading.Event()
6224+
def hold():
6225+
with _resource_tracker._lock:
6226+
held.set()
6227+
release.wait()
6228+
t = threading.Thread(target=hold)
6229+
t.start()
6230+
held.wait()
6231+
6232+
pid = os.fork()
6233+
if pid == 0:
6234+
ok = _resource_tracker._lock.acquire(timeout=5.0)
6235+
os._exit(0 if ok else 1)
6236+
6237+
release.set()
6238+
t.join()
6239+
_, status = os.waitpid(pid, 0)
6240+
print(os.waitstatus_to_exitcode(status))
6241+
''')
6242+
rc, out, err = script_helper.assert_python_ok(
6243+
'-W', 'ignore::DeprecationWarning', '-c', cmd)
6244+
self.assertEqual(out.strip(), b'0',
6245+
f"child failed to acquire lock: stderr={err!r}")
6246+
6247+
@only_run_in_forkserver_testsuite("avoids redundant testing.")
6248+
def test_resource_tracker_safety_net_timeout(self):
6249+
# gh-146313: When an mp.Process(fork) child holds the preserved
6250+
# fd and the parent calls _stop() without joining (simulating
6251+
# abnormal shutdown), the safety-net timeout should fire rather
6252+
# than deadlocking.
6253+
cmd = textwrap.dedent('''
6254+
import multiprocessing as mp
6255+
import signal
6256+
from multiprocessing.resource_tracker import _resource_tracker
6257+
6258+
if __name__ == "__main__":
6259+
mp.set_start_method("fork")
6260+
_resource_tracker.ensure_running()
6261+
p = mp.Process(target=signal.pause)
6262+
p.start()
6263+
# Stop WITHOUT joining -- child still holds preserved fd
6264+
_resource_tracker._stop(wait_timeout=0.5)
6265+
print(_resource_tracker._waitpid_timed_out)
6266+
p.terminate()
6267+
p.join()
6268+
''')
6269+
rc, out, err = script_helper.assert_python_ok('-c', cmd)
6270+
self.assertEqual(out.strip(), b'True',
6271+
f"safety-net timeout did not fire: stderr={err!r}")
6272+
6273+
60966274
class TestSimpleQueue(unittest.TestCase):
60976275

60986276
@classmethod
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Fix a deadlock in :mod:`multiprocessing`'s resource tracker
2+
where the parent process could hang indefinitely in :func:`os.waitpid`
3+
during interpreter shutdown if a child created via :func:`os.fork` still
4+
held the resource tracker's pipe open.

0 commit comments

Comments
 (0)