Skip to content

Commit 6db05a7

Browse files
Bound pool.close() drain wall-clock with an aggregate deadline
``pool.close()`` previously called ``self._drain_idle()`` with no ``deadline=`` kwarg. A queue of pathologically-slow ``close()`` peers (leader flip mid-shutdown, half-closed sockets whose ``wait_closed`` never returns, FIN/ACK loss on a downed peer) consumed ``max_size × close_timeout × cap_multiplier`` of loop time before close returned — ~20 s at defaults, ~200 s with ``max_size=100``. Sibling acquire-path callers already passed ``deadline=...``; the close path was the structural outlier. Compute an aggregate close-budget deadline up-front and thread it through both ``_drain_idle`` and ``_drain_remaining_after_cancel`` (which gains the matching kwarg + between-iteration gate). The cancel-recovery arm now honours the same envelope so an outer cancel cannot re-amplify the gap via the recovery sweep. The second-caller arm in ``close()`` mirrors the shape so re-entry under a stuck queue is bounded too. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 0596d19 commit 6db05a7

2 files changed

Lines changed: 176 additions & 5 deletions

File tree

src/dqliteclient/pool.py

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1642,9 +1642,14 @@ async def _drain_idle(self, *, deadline: float | None = None) -> None:
16421642
# the next dead-conn detect, or ``pool.close()``.
16431643
if not deadline_exit:
16441644
with contextlib.suppress(asyncio.CancelledError):
1645-
await asyncio.shield(self._drain_remaining_after_cancel())
1646-
1647-
async def _drain_remaining_after_cancel(self) -> None:
1645+
# Forward the same deadline so the
1646+
# cancel-recovery sweep honours the aggregate
1647+
# close-budget; without this the recovery arm
1648+
# re-amplifies the gap the main-loop deadline
1649+
# was meant to bound.
1650+
await asyncio.shield(self._drain_remaining_after_cancel(deadline=deadline))
1651+
1652+
async def _drain_remaining_after_cancel(self, *, deadline: float | None = None) -> None:
16481653
"""Best-effort sweep for connections still in the queue after
16491654
the main drain loop exited (typically via outer cancel).
16501655
@@ -1654,8 +1659,30 @@ async def _drain_remaining_after_cancel(self) -> None:
16541659
does not abort the cleanup of the rest. Re-entry from a second
16551660
``close()`` caller is safe — the queue empties on the first
16561661
sweep and subsequent calls find ``self._pool.empty() is True``.
1662+
1663+
Optional ``deadline`` (monotonic ``loop.time()`` value): once
1664+
the running loop's clock reaches or passes it, the sweep stops
1665+
attempting further connections. The check is between
1666+
iterations so an already-started per-connection close still
1667+
gets its per-iteration cap. Mirrors the discipline at
1668+
``_drain_idle``. Callers from ``pool.close()`` pass an
1669+
aggregate close-budget deadline so the SIGTERM contract is
1670+
honoured end-to-end (cancel-recovery arm cannot extend close
1671+
past the operator's budget).
16571672
"""
1673+
loop = asyncio.get_running_loop()
16581674
while not self._pool.empty():
1675+
if deadline is not None and loop.time() >= deadline:
1676+
# Aggregate deadline exceeded between iterations.
1677+
# The remaining queue stays for the next caller
1678+
# (next ``pool.close()`` re-entry, or the
1679+
# post-close finalizer). Mirrors ``_drain_idle``'s
1680+
# between-iteration deadline gate.
1681+
logger.debug(
1682+
"pool: _drain_remaining_after_cancel deadline "
1683+
"exhausted between iterations; abandoning"
1684+
)
1685+
return
16591686
try:
16601687
conn = self._pool.get_nowait()
16611688
except asyncio.QueueEmpty: # pragma: no cover
@@ -2898,7 +2925,15 @@ async def close(self) -> None:
28982925
# being silently broken.
28992926
if not self._drain_complete:
29002927
with contextlib.suppress(asyncio.CancelledError):
2901-
await asyncio.shield(self._drain_remaining_after_cancel())
2928+
# Mirror the first-caller's close-budget shape: the second
2929+
# caller's sweep gets its own envelope of the same size,
2930+
# measured from this call's entry, so it is bounded too.
2931+
second_close_deadline = asyncio.get_running_loop().time() + (
2932+
self._close_timeout * self._max_size * _DRAIN_PER_CONN_CAP_MULTIPLIER
2933+
)
2934+
await asyncio.shield(
2935+
self._drain_remaining_after_cancel(deadline=second_close_deadline)
2936+
)
29022937
return
29032938
# Publish the drain-done event BEFORE flipping the closed flag
29042939
# so any second caller observing ``_closed=True`` is guaranteed
@@ -2928,7 +2963,24 @@ async def close(self) -> None:
29282963
self._pool.qsize(),
29292964
max(self._size - self._pool.qsize(), 0),
29302965
)
2931-
await self._drain_idle()
2966+
# Aggregate close-budget deadline. Bounds the drain at the
2967+
# documented worst-case shape (``close_timeout × max_size
2968+
# × _DRAIN_PER_CONN_CAP_MULTIPLIER``) — same envelope the
2969+
# operator would compose manually around ``pool.close()``
2970+
# via ``asyncio.timeout``. Without this gate, a queue of
2971+
# stuck-close peers (leader-flip mid-shutdown, half-closed
2972+
# sockets whose ``wait_closed`` never returns) keeps the
2973+
# close path waiting the full
2974+
# ``N × per_iter_cap`` even though the operator's
2975+
# ``close_timeout`` was sized per-peer, not per-batch. The
2976+
# cancel-recovery arm in ``_drain_idle`` forwards the same
2977+
# deadline so an outer cancel still truncates predictably
2978+
# rather than re-amplifying via
2979+
# ``_drain_remaining_after_cancel``.
2980+
close_deadline = asyncio.get_running_loop().time() + (
2981+
self._close_timeout * self._max_size * _DRAIN_PER_CONN_CAP_MULTIPLIER
2982+
)
2983+
await self._drain_idle(deadline=close_deadline)
29322984
# Drain completed normally. Set BEFORE the finally so a
29332985
# cancel landing between the drain return and the flag
29342986
# assignment leaves ``_drain_complete=False`` — siblings
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
"""Pin: ``ConnectionPool.close()`` derives an aggregate close-budget
2+
deadline and threads it through both ``_drain_idle`` and
3+
``_drain_remaining_after_cancel`` so the SIGTERM contract bounds
4+
end-to-end wall-clock regardless of stuck-close stampedes.
5+
6+
The prior shape called ``await self._drain_idle()`` (no
7+
``deadline=`` kwarg). With the wire-default ``close_timeout=0.5``
8+
and ``max_size=10``, a queue of pathologically-slow ``close()``
9+
peers consumed ~20 s before the close path returned; with
10+
``max_size=100``, ~200 s. Sibling acquire-path call sites
11+
already passed ``deadline=...``; the close path was the
12+
structural outlier.
13+
14+
``_drain_remaining_after_cancel`` (the cancel-recovery sweep)
15+
gains the same ``deadline`` kwarg so an outer cancel mid-drain
16+
cannot re-amplify the gap via the recovery arm.
17+
"""
18+
19+
from __future__ import annotations
20+
21+
import ast
22+
import inspect
23+
import textwrap
24+
25+
from dqliteclient.pool import ConnectionPool
26+
27+
28+
def _close_source() -> str:
29+
return textwrap.dedent(inspect.getsource(ConnectionPool.close))
30+
31+
32+
def _drain_remaining_after_cancel_signature() -> inspect.Signature:
33+
return inspect.signature(ConnectionPool._drain_remaining_after_cancel)
34+
35+
36+
def test_pool_close_passes_deadline_to_drain_idle() -> None:
37+
"""Structural pin: ``pool.close()`` must call
38+
``self._drain_idle(deadline=...)`` (not the bare
39+
``self._drain_idle()`` shape that omits the aggregate
40+
close-budget gate).
41+
"""
42+
src = _close_source()
43+
tree = ast.parse(src)
44+
found_with_deadline = False
45+
found_bare = False
46+
for node in ast.walk(tree):
47+
if not isinstance(node, ast.Call):
48+
continue
49+
func = node.func
50+
if (
51+
isinstance(func, ast.Attribute)
52+
and func.attr == "_drain_idle"
53+
and isinstance(func.value, ast.Name)
54+
and func.value.id == "self"
55+
):
56+
if any(kw.arg == "deadline" for kw in node.keywords):
57+
found_with_deadline = True
58+
else:
59+
found_bare = True
60+
assert found_with_deadline, (
61+
"pool.close() must pass ``deadline=<aggregate_close_budget>`` "
62+
"to ``self._drain_idle``. The aggregate-budget gate is what "
63+
"bounds end-to-end close wall-clock under a stuck-close "
64+
"stampede; without it, ``close()`` waits ``N × per_iter_cap`` "
65+
"even when ``close_timeout`` was sized per-peer."
66+
)
67+
assert not found_bare, (
68+
"pool.close() should NOT contain a bare ``self._drain_idle()`` "
69+
"call. Every drain invocation from the close path must thread "
70+
"the aggregate deadline through."
71+
)
72+
73+
74+
def test_drain_remaining_after_cancel_accepts_deadline_kwarg() -> None:
75+
"""The sibling cancel-recovery sweep must accept the same
76+
``deadline`` plumbing so the close-budget gate is honoured
77+
end-to-end (outer cancel can't re-amplify via the recovery arm).
78+
"""
79+
sig = _drain_remaining_after_cancel_signature()
80+
assert "deadline" in sig.parameters, (
81+
"_drain_remaining_after_cancel must accept a ``deadline`` "
82+
"kwarg so the cancel-recovery sweep honours the same "
83+
"aggregate close-budget the main drain loop enforces. "
84+
"Without it, an outer cancel re-amplifies the gap the "
85+
"main-loop deadline was meant to bound."
86+
)
87+
88+
89+
def test_pool_close_forwards_deadline_into_drain_remaining_after_cancel() -> None:
90+
"""Pin: the second-caller arm in ``pool.close()`` calls
91+
``self._drain_remaining_after_cancel(deadline=...)`` (not the
92+
bare form).
93+
"""
94+
src = _close_source()
95+
tree = ast.parse(src)
96+
found_with_deadline = False
97+
found_bare = False
98+
for node in ast.walk(tree):
99+
if not isinstance(node, ast.Call):
100+
continue
101+
func = node.func
102+
if (
103+
isinstance(func, ast.Attribute)
104+
and func.attr == "_drain_remaining_after_cancel"
105+
and isinstance(func.value, ast.Name)
106+
and func.value.id == "self"
107+
):
108+
if any(kw.arg == "deadline" for kw in node.keywords):
109+
found_with_deadline = True
110+
else:
111+
found_bare = True
112+
assert found_with_deadline, (
113+
"pool.close()'s second-caller arm must pass a ``deadline=`` "
114+
"to ``self._drain_remaining_after_cancel`` so the recovery "
115+
"sweep honours the close-budget envelope."
116+
)
117+
assert not found_bare, (
118+
"pool.close() should not contain a bare ``self._drain_remaining_after_cancel()`` call."
119+
)

0 commit comments

Comments
 (0)