Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2917,6 +2917,16 @@ between themselves.

Suitable authentication keys can also be generated by using :func:`os.urandom`.

This authentication protects :class:`Listener` and :func:`Client` connections,
which are reachable by address. It is not applied to the anonymous pipes
created by :func:`~multiprocessing.Pipe` or used internally by
:class:`~multiprocessing.Queue`.
:mod:`multiprocessing` treats all local processes running as the same user as
trusted; on most operating systems such processes can access each other's pipe
file descriptors regardless of any authentication. Applications that require
isolation between
processes of the same user must arrange it at the operating-system level --
for example, by running workers under a different user account or in a sandbox.


Logging
^^^^^^^
Expand Down
217 changes: 151 additions & 66 deletions Lib/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,82 @@ def __repr__(self):
else:
_PopenSelector = selectors.SelectSelector

def _communicate_io_posix(selector, stdin, input_view, input_offset,
output_buffers, endtime, *, close_on_eof=False):
"""
Low-level POSIX I/O multiplexing loop used by Popen._communicate.

Handles the select loop for reading/writing but does not manage
stream lifecycle or raise timeout exceptions.

Args:
selector: A _PopenSelector with streams already registered
stdin: Writable file object for input, or None
input_view: memoryview of input bytes, or None
input_offset: Starting offset into input_view (for resume support)
output_buffers: Dict {file_object: list} to append read chunks to
endtime: Deadline timestamp, or None for no timeout
close_on_eof: If True, close output streams immediately when they
EOF rather than leaving them open for the caller to close.
Used by Popen._communicate() to match its historical behavior
of releasing fds as soon as the child closes the corresponding
pipe.

Returns:
(new_input_offset, completed)
- new_input_offset: How many bytes of input were written
- completed: True if all I/O finished, False if timed out

Note:
- Closes output streams on EOF only if close_on_eof=True
- Does NOT raise TimeoutExpired (caller handles)
- Appends to output_buffers lists in place
"""
stdin_fd = stdin.fileno() if stdin else None

while selector.get_map():
remaining = _deadline_remaining(endtime)
if remaining is not None and remaining <= 0:
return (input_offset, False) # Timed out

ready = selector.select(remaining)

# Check timeout after select (may have woken spuriously)
if endtime is not None and _time() > endtime:
return (input_offset, False) # Timed out

for key, events in ready:
if key.fd == stdin_fd:
chunk = input_view[input_offset:input_offset + _PIPE_BUF]
try:
input_offset += os.write(key.fd, chunk)
except BrokenPipeError:
selector.unregister(key.fd)
try:
stdin.close()
except BrokenPipeError:
pass
else:
if input_offset >= len(input_view):
selector.unregister(key.fd)
try:
stdin.close()
except BrokenPipeError:
pass
elif key.fileobj in output_buffers:
data = os.read(key.fd, 32768)
if not data:
selector.unregister(key.fileobj)
if close_on_eof:
try:
key.fileobj.close()
except OSError:
pass
else:
output_buffers[key.fileobj].append(data)

return (input_offset, True) # Completed


if _mswindows:
# On Windows we just need to close `Popen._handle` when we no longer need
Expand Down Expand Up @@ -289,6 +365,45 @@ def _cleanup():
DEVNULL = -3


def _deadline_remaining(endtime):
"""Calculate remaining time until deadline."""
if endtime is None:
return None
return endtime - _time()


def _flush_stdin(stdin):
"""Flush stdin, ignoring BrokenPipeError and closed file ValueError."""
try:
stdin.flush()
except BrokenPipeError:
pass # communicate() must ignore BrokenPipeError.
except ValueError:
# Ignore ValueError: I/O operation on closed file.
if not stdin.closed:
raise


def _make_input_view(input_data):
"""Convert input data to a byte memoryview for writing.

Handles the case where input_data is already a memoryview with
non-byte elements (e.g., int32 array) by casting to a byte view.
This ensures len(view) returns the byte count, not element count.
"""
if not input_data:
return None
if isinstance(input_data, memoryview):
return input_data.cast("b") # ensure byte view for correct len()
return memoryview(input_data)


def _translate_newlines(data, encoding, errors):
"""Decode bytes to str and translate newlines to \n."""
data = data.decode(encoding, errors)
return data.replace("\r\n", "\n").replace("\r", "\n")


