Skip to content

Commit 2847a2c

Browse files
Release the op-lock when a signal interrupts the owner-stamp in _run_sync
A KeyboardInterrupt or SystemExit delivered after the op-lock was acquired but before the body try/finally was entered — landing on the _op_lock_owner stamp — escaped with the lock held by no thread, permanently wedging the connection: every later _run_sync then timed out on the bounded acquire. Place the acquire, the owner-stamp, and the body under a single try/finally so the release always runs once the lock is held. The inner try/except around acquire() is kept for the acquire-return-before-store window (it best-effort releases with acquired still False, so the outer finally does not double-release), and acquired is bound before the outer try so the finally never references an unbound local. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 3073d70 commit 2847a2c

2 files changed

Lines changed: 122 additions & 33 deletions

File tree

src/dqlitedbapi/connection.py

Lines changed: 40 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,41 +1100,48 @@ def _run_sync[T](self, coro: Coroutine[Any, Any, T]) -> T:
11001100
``self._timeout`` so a same-thread signal-handler re-entry
11011101
raises cleanly instead of deadlocking the non-reentrant lock.
11021102
"""
1103-
# acquire(timeout=...) is SIGINT-interruptible: a KI/SystemExit
1104-
# escapes BEFORE the try block, skipping its cleanup arm. If a
1105-
# prior op is in flight it owns _in_use, wedging the connection;
1106-
# schedule a defensive _invalidate. Gated on _in_use so a KI on
1107-
# a quiet acquire doesn't invalidate gratuitously.
1103+
# The acquire, the owner-stamp, and the body all sit under one
1104+
# try/finally so a SIGINT-delivered KeyboardInterrupt/SystemExit
1105+
# raised anywhere after the lock is held — including in the
1106+
# owner-stamp gap — cannot escape with the lock latched. `acquired`
1107+
# is bound first so the finally never NameErrors on the inner arm
1108+
# below (which re-raises before `acquired` is assigned).
1109+
acquired = False
11081110
try:
1109-
acquired = self._op_lock.acquire(timeout=self._timeout)
1110-
except (KeyboardInterrupt, SystemExit):
1111-
# The lock may be held if KI landed in the gap between
1112-
# acquire returning True and the STORE_FAST; best-effort
1113-
# release (suppress the unlocked-lock RuntimeError).
1114-
with contextlib.suppress(RuntimeError):
1115-
self._op_lock_owner = None
1116-
self._op_lock.release()
1117-
# Close the never-scheduled coro to avoid a warning.
1118-
coro.close()
1119-
# Synchronously null _async_conn so a retry from the signal
1120-
# handler doesn't hit "another operation is in progress" on
1121-
# the stale in-use conn (the queued _invalidate only lands
1122-
# when the slow read yields). GIL-atomic STORE_ATTR; the
1123-
# loop coro keeps its own ref and reaps via _invalidate.
1124-
dying = self._async_conn
1125-
if dying is not None and self._loop is not None and dying._in_use:
1126-
self._async_conn = None
1111+
# acquire(timeout=...) is SIGINT-interruptible: a KI/SystemExit
1112+
# can land between acquire returning True and the STORE_FAST,
1113+
# leaving the lock held with `acquired` still False — so the
1114+
# inner arm best-effort releases it (the outer finally skips
1115+
# release when `acquired` is False). A prior op in flight owns
1116+
# _in_use, wedging the connection; schedule a defensive
1117+
# _invalidate, gated on _in_use so a KI on a quiet acquire
1118+
# doesn't invalidate gratuitously.
1119+
try:
1120+
acquired = self._op_lock.acquire(timeout=self._timeout)
1121+
except (KeyboardInterrupt, SystemExit):
11271122
with contextlib.suppress(RuntimeError):
1128-
self._loop.call_soon_threadsafe(
1129-
dying._invalidate,
1130-
InterfaceError("operation interrupted during op-lock acquire"),
1131-
)
1132-
raise
1133-
# Stamp owner before the try/finally so the close() bypass probe
1134-
# can read it (only on the acquired arm). GIL-atomic.
1135-
if acquired:
1136-
self._op_lock_owner = threading.get_ident()
1137-
try:
1123+
self._op_lock_owner = None
1124+
self._op_lock.release()
1125+
# Close the never-scheduled coro to avoid a warning.
1126+
coro.close()
1127+
# Synchronously null _async_conn so a retry from the signal
1128+
# handler doesn't hit "another operation is in progress" on
1129+
# the stale in-use conn (the queued _invalidate only lands
1130+
# when the slow read yields). GIL-atomic STORE_ATTR; the
1131+
# loop coro keeps its own ref and reaps via _invalidate.
1132+
dying = self._async_conn
1133+
if dying is not None and self._loop is not None and dying._in_use:
1134+
self._async_conn = None
1135+
with contextlib.suppress(RuntimeError):
1136+
self._loop.call_soon_threadsafe(
1137+
dying._invalidate,
1138+
InterfaceError("operation interrupted during op-lock acquire"),
1139+
)
1140+
raise
1141+
# Stamp owner on the acquired arm so the close() bypass probe
1142+
# can read it. GIL-atomic.
1143+
if acquired:
1144+
self._op_lock_owner = threading.get_ident()
11381145
if not acquired:
11391146
coro.close()
11401147
# OperationalError (not InterfaceError) so SA's
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Pin: a KeyboardInterrupt/SystemExit raised in the op-lock owner-stamp gap
2+
must not leak ``_op_lock``.
3+
4+
The owner-stamp (``self._op_lock_owner = threading.get_ident()``) runs on the
5+
acquired arm of ``_run_sync``. If a signal lands there while the lock is held,
6+
the release path must still run; otherwise the lock is held by no thread and
7+
every later ``_run_sync`` wedges on the bounded acquire.
8+
"""
9+
10+
from __future__ import annotations
11+
12+
import threading
13+
from collections.abc import Callable
14+
from unittest.mock import AsyncMock, MagicMock, patch
15+
16+
import pytest
17+
18+
from dqlitedbapi import Connection
19+
20+
21+
def _make_with_loop_thread() -> Connection:
22+
"""Sync Connection with the loop thread up and a mock async connection."""
23+
conn = Connection("localhost:9001")
24+
conn._ensure_loop()
25+
fake = MagicMock()
26+
fake.execute = AsyncMock(return_value=(0, 0))
27+
fake.close = AsyncMock()
28+
fake._invalidate = MagicMock()
29+
fake._in_use = False
30+
fake._bound_loop = None
31+
conn._async_conn = fake
32+
return conn
33+
34+
35+
async def _noop() -> None:
36+
return None
37+
38+
39+
def _ident_raising_in_stamp_gap(
40+
conn: Connection, exc: type[BaseException], state: dict[str, bool]
41+
) -> Callable[[], int]:
42+
"""A ``threading.get_ident`` replacement that raises once, only while
43+
``_op_lock`` is held — i.e. at the owner-stamp, never at ``_check_thread``
44+
or ``close`` (which also call ``get_ident`` but with the lock unheld)."""
45+
real = threading.get_ident
46+
47+
def fake() -> int:
48+
if not state["fired"] and conn._op_lock.locked():
49+
state["fired"] = True
50+
raise exc
51+
return real()
52+
53+
return fake
54+
55+
56+
@pytest.mark.parametrize("exc", [KeyboardInterrupt, SystemExit])
57+
def test_run_sync_signal_in_owner_stamp_gap_releases_op_lock(
58+
exc: type[BaseException],
59+
) -> None:
60+
conn = _make_with_loop_thread()
61+
coro = _noop()
62+
try:
63+
state = {"fired": False}
64+
with (
65+
patch(
66+
"dqlitedbapi.connection.threading.get_ident",
67+
side_effect=_ident_raising_in_stamp_gap(conn, exc, state),
68+
),
69+
pytest.raises(exc),
70+
):
71+
conn._run_sync(coro)
72+
73+
assert state["fired"], "test did not exercise the owner-stamp gap"
74+
# The signal escaped before the body try was entered, but the lock was
75+
# held — it must be released, else the connection wedges for life.
76+
assert not conn._op_lock.locked()
77+
assert conn._op_lock.acquire(timeout=0) is True
78+
conn._op_lock.release()
79+
assert conn._op_lock_owner is None
80+
finally:
81+
coro.close()
82+
conn._closed = True

0 commit comments

Comments
 (0)