Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions aiohttp/compression_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,15 @@ def decompress_sync(
) -> bytes:
"""Decompress the given data."""
if hasattr(self._obj, "decompress"):
result = cast(bytes, self._obj.decompress(data, max_length))
if max_length == ZLIB_MAX_LENGTH_UNLIMITED:
result = cast(bytes, self._obj.decompress(data))
else:
result = cast(bytes, self._obj.decompress(data, max_length))
else:
result = cast(bytes, self._obj.process(data, max_length))
if max_length == ZLIB_MAX_LENGTH_UNLIMITED:
result = cast(bytes, self._obj.process(data))
else:
result = cast(bytes, self._obj.process(data, max_length))
# Only way to know that brotli has no further data is checking we get no output
self._last_empty = result == b""
return result
Expand Down
9 changes: 6 additions & 3 deletions aiohttp/http_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
import re
import string
import sys
from contextlib import suppress
from enum import IntEnum
from re import Pattern
Expand Down Expand Up @@ -1118,10 +1119,12 @@ def feed_data(self, chunk: bytes) -> bool:
encoding=self.encoding, suppress_deflate_header=True
)

low_water = self.out._low_water
max_length = (
0 if low_water >= sys.maxsize else max(self._max_decompress_size, low_water)
)
try:
chunk = self.decompressor.decompress_sync(
chunk, max_length=self._max_decompress_size
)
chunk = self.decompressor.decompress_sync(chunk, max_length=max_length)
except Exception:
raise ContentEncodingError(
"Can not decode content-encoding: %s" % self.encoding
Expand Down
63 changes: 34 additions & 29 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import collections
import sys
import warnings
from collections.abc import Awaitable, Callable
from typing import Final, Generic, TypeVar
Expand Down Expand Up @@ -67,31 +68,7 @@ async def __anext__(self) -> tuple[bytes, bool]:
return rv


class AsyncStreamReaderMixin:

__slots__ = ()

def __aiter__(self) -> AsyncStreamIterator[bytes]:
return AsyncStreamIterator(self.readline) # type: ignore[attr-defined]

def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
"""Returns an asynchronous iterator that yields chunks of size n."""
return AsyncStreamIterator(lambda: self.read(n)) # type: ignore[attr-defined]

def iter_any(self) -> AsyncStreamIterator[bytes]:
"""Yield all available data as soon as it is received."""
return AsyncStreamIterator(self.readany) # type: ignore[attr-defined]

def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
"""Yield chunks of data as they are received by the server.

The yielded objects are tuples
of (bytes, bool) as returned by the StreamReader.readchunk method.
"""
return ChunkTupleAsyncStreamIterator(self) # type: ignore[arg-type]


class StreamReader(AsyncStreamReaderMixin):
class StreamReader:
"""An enhancement of asyncio.StreamReader.

Supports asynchronous iteration by line, chunk or as available::
Expand Down Expand Up @@ -174,9 +151,35 @@ def __repr__(self) -> str:
info.append("e=%r" % self._exception)
return "<%s>" % " ".join(info)

def __aiter__(self) -> AsyncStreamIterator[bytes]:
return AsyncStreamIterator(self.readline)

def iter_chunked(self, n: int) -> AsyncStreamIterator[bytes]:
"""Returns an asynchronous iterator that yields chunks of size n."""
self.set_read_chunk_size(n)
return AsyncStreamIterator(lambda: self.read(n))

def iter_any(self) -> AsyncStreamIterator[bytes]:
"""Yield all available data as soon as it is received."""
return AsyncStreamIterator(self.readany)

def iter_chunks(self) -> ChunkTupleAsyncStreamIterator:
"""Yield chunks of data as they are received by the server.

The yielded objects are tuples
of (bytes, bool) as returned by the StreamReader.readchunk method.
"""
return ChunkTupleAsyncStreamIterator(self)

def get_read_buffer_limits(self) -> tuple[int, int]:
return (self._low_water, self._high_water)

def set_read_chunk_size(self, n: int) -> None:
"""Raise buffer limits to match the consumer's chunk size."""
if n > self._low_water:
self._low_water = n
self._high_water = n * 2

def exception(self) -> type[BaseException] | BaseException | None:
return self._exception

Expand Down Expand Up @@ -410,10 +413,8 @@ async def read(self, n: int = -1) -> bytes:
return b""

if n < 0:
# This used to just loop creating a new waiter hoping to
# collect everything in self._buffer, but that would
# deadlock if the subprocess sends more than self.limit
# bytes. So just call self.readany() until EOF.
# Reading everything — remove decompression chunk limit.
self.set_read_chunk_size(sys.maxsize)
blocks = []
while True:
block = await self.readany()
Expand All @@ -422,6 +423,7 @@ async def read(self, n: int = -1) -> bytes:
blocks.append(block)
return b"".join(blocks)

self.set_read_chunk_size(n)
# TODO: should be `if` instead of `while`
# because waiter maybe triggered on chunk end,
# without feeding any data
Expand Down Expand Up @@ -595,6 +597,9 @@ async def wait_eof(self) -> None:
def feed_data(self, data: bytes) -> bool:
return False

def set_read_chunk_size(self, n: int) -> None:
return

async def readline(self, *, max_line_length: int | None = None) -> bytes:
return b""

Expand Down
4 changes: 4 additions & 0 deletions aiohttp/web_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,10 @@ async def read(self) -> bytes:
Returns bytes object with full request content.
"""
if self._read_bytes is None:
# Raise the buffer limits so compressed payloads decompress in
# larger chunks instead of many small pause/resume cycles.
if self._client_max_size:
self._payload.set_read_chunk_size(self._client_max_size)
body = bytearray()
while True:
chunk = await self._payload.readany()
Expand Down
2 changes: 1 addition & 1 deletion tests/test_flowcontrol_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ async def test_readexactly(self, stream: streams.StreamReader) -> None:
stream.feed_data(b"data")
res = await stream.readexactly(3)
assert res == b"dat"
assert not stream._protocol.resume_reading.called # type: ignore[attr-defined]
assert stream._protocol.resume_reading.called # type: ignore[attr-defined]

async def test_feed_data(self, stream: streams.StreamReader) -> None:
stream._protocol._reading_paused = False
Expand Down
Loading