# XXX This function is only used by multiprocessing and the test suite,
# but it's here so that it can be imported when Python is compiled without
# threads.
Expand Down Expand Up @@ -1149,8 +1264,8 @@ def universal_newlines(self, universal_newlines):
self.text_mode = bool(universal_newlines)

def _translate_newlines(self, data, encoding, errors):
    """Decode *data* to str and translate its newlines.

    Kept as a method so subclasses can override it.  The work is done
    by the module-level _translate_newlines() helper, which decodes
    *data* with *encoding* and *errors* and rewrites "\\r\\n" and "\\r"
    as "\\n".
    """
    # Subclass-overridable hook; defers to the module-level helper.
    return _translate_newlines(data, encoding, errors)

def __enter__(self):
    # Context-manager entry: the object itself is what a
    # "with ... as name" statement binds to the name.
    return self
Expand Down Expand Up @@ -1277,7 +1392,7 @@ def communicate(self, input=None, timeout=None):
# See the detailed comment in .wait().
if timeout is not None:
sigint_timeout = min(self._sigint_wait_secs,
self._remaining_time(endtime))
_deadline_remaining(endtime))
else:
sigint_timeout = self._sigint_wait_secs
self._sigint_wait_secs = 0 # nothing else should wait.
Expand All @@ -1290,7 +1405,7 @@ def communicate(self, input=None, timeout=None):
finally:
self._communication_started = True
try:
self.wait(timeout=self._remaining_time(endtime))
self.wait(timeout=_deadline_remaining(endtime))
except TimeoutExpired as exc:
exc.timeout = timeout
raise
Expand All @@ -1304,14 +1419,6 @@ def poll(self):
return self._internal_poll()


def _remaining_time(self, endtime):
"""Convenience for _communicate when computing timeouts."""
if endtime is None:
return None
else:
return endtime - _time()


def _check_timeout(self, endtime, orig_timeout, stdout_seq, stderr_seq,
skip_check_and_raise=False):
"""Convenience for checking if a timeout has expired."""
Expand All @@ -1337,7 +1444,7 @@ def wait(self, timeout=None):
# generated SIGINT and will exit rapidly.
if timeout is not None:
sigint_timeout = min(self._sigint_wait_secs,
self._remaining_time(endtime))
_deadline_remaining(endtime))
else:
sigint_timeout = self._sigint_wait_secs
self._sigint_wait_secs = 0 # nothing else should wait.
Expand Down Expand Up @@ -1704,19 +1811,19 @@ def _communicate(self, input, endtime, orig_timeout):
# thread remains writing and the fd left open in case the user
# calls communicate again.
if hasattr(self, "_stdin_thread"):
self._stdin_thread.join(self._remaining_time(endtime))
self._stdin_thread.join(_deadline_remaining(endtime))
if self._stdin_thread.is_alive():
raise TimeoutExpired(self.args, orig_timeout)

# Wait for the reader threads, or time out. If we time out, the
# threads remain reading and the fds left open in case the user
# calls communicate again.
if self.stdout is not None:
self.stdout_thread.join(self._remaining_time(endtime))
self.stdout_thread.join(_deadline_remaining(endtime))
if self.stdout_thread.is_alive():
raise TimeoutExpired(self.args, orig_timeout)
if self.stderr is not None:
self.stderr_thread.join(self._remaining_time(endtime))
self.stderr_thread.join(_deadline_remaining(endtime))
if self.stderr_thread.is_alive():
raise TimeoutExpired(self.args, orig_timeout)

Expand Down Expand Up @@ -2210,7 +2317,7 @@ def _wait(self, timeout):
break
finally:
self._waitpid_lock.release()
remaining = self._remaining_time(endtime)
remaining = _deadline_remaining(endtime)
if remaining <= 0:
raise TimeoutExpired(self.args, timeout)
delay = min(delay * 2, remaining, .05)
Expand All @@ -2234,14 +2341,7 @@ def _communicate(self, input, endtime, orig_timeout):
if self.stdin and not self._communication_started:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
try:
self.stdin.flush()
except BrokenPipeError:
pass # communicate() must ignore BrokenPipeError.
except ValueError:
# ignore ValueError: I/O operation on closed file.
if not self.stdin.closed:
raise
_flush_stdin(self.stdin)
if not input:
try:
self.stdin.close()
Expand All @@ -2266,11 +2366,8 @@ def _communicate(self, input, endtime, orig_timeout):

self._save_input(input)

