Skip to content

Commit accdc35

Browse files
Surface max_message_size on dbapi connect / aconnect / Connection / AsyncConnection
The client layer (dqliteclient) already plumbs the inbound-frame-size cap through connect() / create_pool() / DqliteConnection / DqliteProtocol. The dbapi layer was the missing piece — operators configuring the kwarg through dqlitedbapi.connect() or aio.connect() had no path to forward it. Thread max_message_size through: - dqlitedbapi.connect() / aio.connect() / aio.aconnect() signatures + forward to the underlying Connection / AsyncConnection. - Connection.__init__ / AsyncConnection.__init__ accept and store on self._max_message_size (None = wire-default 64 MiB). - _build_and_connect / _resolve_leader / _get_resolve_leader_cluster accept the kwarg with default None so existing callers continue to compile; the cache key inside _get_resolve_leader_cluster includes max_message_size so the dbapi connect() variant produces independent cache entries. - The ClusterClient construction inside _get_resolve_leader_cluster does NOT forward max_message_size — ClusterClient's constructor lacks the kwarg today (only its per-call connect() method takes it). The leader-probe RPC returns a small LeaderResponse bounded well below the wire-default cap; the operator's larger cap matters only for the eventual DqliteConnection data session, which IS built with max_message_size at the _build_and_connect call site. The wire layer is the single source of truth for validation (positive int, non-bool); the dbapi forward passes the value through verbatim, mirroring the reviewer's "one place to maintain the contract" guidance. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 08808a7 commit accdc35

5 files changed

Lines changed: 139 additions & 0 deletions

File tree

