Skip to content

Commit dd3d2aa

Browse files
Yield during the async description NULL-rescue type-code scan
Building ``cursor.description`` resolves a real Type Object per column (PEP 249 §6.1.2). When a column's row-0 tag is NULL, the resolver scans later rows for the first non-NULL tag, falling back to the UNKNOWN sentinel only when EVERY row at that column is NULL. A column that is NULL across the whole page is therefore scanned end-to-end: O(n_null_cols × n_rows) of pure-Python work that ran synchronously on the user loop before the first await in the result path, outside the cooperative-yield chain that already covers the wire read, continuation drain, and row conversion.

Extract the scan into an async helper gated by the same ``_LARGE_RESULT_ROW_THRESHOLD`` / ``_CONVERT_ROWS_YIELD_EVERY`` policy as the row conversion: small fetches scan straight through (zero scheduler overhead); larger ones yield every ``_CONVERT_ROWS_YIELD_EVERY`` inner-row steps so a single all-NULL column over many rows still cedes the loop. The resolved type-code list is byte-identical to the prior inline shape — the full all-NULL walk is preserved; only the stretch for which it holds the loop is broken up. The sync cursor surface keeps its synchronous twin (it runs on the dbapi daemon loop, where blocking is by design).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 0b2236d commit dd3d2aa

2 files changed

Lines changed: 238 additions & 14 deletions

File tree

src/dqlitedbapi/aio/cursor.py

Lines changed: 59 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
from typing import TYPE_CHECKING, Any, Final, NoReturn, Self
99