if self._input:
if not isinstance(self._input, memoryview):
input_view = memoryview(self._input)
else:
input_view = self._input.cast("b") # byte input required
input_view = _make_input_view(self._input)
input_offset = self._input_offset if self._input else 0

with _PopenSelector() as selector:
if self.stdin and not self.stdin.closed and self._input:
Expand All @@ -2280,43 +2377,31 @@ def _communicate(self, input, endtime, orig_timeout):
if self.stderr and not self.stderr.closed:
selector.register(self.stderr, selectors.EVENT_READ)

while selector.get_map():
timeout = self._remaining_time(endtime)
if timeout is not None and timeout <= 0:
self._check_timeout(endtime, orig_timeout,
stdout, stderr,
skip_check_and_raise=True)
raise RuntimeError( # Impossible :)
'_check_timeout(..., skip_check_and_raise=True) '
'failed to raise TimeoutExpired.')

ready = selector.select(timeout)
self._check_timeout(endtime, orig_timeout, stdout, stderr)

# XXX Rewrite these to use non-blocking I/O on the file
# objects; they are no longer using C stdio!

for key, events in ready:
if key.fileobj is self.stdin:
chunk = input_view[self._input_offset :
self._input_offset + _PIPE_BUF]
try:
self._input_offset += os.write(key.fd, chunk)
except BrokenPipeError:
selector.unregister(key.fileobj)
key.fileobj.close()
else:
if self._input_offset >= len(input_view):
selector.unregister(key.fileobj)
key.fileobj.close()
elif key.fileobj in (self.stdout, self.stderr):
data = os.read(key.fd, 32768)
if not data:
selector.unregister(key.fileobj)
key.fileobj.close()
self._fileobj2output[key.fileobj].append(data)
stdin_to_write = (self.stdin if self.stdin and self._input
and not self.stdin.closed else None)
# Persist the returned offset on self so a subsequent
# communicate() after a TimeoutExpired resumes mid-input
# rather than re-sending bytes the child already consumed.
new_offset, completed = _communicate_io_posix(
selector,
stdin_to_write,
input_view,
input_offset,
self._fileobj2output,
endtime,
close_on_eof=True)
if self._input:
self._input_offset = new_offset

if not completed:
self._check_timeout(endtime, orig_timeout, stdout, stderr,
skip_check_and_raise=True)
raise RuntimeError( # Impossible :)
'_check_timeout(..., skip_check_and_raise=True) '
'failed to raise TimeoutExpired.')

try:
self.wait(timeout=self._remaining_time(endtime))
self.wait(timeout=_deadline_remaining(endtime))
except TimeoutExpired as exc:
exc.timeout = orig_timeout
raise
Expand Down
33 changes: 33 additions & 0 deletions Lib/test/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,39 @@ def test_communicate_timeout_large_input(self):
p.kill()
p.wait()

def test_communicate_timeout_resume_partial_write(self):
    """A second communicate() finishes input cut short by a timeout.

    Covers the _input_offset bookkeeping around _communicate_io_posix:
    the first communicate() has to time out while input is still being
    written, and the follow-up call has to deliver the rest so that the
    child echoes back exactly what was sent.
    """
    # 1 MiB with a recognisable byte pattern.  That is far more than a
    # typical pipe buffer (~64 KiB), so the write stalls once the
    # buffer is full and the child has not started reading yet.
    payload = bytes(range(256)) * 4096
    self.assertEqual(len(payload), 1024 * 1024)

    # The child waits before echoing stdin to stdout, which guarantees
    # the short first timeout below expires mid-write.
    child_code = ("import sys, time; "
                  "time.sleep(0.5); "
                  "sys.stdout.buffer.write(sys.stdin.buffer.read())")
    proc = subprocess.Popen([sys.executable, "-c", child_code],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    try:
        self.assertRaises(subprocess.TimeoutExpired,
                          proc.communicate, payload, timeout=0.05)

        # Resume without new input; the generous timeout avoids
        # flakiness on slow CI machines.
        out, _err = proc.communicate(timeout=support.LONG_TIMEOUT)
        self.assertEqual(len(out), len(payload))
        self.assertEqual(out, payload)
    finally:
        proc.kill()
        proc.wait()

# Test for the fd leak reported in http://bugs.python.org/issue2791.
def test_communicate_pipe_fd_leak(self):
for stdin_pipe in (False, True):
Expand Down
Loading