From 53f6e919dbae7b65627018f015014e0fe70b4d92 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sat, 18 Apr 2026 15:02:53 +0100 Subject: [PATCH] Optimise decompression size (#12357) --- aiohttp/compression_utils.py | 10 ++++- aiohttp/http_parser.py | 9 +++-- aiohttp/streams.py | 63 +++++++++++++++++-------------- aiohttp/web_request.py | 4 ++ tests/test_flowcontrol_streams.py | 2 +- 5 files changed, 53 insertions(+), 35 deletions(-) diff --git a/aiohttp/compression_utils.py b/aiohttp/compression_utils.py index 8c6dd5877b7..9373344e65c 100644 --- a/aiohttp/compression_utils.py +++ b/aiohttp/compression_utils.py @@ -326,9 +326,15 @@ def decompress_sync( ) -> bytes: """Decompress the given data.""" if hasattr(self._obj, "decompress"): - result = cast(bytes, self._obj.decompress(data, max_length)) + if max_length == ZLIB_MAX_LENGTH_UNLIMITED: + result = cast(bytes, self._obj.decompress(data)) + else: + result = cast(bytes, self._obj.decompress(data, max_length)) else: - result = cast(bytes, self._obj.process(data, max_length)) + if max_length == ZLIB_MAX_LENGTH_UNLIMITED: + result = cast(bytes, self._obj.process(data)) + else: + result = cast(bytes, self._obj.process(data, max_length)) # Only way to know that brotli has no further data is checking we get no output self._last_empty = result == b"" return result diff --git a/aiohttp/http_parser.py b/aiohttp/http_parser.py index 43485354bb5..60c26e58577 100644 --- a/aiohttp/http_parser.py +++ b/aiohttp/http_parser.py @@ -2,6 +2,7 @@ import asyncio import re import string +import sys from contextlib import suppress from enum import IntEnum from re import Pattern @@ -1118,10 +1119,12 @@ def feed_data(self, chunk: bytes) -> bool: encoding=self.encoding, suppress_deflate_header=True ) + low_water = self.out._low_water + max_length = ( + 0 if low_water >= sys.maxsize else max(self._max_decompress_size, low_water) + ) try: - chunk = self.decompressor.decompress_sync( - chunk, max_length=self._max_decompress_size - ) + chunk = self.decompressor.decompress_sync(chunk, max_length=max_length) except Exception: raise ContentEncodingError( "Can not decode content-encoding: %s" % self.encoding diff --git a/aiohttp/streams.py b/aiohttp/streams.py index 219d6b7c535..28280ed286c 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -1,5 +1,6 @@ import asyncio import collections +import sys import warnings from collections.abc import Awaitable, Callable from typing import Final, Generic, TypeVar @@ -67,31 +68,7 @@ async def __anext__(self) -> tuple[bytes, bool]: return rv -class AsyncStreamReaderMixin: - - __slots__ = () - - def __aiter__(self) -> AsyncStreamIterator[bytes]: - return AsyncStreamIterator(self.readline) # type: ignore[attr-defined] - - def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]: - """Returns an asynchronous iterator that yields chunks of size n.""" - return AsyncStreamIterator(lambda: self.read(n)) # type: ignore[attr-defined] - - def iter_any(self) -> AsyncStreamIterator[bytes]: - """Yield all available data as soon as it is received.""" - return AsyncStreamIterator(self.readany) # type: ignore[attr-defined] - - def iter_chunks(self) -> ChunkTupleAsyncStreamIterator: - """Yield chunks of data as they are received by the server. - - The yielded objects are tuples - of (bytes, bool) as returned by the StreamReader.readchunk method. - """ - return ChunkTupleAsyncStreamIterator(self) # type: ignore[arg-type] - - -class StreamReader(AsyncStreamReaderMixin): +class StreamReader: """An enhancement of asyncio.StreamReader. Supports asynchronous iteration by line, chunk or as available:: @@ -174,9 +151,35 @@ def __repr__(self) -> str: info.append("e=%r" % self._exception) return "<%s>" % " ".join(info) + def __aiter__(self) -> AsyncStreamIterator[bytes]: + return AsyncStreamIterator(self.readline) + + def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]: + """Returns an asynchronous iterator that yields chunks of size n.""" + self.set_read_chunk_size(n) + return AsyncStreamIterator(lambda: self.read(n)) + + def iter_any(self) -> AsyncStreamIterator[bytes]: + """Yield all available data as soon as it is received.""" + return AsyncStreamIterator(self.readany) + + def iter_chunks(self) -> ChunkTupleAsyncStreamIterator: + """Yield chunks of data as they are received by the server. + + The yielded objects are tuples + of (bytes, bool) as returned by the StreamReader.readchunk method. + """ + return ChunkTupleAsyncStreamIterator(self) + def get_read_buffer_limits(self) -> tuple[int, int]: return (self._low_water, self._high_water) + def set_read_chunk_size(self, n: int) -> None: + """Raise buffer limits to match the consumer's chunk size.""" + if n > self._low_water: + self._low_water = n + self._high_water = n * 2 + def exception(self) -> type[BaseException] | BaseException | None: return self._exception @@ -410,10 +413,8 @@ async def read(self, n: int = -1) -> bytes: return b"" if n < 0: - # This used to just loop creating a new waiter hoping to - # collect everything in self._buffer, but that would - # deadlock if the subprocess sends more than self.limit - # bytes. So just call self.readany() until EOF. + # Reading everything — remove decompression chunk limit. + self.set_read_chunk_size(sys.maxsize) blocks = [] while True: block = await self.readany() @@ -422,6 +423,7 @@ async def read(self, n: int = -1) -> bytes: blocks.append(block) return b"".join(blocks) + self.set_read_chunk_size(n) # TODO: should be `if` instead of `while` # because waiter maybe triggered on chunk end, # without feeding any data @@ -595,6 +597,9 @@ async def wait_eof(self) -> None: def feed_data(self, data: bytes) -> bool: return False + def set_read_chunk_size(self, n: int) -> None: + return + async def readline(self, *, max_line_length: int | None = None) -> bytes: return b"" diff --git a/aiohttp/web_request.py b/aiohttp/web_request.py index 5871390529c..2034e0b3f83 100644 --- a/aiohttp/web_request.py +++ b/aiohttp/web_request.py @@ -628,6 +628,10 @@ async def read(self) -> bytes: Returns bytes object with full request content. """ if self._read_bytes is None: + # Raise the buffer limits so compressed payloads decompress in + # larger chunks instead of many small pause/resume cycles. + if self._client_max_size: + self._payload.set_read_chunk_size(self._client_max_size) body = bytearray() while True: chunk = await self._payload.readany() diff --git a/tests/test_flowcontrol_streams.py b/tests/test_flowcontrol_streams.py index 3654ba4aad2..61c44ddbe66 100644 --- a/tests/test_flowcontrol_streams.py +++ b/tests/test_flowcontrol_streams.py @@ -77,7 +77,7 @@ async def test_readexactly(self, stream: streams.StreamReader) -> None: stream.feed_data(b"data") res = await stream.readexactly(3) assert res == b"dat" - assert not stream._protocol.resume_reading.called # type: ignore[attr-defined] + assert stream._protocol.resume_reading.called # type: ignore[attr-defined] async def test_feed_data(self, stream: streams.StreamReader) -> None: stream._protocol._reading_paused = False