1010
from dqlitedbapi.cursor import (
11+
_CONVERT_ROWS_YIELD_EVERY,
1112
_EXECUTEMANY_REJECT_VERBS,
13+
_LARGE_RESULT_ROW_THRESHOLD,
1214
_call_client,
1315
_classify_caller_sql,
1416
_convert_params,
@@ -51,6 +53,54 @@
5153
_ANEXT_YIELD_EVERY: Final[int] = 512
5254

5355

56+
async def _resolve_null_rescue_type_codes(
    column_types: Sequence[int],
    row_types: Sequence[Sequence[int]],
) -> list[int | _DBAPIType]:
    """Build the per-column type codes used for ``cursor.description``.

    ``column_types`` holds the row-0 tag of each column. A tag other
    than ``ValueType.NULL`` is emitted directly as ``int(tag)``. For a
    NULL tag, rows ``1..`` of ``row_types`` are searched for the first
    non-NULL tag at the same column index; rows too short to contain
    that index are skipped. When no row supplies a non-NULL tag the
    column resolves to the ``_UNKNOWN_TYPE`` sentinel.

    When ``len(row_types)`` is at least ``_LARGE_RESULT_ROW_THRESHOLD``
    the search counts every row it passes over (the count is shared
    across columns) and awaits ``asyncio.sleep(0)`` each time the count
    reaches a multiple of ``_CONVERT_ROWS_YIELD_EVERY``, so a long
    all-NULL walk hands control back to the event loop. Smaller result
    sets never await.

    Returns:
        One entry per element of ``column_types``, in order.
    """
    cooperative = len(row_types) >= _LARGE_RESULT_ROW_THRESHOLD
    steps_taken = 0
    codes: list[int | _DBAPIType] = []
    for position, first_tag in enumerate(column_types):
        if first_tag != ValueType.NULL:
            # Row 0 already carries a concrete tag; no rescue needed.
            codes.append(int(first_tag))
            continue
        for row_no in range(1, len(row_types)):
            tags = row_types[row_no]
            if position < len(tags):
                tag = tags[position]
                if tag != ValueType.NULL:
                    codes.append(int(tag))
                    break
            # Only rows that failed to rescue the column count as steps.
            if cooperative:
                steps_taken += 1
                if steps_taken % _CONVERT_ROWS_YIELD_EVERY == 0:
                    await asyncio.sleep(0)
        else:
            # Every later row was NULL (or too short) at this column.
            codes.append(_UNKNOWN_TYPE)
    return codes
102+
103+
54104
class AsyncCursor:
55105
"""Async database cursor."""
56106

@@ -459,20 +509,15 @@ async def _execute_unlocked(
459509
# see sync sibling at ``cursor.py`` for the full
460510
# rationale. Fall back to ``UNKNOWN`` only when
461511
# EVERY row at that column index is NULL
462-
# (genuinely unrecoverable).
463-
type_codes = []
464-
for col_idx, c in enumerate(column_types):
465-
if c != ValueType.NULL:
466-
type_codes.append(int(c))
467-
continue
468-
resolved: int | _DBAPIType = _UNKNOWN_TYPE
469-
for j in range(1, len(row_types)):
470-
if col_idx < len(row_types[j]):
471-
candidate = row_types[j][col_idx]
472-
if candidate != ValueType.NULL:
473-
resolved = int(candidate)
474-
break
475-
type_codes.append(resolved)
512+
# (genuinely unrecoverable). Routed through the
513+
# async helper so a wide all-NULL result set
514+
# (each such column walks the full row count)
515+
# cedes the loop to siblings every
516+
# ``_CONVERT_ROWS_YIELD_EVERY`` steps instead of
517+
# monopolising it before the first downstream
518+
# ``await``. Small results take the helper's
519+
# synchronous fast path (no scheduler overhead).
520+
type_codes = await _resolve_null_rescue_type_codes(column_types, row_types)
476521
self._description = tuple(
477522
(name, type_codes[i], None, None, None, None, None)
478523
for i, name in enumerate(columns)
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
"""Pin: the async cursor's NULL-rescue type-code resolution for
2+
``cursor.description`` yields cooperatively on large result sets
3+
instead of walking every row of an all-NULL column synchronously
4+
on the user loop.
5+
6+
PEP 249 §6.1.2 requires a real Type Object per column. When a
7+
column's row-0 tag is ``ValueType.NULL`` the resolver scans
8+
subsequent rows for the first non-NULL tag, falling back to the
9+
``UNKNOWN`` sentinel only when EVERY row at that column is NULL.
10+
A column that is NULL across the whole page (a LEFT JOIN
11+
right-side with no match, an always-NULL projection) is scanned
12+
end-to-end — O(n_null_cols × n_rows) of pure-Python iteration
13+
that ran BEFORE the first ``await`` in the result path, outside
14+
the cooperative-yield chain (wire read / drain / convert-rows).
15+
16+
The fix extracts the scan into an ``async def`` helper gated by
17+
``_LARGE_RESULT_ROW_THRESHOLD`` (small fetches keep the
18+
synchronous path, zero scheduler overhead) that yields
19+
``await asyncio.sleep(0)`` every ``_CONVERT_ROWS_YIELD_EVERY``
20+
scanned inner-row steps. The resolved type-code list is
21+
byte-identical to the prior inline logic; only the all-NULL
22+
"every row is NULL" fallback walks the full count, and only that
23+
path benefits from the yields. The sync cursor surface is
24+
unchanged.
25+
"""
26+
27+
from __future__ import annotations
28+
29+
import asyncio
30+
import contextlib
31+
import time
32+
33+
import pytest
34+
35+
from dqlitedbapi.aio.cursor import _resolve_null_rescue_type_codes
36+
from dqlitedbapi.types import UNKNOWN as _UNKNOWN_TYPE
37+
from dqlitewire import ValueType
38+
39+
pytestmark = pytest.mark.asyncio
40+
41+
42+
def _sync_reference(
    column_types: list[ValueType],
    row_types: list[list[ValueType]],
) -> list[int | object]:
    """Synchronous oracle producing the same list as the prior inline scan.

    A column whose row-0 tag is not NULL maps to ``int(tag)``; a NULL
    column takes the first non-NULL tag found in rows ``1..`` (skipping
    rows too short to hold the column) or ``_UNKNOWN_TYPE`` when there
    is none.
    """

    def _rescue(col_idx: int) -> int | object:
        # Row 0 is excluded: its tag is the one already known to be NULL.
        for tags in row_types[1:]:
            if col_idx < len(tags) and tags[col_idx] != ValueType.NULL:
                return int(tags[col_idx])
        return _UNKNOWN_TYPE

    return [
        int(tag) if tag != ValueType.NULL else _rescue(col_idx)
        for col_idx, tag in enumerate(column_types)
    ]
61+
62+
63+
async def test_typed_row0_columns_resolve_directly() -> None:
    """Columns already typed in row 0 map straight to their integer codes."""
    tags = [ValueType.INTEGER, ValueType.TEXT]
    rows = [list(tags)]
    resolved = await _resolve_null_rescue_type_codes(tags, rows)
    assert resolved == [int(tag) for tag in tags]
    assert resolved == _sync_reference(tags, rows)
69+
70+
71+
async def test_null_first_row_rescued_from_later_row() -> None:
    """A column NULL in row 0 takes the first non-NULL tag of a later row."""
    null_then_text = [ValueType.NULL, ValueType.TEXT]
    integer_then_text = [ValueType.INTEGER, ValueType.TEXT]
    # Column 0 stays NULL through rows 0 and 1 and turns INTEGER in row 2.
    rows = [list(null_then_text), list(null_then_text), integer_then_text]
    resolved = await _resolve_null_rescue_type_codes(null_then_text, rows)
    assert resolved[0] == int(ValueType.INTEGER)
    assert resolved == _sync_reference(null_then_text, rows)
82+
83+
84+
async def test_all_null_column_falls_back_to_unknown() -> None:
    """A column that is NULL in every row resolves to the UNKNOWN sentinel."""
    first_row_tags = [ValueType.NULL, ValueType.INTEGER]
    # 50 identical rows: column 0 never yields a non-NULL tag.
    rows = [list(first_row_tags) for _ in range(50)]
    resolved = await _resolve_null_rescue_type_codes(first_row_tags, rows)
    assert resolved[0] is _UNKNOWN_TYPE
    assert resolved[1] == int(ValueType.INTEGER)
    assert resolved == _sync_reference(first_row_tags, rows)
92+
93+
94+
async def test_ragged_rows_do_not_short_circuit() -> None:
    """Rows shorter than the column count are skipped, not treated as a stop."""
    null = ValueType.NULL
    first_row_tags = [null, null]
    rows = [
        [null],  # ragged: holds column 0 only
        [null, null],
        [null, ValueType.FLOAT],
    ]
    resolved = await _resolve_null_rescue_type_codes(first_row_tags, rows)
    # Column 0 is NULL in every row; column 1 is rescued by the last row.
    assert resolved[0] is _UNKNOWN_TYPE
    assert resolved[1] == int(ValueType.FLOAT)
    assert resolved == _sync_reference(first_row_tags, rows)
107+
108+
109+
async def test_large_all_null_column_yields_cooperatively() -> None:
    """A large all-NULL fixture must let a sibling coroutine run mid-scan.

    The fixture is 200k rows by 3 columns, every cell NULL, so each
    column triggers a walk over all later rows. A sibling "ticker" task
    records the wall-clock gap between its own consecutive wake-ups.

    Two things are asserted:

    * the ticker actually woke up while the scan was in flight. A scan
      that never yields lets the ticker run only after ``stop`` is set,
      leaving no samples; that case must fail rather than pass silently;
    * no gap between wake-ups (ignoring the first and last sample, which
      cover start-up and shutdown) reaches 200 ms.
    """
    n_rows = 200_000
    # 3 columns, all NULL in row 0 AND every subsequent row, so each one
    # triggers a full-row scan.
    column_types = [ValueType.NULL, ValueType.NULL, ValueType.NULL]
    row_types = [list(column_types) for _ in range(n_rows)]

    inter_tick_gaps: list[float] = []
    stop = False
    last = time.perf_counter()

    async def _ticker() -> None:
        nonlocal last
        while not stop:
            await asyncio.sleep(0)
            now = time.perf_counter()
            inter_tick_gaps.append(now - last)
            last = now

    ticker = asyncio.create_task(_ticker())
    try:
        # Restart the clock so fixture construction is not counted.
        last = time.perf_counter()
        result = await _resolve_null_rescue_type_codes(column_types, row_types)
    finally:
        stop = True
        # Give the ticker one turn to observe ``stop`` before cancelling.
        await asyncio.sleep(0)
        ticker.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await ticker

    assert all(rc is _UNKNOWN_TYPE for rc in result)
    # A non-yielding scan produces no samples at all; guarding the gap
    # check with ``if`` would let exactly that regression pass.
    assert len(inter_tick_gaps) > 2, (
        f"sibling coroutine ticked only {len(inter_tick_gaps)} time(s) "
        f"during the description NULL-rescue scan; the scan did not "
        f"yield to the event loop."
    )
    # Drop first/last samples (startup/shutdown); assert no gap
    # reaches 200 ms.
    worst = max(inter_tick_gaps[1:-1])
    assert worst < 0.200, (
        f"description NULL-rescue scan pinned the loop for "
        f"{worst * 1000:.1f} ms; the per-step yield should cap "
        f"inter-tick gaps well under 200 ms."
    )
156+
157+
158+
async def test_small_result_takes_synchronous_fast_path() -> None:
    """A result set below the large-result threshold must never yield.

    ``asyncio.sleep`` is wrapped with a recording shim for the duration
    of the call; the recorded call list must stay empty.
    """
    from unittest.mock import patch

    sleep_calls: list[float] = []
    original_sleep = asyncio.sleep

    async def _tracking_sleep(delay: float, *args: object, **kwargs: object) -> None:
        sleep_calls.append(delay)
        await original_sleep(delay, *args, **kwargs)

    first_row_tags = [ValueType.NULL]
    rows = [[ValueType.NULL] for _ in range(100)]  # < threshold

    with patch.object(asyncio, "sleep", _tracking_sleep):
        resolved = await _resolve_null_rescue_type_codes(first_row_tags, rows)

    assert resolved[0] is _UNKNOWN_TYPE
    assert not sleep_calls, f"small result should not yield; saw sleep calls: {sleep_calls!r}"

0 commit comments

Comments
 (0)