Skip to content

Commit 3569277

Browse files
Pin SAVEPOINT round-trip and concurrent-cursor visibility at the dbapi layer
Two integration test additions: - SAVEPOINT round-trip with partial rollback (sync) and RELEASE (async). ISSUE-701 added the equivalent at the SQLAlchemy adapter layer; the dbapi layer had no analogous pin and a future change to the SQL classifier or in-transaction tracking could silently break SAVEPOINT routing. - Concurrent-cursor visibility per PEP 249 §6.1.2. Two cursors on the same connection share that connection's transaction visibility: cursor B sees cursor A's uncommitted INSERT, and B sees A's ROLLBACK. The pin guards against a future regression that would route cursors through separate underlying client connections. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent dcb01a4 commit 3569277

2 files changed

Lines changed: 109 additions & 0 deletions

File tree

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
"""PEP 249 §6.1.2: cursors created from the same connection share that
2+
connection's transaction visibility. Pin that two cursors on one
3+
connection see each other's uncommitted writes inside a transaction,
4+
and that ROLLBACK on one cursor is observed by the other.
5+
"""
6+
7+
from __future__ import annotations
8+
9+
import pytest
10+
11+
from dqlitedbapi import connect
12+
from dqlitedbapi.aio import aconnect
13+
14+
15+
def test_sync_two_cursors_share_uncommitted_writes(cluster_address: str) -> None:
16+
conn = connect(cluster_address, timeout=2.0)
17+
try:
18+
setup = conn.cursor()
19+
setup.execute("DROP TABLE IF EXISTS test_two_cursors")
20+
setup.execute("CREATE TABLE test_two_cursors (n INTEGER PRIMARY KEY)")
21+
cur_a = conn.cursor()
22+
cur_b = conn.cursor()
23+
24+
cur_a.execute("BEGIN")
25+
cur_a.execute("INSERT INTO test_two_cursors (n) VALUES (1)")
26+
cur_b.execute("SELECT n FROM test_two_cursors WHERE n = 1")
27+
assert cur_b.fetchone() == (1,)
28+
cur_a.execute("ROLLBACK")
29+
cur_b.execute("SELECT n FROM test_two_cursors WHERE n = 1")
30+
assert cur_b.fetchone() is None
31+
finally:
32+
conn.close()
33+
34+
35+
@pytest.mark.asyncio
36+
async def test_async_two_cursors_share_uncommitted_writes(
37+
cluster_address: str,
38+
) -> None:
39+
conn = await aconnect(cluster_address, timeout=2.0)
40+
try:
41+
setup = conn.cursor()
42+
await setup.execute("DROP TABLE IF EXISTS test_two_cursors_aio")
43+
await setup.execute("CREATE TABLE test_two_cursors_aio (n INTEGER PRIMARY KEY)")
44+
cur_a = conn.cursor()
45+
cur_b = conn.cursor()
46+
47+
await cur_a.execute("BEGIN")
48+
await cur_a.execute("INSERT INTO test_two_cursors_aio (n) VALUES (5)")
49+
await cur_b.execute("SELECT n FROM test_two_cursors_aio WHERE n = 5")
50+
assert await cur_b.fetchone() == (5,)
51+
await cur_a.execute("ROLLBACK")
52+
await cur_b.execute("SELECT n FROM test_two_cursors_aio WHERE n = 5")
53+
assert await cur_b.fetchone() is None
54+
finally:
55+
await conn.close()
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
"""SAVEPOINT round-trip pins at the dbapi layer.
2+
3+
ISSUE-701 added SAVEPOINT integration coverage for the SQLAlchemy
4+
adapter. The dbapi layer had no equivalent — a future change to the
5+
SQL classifier or to the in-transaction flag tracking could silently
6+
break SAVEPOINT routing.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
import pytest
12+
13+
from dqlitedbapi import connect
14+
from dqlitedbapi.aio import aconnect
15+
16+
17+
def test_sync_savepoint_roundtrip_partial_rollback(cluster_address: str) -> None:
18+
conn = connect(cluster_address, timeout=2.0)
19+
try:
20+
cur = conn.cursor()
21+
cur.execute("DROP TABLE IF EXISTS test_sp_dbapi")
22+
cur.execute("CREATE TABLE test_sp_dbapi (n INTEGER PRIMARY KEY)")
23+
cur.execute("BEGIN")
24+
cur.execute("INSERT INTO test_sp_dbapi (n) VALUES (1)")
25+
cur.execute("SAVEPOINT sp1")
26+
cur.execute("INSERT INTO test_sp_dbapi (n) VALUES (2)")
27+
cur.execute("ROLLBACK TO SAVEPOINT sp1")
28+
cur.execute("RELEASE SAVEPOINT sp1")
29+
cur.execute("COMMIT")
30+
31+
cur.execute("SELECT n FROM test_sp_dbapi ORDER BY n")
32+
assert cur.fetchall() == [(1,)]
33+
finally:
34+
conn.close()
35+
36+
37+
@pytest.mark.asyncio
38+
async def test_async_savepoint_roundtrip_release(cluster_address: str) -> None:
39+
conn = await aconnect(cluster_address, timeout=2.0)
40+
try:
41+
cur = conn.cursor()
42+
await cur.execute("DROP TABLE IF EXISTS test_sp_dbapi_aio")
43+
await cur.execute("CREATE TABLE test_sp_dbapi_aio (n INTEGER PRIMARY KEY)")
44+
await cur.execute("BEGIN")
45+
await cur.execute("INSERT INTO test_sp_dbapi_aio (n) VALUES (10)")
46+
await cur.execute("SAVEPOINT sp_a")
47+
await cur.execute("INSERT INTO test_sp_dbapi_aio (n) VALUES (20)")
48+
await cur.execute("RELEASE SAVEPOINT sp_a")
49+
await cur.execute("COMMIT")
50+
51+
await cur.execute("SELECT n FROM test_sp_dbapi_aio ORDER BY n")
52+
assert await cur.fetchall() == [(10,), (20,)]
53+
finally:
54+
await conn.close()

0 commit comments

Comments
 (0)