src/dqlitedbapi/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ def connect(
214214
timeout: float = DEFAULT_TIMEOUT_SECONDS,
215215
max_total_rows: int | None = _DEFAULT_MAX_TOTAL_ROWS,
216216
max_continuation_frames: int | None = _DEFAULT_MAX_CONTINUATION_FRAMES,
217+
max_message_size: int | None = None,
217218
trust_server_heartbeat: bool = False,
218219
close_timeout: float = DEFAULT_CLOSE_TIMEOUT_SECONDS,
219220
dial_timeout: float | None = None,
@@ -239,6 +240,12 @@ def connect(
239240
:class:`Connection`. ``None`` disables the cap.
240241
max_continuation_frames: Per-query continuation-frame cap.
241242
Forwarded to the underlying :class:`Connection`.
243+
max_message_size: Maximum allowed inbound frame size in
244+
bytes. ``None`` (default) falls back to the wire-layer
245+
default (64 MiB). Forwarded to the underlying
246+
:class:`Connection`. The wire layer validates the value
247+
(positive int, non-bool); pathological values raise
248+
``ValueError`` from the wire layer at construction.
242249
trust_server_heartbeat: Let the server-advertised heartbeat
243250
widen the per-read deadline. Default False.
244251
close_timeout: Budget (seconds) for the transport-drain during
@@ -325,6 +332,7 @@ def connect(
325332
timeout=timeout,
326333
max_total_rows=max_total_rows,
327334
max_continuation_frames=max_continuation_frames,
335+
max_message_size=max_message_size,
328336
trust_server_heartbeat=trust_server_heartbeat,
329337
close_timeout=close_timeout,
330338
dial_timeout=dial_timeout,

src/dqlitedbapi/aio/__init__.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ def connect(
200200
timeout: float = DEFAULT_TIMEOUT_SECONDS,
201201
max_total_rows: int | None = _DEFAULT_MAX_TOTAL_ROWS,
202202
max_continuation_frames: int | None = _DEFAULT_MAX_CONTINUATION_FRAMES,
203+
max_message_size: int | None = None,
203204
trust_server_heartbeat: bool = False,
204205
close_timeout: float = DEFAULT_CLOSE_TIMEOUT_SECONDS,
205206
dial_timeout: float | None = None,
@@ -228,6 +229,12 @@ def connect(
228229
AsyncConnection. None disables the cap.
229230
max_continuation_frames: Per-query continuation-frame cap.
230231
Forwarded to the underlying AsyncConnection.
232+
max_message_size: Maximum allowed inbound frame size in
233+
bytes. ``None`` (default) falls back to the wire-layer
234+
default (64 MiB). Forwarded to the underlying
235+
AsyncConnection. The wire layer validates the value
236+
(positive int, non-bool); pathological values raise
237+
``ValueError`` from the wire layer at construction.
231238
trust_server_heartbeat: Let the server-advertised heartbeat
232239
widen the per-read deadline. Default False.
233240
close_timeout: Budget (seconds) for the transport-drain during
@@ -291,6 +298,7 @@ def connect(
291298
timeout=timeout,
292299
max_total_rows=max_total_rows,
293300
max_continuation_frames=max_continuation_frames,
301+
max_message_size=max_message_size,
294302
trust_server_heartbeat=trust_server_heartbeat,
295303
close_timeout=close_timeout,
296304
dial_timeout=dial_timeout,
@@ -306,6 +314,7 @@ async def aconnect(
306314
timeout: float = DEFAULT_TIMEOUT_SECONDS,
307315
max_total_rows: int | None = _DEFAULT_MAX_TOTAL_ROWS,
308316
max_continuation_frames: int | None = _DEFAULT_MAX_CONTINUATION_FRAMES,
317+
max_message_size: int | None = None,
309318
trust_server_heartbeat: bool = False,
310319
close_timeout: float = DEFAULT_CLOSE_TIMEOUT_SECONDS,
311320
dial_timeout: float | None = None,
@@ -332,6 +341,12 @@ async def aconnect(
332341
AsyncConnection. None disables the cap.
333342
max_continuation_frames: Per-query continuation-frame cap.
334343
Forwarded to the underlying AsyncConnection.
344+
max_message_size: Maximum allowed inbound frame size in
345+
bytes. ``None`` (default) falls back to the wire-layer
346+
default (64 MiB). Forwarded to the underlying
347+
AsyncConnection. The wire layer validates the value
348+
(positive int, non-bool); pathological values raise
349+
``ValueError`` from the wire layer at construction.
335350
trust_server_heartbeat: Let the server-advertised heartbeat
336351
widen the per-read deadline. Default False.
337352
close_timeout: Budget (seconds) for the transport-drain during
@@ -393,6 +408,7 @@ async def aconnect(
393408
timeout=timeout,
394409
max_total_rows=max_total_rows,
395410
max_continuation_frames=max_continuation_frames,
411+
max_message_size=max_message_size,
396412
trust_server_heartbeat=trust_server_heartbeat,
397413
close_timeout=close_timeout,
398414
dial_timeout=dial_timeout,

src/dqlitedbapi/aio/connection.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ def __init__(
208208
timeout: float = DEFAULT_TIMEOUT_SECONDS,
209209
max_total_rows: int | None = _DEFAULT_MAX_TOTAL_ROWS,
210210
max_continuation_frames: int | None = _DEFAULT_MAX_CONTINUATION_FRAMES,
211+
max_message_size: int | None = None,
211212
trust_server_heartbeat: bool = False,
212213
close_timeout: float = DEFAULT_CLOSE_TIMEOUT_SECONDS,
213214
dial_timeout: float | None = None,
@@ -289,6 +290,9 @@ def __init__(
289290
"max_continuation_frames",
290291
upper=MAX_CONTINUATION_FRAMES_UPPER_BOUND,
291292
)
293+
# See sync sibling for rationale: ``None`` falls back to the
294+
# wire-layer default; the wire layer revalidates.
295+
self._max_message_size = max_message_size
292296
self._trust_server_heartbeat = trust_server_heartbeat
293297
self._close_timeout = close_timeout
294298
self._dial_timeout = dial_timeout
@@ -526,6 +530,7 @@ async def _ensure_connection(self) -> DqliteConnection:
526530
timeout=self._timeout,
527531
max_total_rows=self._max_total_rows,
528532
max_continuation_frames=self._max_continuation_frames,
533+
max_message_size=getattr(self, "_max_message_size", None),
529534
trust_server_heartbeat=self._trust_server_heartbeat,
530535
close_timeout=self._close_timeout,
531536
dial_timeout=getattr(self, "_dial_timeout", None),

src/dqlitedbapi/connection.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ def _get_resolve_leader_cluster(
377377
max_continuation_frames: int | None,
378378
trust_server_heartbeat: bool,
379379
dial_func: DialFunc | None = None,
380+
max_message_size: int | None = None,
380381
) -> ClusterClient:
381382
"""Return a process-shared :class:`ClusterClient` for the
382383
leader-discovery probe, keyed by the (loop, address, governor)
@@ -461,6 +462,7 @@ def _get_resolve_leader_cluster(
461462
timeout,
462463
max_total_rows,
463464
max_continuation_frames,
465+
max_message_size,
464466
trust_server_heartbeat,
465467
dial_func,
466468
)
@@ -486,6 +488,19 @@ def _get_resolve_leader_cluster(
486488
# cache size cap (_RESOLVE_LEADER_CACHE_MAX) bounds
487489
# the per-loop memory pressure.
488490
_RESOLVE_LEADER_CACHE.pop(next(iter(_RESOLVE_LEADER_CACHE)))
491+
# ``max_message_size`` is intentionally NOT forwarded to
492+
# ``ClusterClient.__init__``: ClusterClient lacks the
493+
# constructor kwarg today (only its per-call ``connect()``
494+
# method accepts it). The leader-probe RPC returns
495+
# ``LeaderResponse`` which is bounded well below the
496+
# wire-default 64 MiB; the operator's larger cap matters
497+
# only for the eventual ``DqliteConnection`` data session,
498+
# which IS built with ``max_message_size`` at the call
499+
# site below in ``_build_and_connect``. Cache-key membership
500+
# is still kept on ``max_message_size`` so the dbapi
501+
# connect() variant produces independent cache entries —
502+
# avoids cross-contamination if ``ClusterClient`` ever
503+
# grows the kwarg.
489504
cluster = ClusterClient(
490505
MemoryNodeStore([address]),
491506
timeout=timeout,
@@ -504,6 +519,7 @@ async def _resolve_leader(
504519
timeout: float,
505520
max_total_rows: int | None = _DEFAULT_MAX_TOTAL_ROWS,
506521
max_continuation_frames: int | None = _DEFAULT_MAX_CONTINUATION_FRAMES,
522+
max_message_size: int | None = None,
507523
trust_server_heartbeat: bool = False,
508524
dial_func: DialFunc | None = None,
509525
) -> str:
@@ -550,6 +566,7 @@ async def _resolve_leader(
550566
timeout=timeout,
551567
max_total_rows=max_total_rows,
552568
max_continuation_frames=max_continuation_frames,
569+
max_message_size=max_message_size,
553570
trust_server_heartbeat=trust_server_heartbeat,
554571
dial_func=dial_func,
555572
)
@@ -568,6 +585,7 @@ async def _build_and_connect(
568585
dial_timeout: float | None = None,
569586
attempt_timeout: float | None = None,
570587
dial_func: DialFunc | None = None,
588+
max_message_size: int | None = None,
571589
) -> DqliteConnection:
572590
"""Build a DqliteConnection with the given governors and connect it.
573591
@@ -595,6 +613,7 @@ async def _build_and_connect(
595613
timeout=timeout,
596614
max_total_rows=max_total_rows,
597615
max_continuation_frames=max_continuation_frames,
616+
max_message_size=max_message_size,
598617
trust_server_heartbeat=trust_server_heartbeat,
599618
dial_func=dial_func,
600619
)
@@ -680,6 +699,7 @@ async def _build_and_connect(
680699
timeout=timeout,
681700
max_total_rows=max_total_rows,
682701
max_continuation_frames=max_continuation_frames,
702+
max_message_size=max_message_size,
683703
trust_server_heartbeat=trust_server_heartbeat,
684704
close_timeout=close_timeout,
685705
dial_timeout=dial_timeout,
@@ -1257,6 +1277,7 @@ def __init__(
12571277
timeout: float = DEFAULT_TIMEOUT_SECONDS,
12581278
max_total_rows: int | None = _DEFAULT_MAX_TOTAL_ROWS,
12591279
max_continuation_frames: int | None = _DEFAULT_MAX_CONTINUATION_FRAMES,
1280+
max_message_size: int | None = None,
12601281
trust_server_heartbeat: bool = False,
12611282
close_timeout: float = DEFAULT_CLOSE_TIMEOUT_SECONDS,
12621283
dial_timeout: float | None = None,
@@ -1363,6 +1384,12 @@ def __init__(
13631384
"max_continuation_frames",
13641385
upper=MAX_CONTINUATION_FRAMES_UPPER_BOUND,
13651386
)
1387+
# ``max_message_size``: ``None`` falls back to the wire-layer
1388+
# default (64 MiB). The wire layer revalidates the value at
1389+
# protocol construction; the dbapi layer just stores and
1390+
# forwards. Passing the value through without dbapi-side
1391+
# validation keeps a single source of truth.
1392+
self._max_message_size = max_message_size
13661393
self._trust_server_heartbeat = trust_server_heartbeat
13671394
self._close_timeout = close_timeout
13681395
self._dial_timeout = dial_timeout
@@ -1990,6 +2017,7 @@ async def _get_async_connection(self) -> DqliteConnection:
19902017
timeout=self._timeout,
19912018
max_total_rows=self._max_total_rows,
19922019
max_continuation_frames=self._max_continuation_frames,
2020+
max_message_size=getattr(self, "_max_message_size", None),
19932021
trust_server_heartbeat=self._trust_server_heartbeat,
19942022
close_timeout=self._close_timeout,
19952023
dial_timeout=getattr(self, "_dial_timeout", None),
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Pin: ``max_message_size`` kwarg flows from
2+
``dqlitedbapi.connect()`` / ``aconnect()`` through
3+
``Connection`` / ``AsyncConnection`` to the underlying client
4+
:class:`DqliteConnection`'s ``_max_message_size`` slot.
5+
6+
The client layer already plumbs the knob through (see
7+
done/client-wire-max-message-size-64mib-cap-not-propagated-as-knob.md).
8+
This test pins the dbapi-side propagation that was missing.
9+
10+
Out-of-range validation happens at the wire layer
11+
(``DqliteProtocol`` rejects non-int / bool / non-positive); the
12+
dbapi forward passes the value through verbatim.
13+
"""
14+
15+
from __future__ import annotations
16+
17+
from typing import Any
18+
19+
import pytest
20+
21+
import dqlitedbapi
22+
import dqlitedbapi.aio
23+
24+
25+
def test_sync_connect_propagates_max_message_size() -> None:
26+
"""``connect(address, max_message_size=N)`` constructs a
27+
``Connection`` whose stored slot equals ``N``. The actual
28+
underlying ``DqliteConnection`` is built lazily on first use;
29+
the dbapi-side slot is what gets forwarded.
30+
"""
31+
conn = dqlitedbapi.connect("localhost:9001", max_message_size=12345)
32+
assert conn._max_message_size == 12345
33+
# Test does not connect to a real cluster; no close needed.
34+
35+
36+
def test_sync_connect_default_max_message_size_is_none() -> None:
37+
"""``None`` is the default sentinel — the wire layer fills in its
38+
own 64 MiB default when ``max_message_size`` is absent.
39+
"""
40+
conn = dqlitedbapi.connect("localhost:9001")
41+
assert conn._max_message_size is None
42+
43+
44+
def test_async_connect_propagates_max_message_size() -> None:
45+
"""``aio.connect(address, max_message_size=N)`` is the sync-shape
46+
factory that returns an ``AsyncConnection``. Same propagation
47+
expectation as the sync sibling.
48+
"""
49+
conn = dqlitedbapi.aio.connect("localhost:9001", max_message_size=54321)
50+
assert conn._max_message_size == 54321
51+
52+
53+
def test_async_connect_default_max_message_size_is_none() -> None:
54+
conn = dqlitedbapi.aio.connect("localhost:9001")
55+
assert conn._max_message_size is None
56+
57+
58+
@pytest.mark.parametrize(
59+
"bad_value",
60+
[
61+
0, # protocol layer rejects: must be >= 1
62+
-1,
63+
True, # bool rejected (subclass of int)
64+
"100", # type error from protocol layer
65+
],
66+
)
67+
def test_sync_connect_invalid_max_message_size_rejected_at_protocol_layer(
68+
bad_value: Any,
69+
) -> None:
70+
"""The dbapi forward does not re-validate ``max_message_size``;
71+
pathological values reach the wire layer which rejects with
72+
``TypeError`` (bool, str) or ``ValueError`` (non-positive int).
73+
74+
The dbapi just constructs and stores the value; the protocol
75+
layer's ``ValueError`` / ``TypeError`` surfaces at the eventual
76+
``connect()`` call (out-of-scope here — the slot read above
77+
confirms the value reached the dbapi storage). This test pins
78+
that the dbapi does NOT pre-emptively reject (single source of
79+
truth at the protocol layer).
80+
"""
81+
conn = dqlitedbapi.connect("localhost:9001", max_message_size=bad_value)
82+
assert conn._max_message_size == bad_value

0 commit comments

Comments
 (0)