From 9e32d72da9cf2136b7c366ea72227fdbcdbc4753 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Fri, 5 Jun 2026 20:36:11 -0700 Subject: [PATCH 1/4] test(integration): run arrow scenarios in CI instead of skipping --- test-requirements.txt | 4 ++ .../test_managed_tables_lifecycle.py | 55 +++++++++++++------ tests/integration/test_results_arrow.py | 7 ++- 3 files changed, 45 insertions(+), 21 deletions(-) diff --git a/test-requirements.txt b/test-requirements.txt index e98555c..57f8403 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -4,3 +4,7 @@ tox >= 3.9.0 flake8 >= 4.0.0 types-python-dateutil >= 2.8.19.14 mypy >= 1.5 +# pyarrow backs the `arrow` extra. Required here (not just an optional extra) so +# the arrow integration scenarios actually run in CI instead of silently +# skipping via importorskip. Keep the floor in sync with pyproject's extra. +pyarrow >= 14 diff --git a/tests/integration/test_managed_tables_lifecycle.py b/tests/integration/test_managed_tables_lifecycle.py index b3df299..a310a12 100644 --- a/tests/integration/test_managed_tables_lifecycle.py +++ b/tests/integration/test_managed_tables_lifecycle.py @@ -6,33 +6,41 @@ 1. declare a schema and a table on the database's default catalog connection, 2. upload a small parquet file, 3. load it into the table (load_managed_table), - 4. read get_table_profile, - 5. refresh the catalog metadata, - 6. purge_table_cache, - 7. delete the managed table. + 4. poll get_table_profile until the load syncs, + 5. purge_table_cache, + 6. delete the managed table. + +Note on managed-catalog semantics: there is no `refresh` step. `refresh` is +rejected with a 400 on a managed catalog ("use the loads endpoint to update its +data") — `load_managed_table` is itself the load. The profile is populated +asynchronously after the load, so step 4 polls get_table_profile (a 404 means +"not synced yet") rather than reading it once. The scratch_database fixture tears the database (and its catalog) down, so the -test touches no seeded data. Skipped if pyarrow is unavailable (needed to author -the parquet payload). +test touches no seeded data. pyarrow is a hard test dependency (see +test-requirements.txt) and is imported directly — a missing pyarrow must fail +loudly, never silently skip this scenario in CI. """ from __future__ import annotations import io +import time -import pytest - -pa = pytest.importorskip("pyarrow") -pq = pytest.importorskip("pyarrow.parquet") +import pyarrow as pa +import pyarrow.parquet as pq from hotdata.api.connections_api import ConnectionsApi from hotdata.api.databases_api import DatabasesApi -from hotdata.api.refresh_api import RefreshApi from hotdata.api.uploads_api import UploadsApi +from hotdata.exceptions import ApiException from hotdata.models.add_managed_schema_request import AddManagedSchemaRequest from hotdata.models.add_managed_table_request import AddManagedTableRequest from hotdata.models.load_managed_table_request import LoadManagedTableRequest -from hotdata.models.refresh_request import RefreshRequest + + +PROFILE_SYNC_TIMEOUT_S = 60.0 +PROFILE_POLL_INTERVAL_S = 2.0 def _parquet_bytes() -> bytes: @@ -46,7 +54,6 @@ def test_managed_tables_lifecycle( databases_api: DatabasesApi, connections_api: ConnectionsApi, uploads_api: UploadsApi, - refresh_api: RefreshApi, scratch_database: str, ) -> None: # The database's auto-provisioned default catalog is a managed catalog, @@ -77,15 +84,27 @@ def test_managed_tables_lifecycle( assert loaded.table_name == table_name assert loaded.row_count == 3 - profile = connections_api.get_table_profile(connection_id, schema_name, table_name) + # The profile syncs asynchronously after the load — get_table_profile 404s + # ("Table may not be synced yet") until it lands. Poll instead of reading + # once. There is no manual trigger to force this: refresh is rejected on a + # managed catalog, and load_managed_table is the load. + deadline = time.monotonic() + PROFILE_SYNC_TIMEOUT_S + profile = None + while time.monotonic() < deadline: + try: + profile = connections_api.get_table_profile( + connection_id, schema_name, table_name + ) + break + except ApiException as exc: + if exc.status != 404: + raise + time.sleep(PROFILE_POLL_INTERVAL_S) + assert profile is not None, "table profile never synced after load" assert profile.var_schema == schema_name assert profile.table == table_name assert profile.row_count == 3 - # Refresh the catalog metadata for the managed connection. - refreshed = refresh_api.refresh(RefreshRequest(connection_id=connection_id)) - assert refreshed.actual_instance is not None - # purge_table_cache and delete_managed_table both return None on success. connections_api.purge_table_cache(connection_id, schema_name, table_name) connections_api.delete_managed_table(connection_id, schema_name, table_name) diff --git a/tests/integration/test_results_arrow.py b/tests/integration/test_results_arrow.py index e1a2311..0c0abc0 100644 --- a/tests/integration/test_results_arrow.py +++ b/tests/integration/test_results_arrow.py @@ -5,17 +5,18 @@ that Arrow IPC content negotiation works end-to-end and that the streaming variant yields the same data. -Skipped if pyarrow is not installed (the helper requires the ``arrow`` extra). +pyarrow is a hard test dependency (see test-requirements.txt), so this imports +it directly rather than via importorskip — a missing pyarrow must fail loudly, +never silently skip this scenario in CI. """ from __future__ import annotations import time +import pyarrow as pa import pytest -pa = pytest.importorskip("pyarrow") - from hotdata.api.query_api import QueryApi from hotdata.api.query_runs_api import QueryRunsApi from hotdata.arrow import ResultsApi From 16c0e40d8fed02f7561645d56608073b4caa6107 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Fri, 5 Jun 2026 21:00:25 -0700 Subject: [PATCH 2/4] test(integration): align arrow scenarios with prod behavior --- .../test_managed_tables_lifecycle.py | 48 +++++-------------- tests/integration/test_results_arrow.py | 9 +++- 2 files changed, 20 insertions(+), 37 deletions(-) diff --git a/tests/integration/test_managed_tables_lifecycle.py b/tests/integration/test_managed_tables_lifecycle.py index a310a12..426350d 100644 --- a/tests/integration/test_managed_tables_lifecycle.py +++ b/tests/integration/test_managed_tables_lifecycle.py @@ -5,16 +5,19 @@ 1. declare a schema and a table on the database's default catalog connection, 2. upload a small parquet file, - 3. load it into the table (load_managed_table), - 4. poll get_table_profile until the load syncs, - 5. purge_table_cache, - 6. delete the managed table. + 3. load it into the table (load_managed_table) and verify the load response, + 4. purge_table_cache, + 5. delete the managed table. -Note on managed-catalog semantics: there is no `refresh` step. `refresh` is -rejected with a 400 on a managed catalog ("use the loads endpoint to update its -data") — `load_managed_table` is itself the load. The profile is populated -asynchronously after the load, so step 4 polls get_table_profile (a 404 means -"not synced yet") rather than reading it once. +Notes on managed-catalog semantics (both confirmed against prod): + + * There is no `refresh` step. `refresh` is rejected with a 400 on a managed + catalog ("use the loads endpoint to update its data") — `load_managed_table` + is itself the load. + * There is no `get_table_profile` step. The profile is not populated within a + usable window after a load, and no API call (refresh is rejected; there is + no profile-build/scan endpoint) triggers it — so the load is verified via + the `load_managed_table` response (row_count etc.), not a profile read. The scratch_database fixture tears the database (and its catalog) down, so the test touches no seeded data. pyarrow is a hard test dependency (see @@ -25,7 +28,6 @@ from __future__ import annotations import io -import time import pyarrow as pa import pyarrow.parquet as pq @@ -33,16 +35,11 @@ from hotdata.api.connections_api import ConnectionsApi from hotdata.api.databases_api import DatabasesApi from hotdata.api.uploads_api import UploadsApi -from hotdata.exceptions import ApiException from hotdata.models.add_managed_schema_request import AddManagedSchemaRequest from hotdata.models.add_managed_table_request import AddManagedTableRequest from hotdata.models.load_managed_table_request import LoadManagedTableRequest -PROFILE_SYNC_TIMEOUT_S = 60.0 -PROFILE_POLL_INTERVAL_S = 2.0 - - def _parquet_bytes() -> bytes: table = pa.table({"x": [1, 2, 3], "msg": ["a", "b", "c"]}) buf = io.BytesIO() @@ -84,27 +81,6 @@ def test_managed_tables_lifecycle( assert loaded.table_name == table_name assert loaded.row_count == 3 - # The profile syncs asynchronously after the load — get_table_profile 404s - # ("Table may not be synced yet") until it lands. Poll instead of reading - # once. There is no manual trigger to force this: refresh is rejected on a - # managed catalog, and load_managed_table is the load. - deadline = time.monotonic() + PROFILE_SYNC_TIMEOUT_S - profile = None - while time.monotonic() < deadline: - try: - profile = connections_api.get_table_profile( - connection_id, schema_name, table_name - ) - break - except ApiException as exc: - if exc.status != 404: - raise - time.sleep(PROFILE_POLL_INTERVAL_S) - assert profile is not None, "table profile never synced after load" - assert profile.var_schema == schema_name - assert profile.table == table_name - assert profile.row_count == 3 - # purge_table_cache and delete_managed_table both return None on success. connections_api.purge_table_cache(connection_id, schema_name, table_name) connections_api.delete_managed_table(connection_id, schema_name, table_name) diff --git a/tests/integration/test_results_arrow.py b/tests/integration/test_results_arrow.py index 0c0abc0..c2d561b 100644 --- a/tests/integration/test_results_arrow.py +++ b/tests/integration/test_results_arrow.py @@ -53,7 +53,14 @@ def test_results_arrow( QueryRequest( var_async=True, async_after_ms=1000, - sql="SELECT 1 AS x, 'hello' AS msg UNION ALL SELECT 2, 'world'", + # ORDER BY so row order is deterministic — UNION ALL alone does not + # guarantee it, and the column/pagination asserts below depend on it. + sql=( + "SELECT x, msg FROM (" + "SELECT 1 AS x, 'hello' AS msg " + "UNION ALL SELECT 2, 'world'" + ") ORDER BY x" + ), ), x_database_id=database_id, ) From 8b0152a5758af83aab74e20aa68bf507c3cb8bd1 Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Fri, 5 Jun 2026 21:02:16 -0700 Subject: [PATCH 3/4] test(integration): drop purge_table_cache, unsupported on managed catalogs --- .../test_managed_tables_lifecycle.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/tests/integration/test_managed_tables_lifecycle.py b/tests/integration/test_managed_tables_lifecycle.py index 426350d..4a5cb16 100644 --- a/tests/integration/test_managed_tables_lifecycle.py +++ b/tests/integration/test_managed_tables_lifecycle.py @@ -6,18 +6,20 @@ 1. declare a schema and a table on the database's default catalog connection, 2. upload a small parquet file, 3. load it into the table (load_managed_table) and verify the load response, - 4. purge_table_cache, - 5. delete the managed table. + 4. delete the managed table. -Notes on managed-catalog semantics (both confirmed against prod): +Notes on managed-catalog semantics (all confirmed against prod). A managed +catalog rejects the maintenance ops that apply to external catalogs, so this +scenario deliberately omits them: - * There is no `refresh` step. `refresh` is rejected with a 400 on a managed - catalog ("use the loads endpoint to update its data") — `load_managed_table` - is itself the load. - * There is no `get_table_profile` step. The profile is not populated within a - usable window after a load, and no API call (refresh is rejected; there is - no profile-build/scan endpoint) triggers it — so the load is verified via - the `load_managed_table` response (row_count etc.), not a profile read. + * No `refresh` step — rejected with 400 on a managed catalog ("use the loads + endpoint to update its data"); `load_managed_table` is itself the load. + * No `purge_table_cache` step — rejected with 400 ("purge not supported for + managed catalogs"). + * No `get_table_profile` step — the profile is not populated within a usable + window after a load, and no API call (refresh and purge are rejected; there + is no profile-build/scan endpoint) triggers it. The load is verified via the + `load_managed_table` response (row_count etc.), not a profile read. The scratch_database fixture tears the database (and its catalog) down, so the test touches no seeded data. pyarrow is a hard test dependency (see @@ -81,6 +83,5 @@ def test_managed_tables_lifecycle( assert loaded.table_name == table_name assert loaded.row_count == 3 - # purge_table_cache and delete_managed_table both return None on success. - connections_api.purge_table_cache(connection_id, schema_name, table_name) + # delete_managed_table returns None on success. connections_api.delete_managed_table(connection_id, schema_name, table_name) From da9d5b5e829d8f2e1afe7f65190e1a814b6ea6be Mon Sep 17 00:00:00 2001 From: Zac Farrell Date: Fri, 5 Jun 2026 21:07:54 -0700 Subject: [PATCH 4/4] fix(arrow): drain connection before release to avoid poisoning the pool --- hotdata/arrow.py | 26 ++++++++++++++++++++++---- tests/test_arrow.py | 13 +++++++++++-- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/hotdata/arrow.py b/hotdata/arrow.py index 3526f4b..5d3c5f8 100644 --- a/hotdata/arrow.py +++ b/hotdata/arrow.py @@ -50,6 +50,22 @@ def __init__(self, status: str, result_id: str) -> None: ) +def _release_conn(response: Any) -> None: + """Return a urllib3 connection to the pool without poisoning it. + + pyarrow stops reading at the Arrow end-of-stream marker, which can leave + bytes unconsumed at the HTTP layer (e.g. the terminating zero-length chunk + of a chunked response). ``release_conn`` on a partially-read connection + hands a tainted socket back to the pool — the next request to reuse it + fails with ``http.client.ResponseNotReady`` ("Request-sent"). Draining any + unread bytes first makes the connection safe to reuse. + """ + try: + response.drain_conn() + finally: + response.release_conn() + + def _import_pyarrow() -> Any: try: import pyarrow.ipc as ipc # type: ignore[import-untyped] @@ -95,7 +111,7 @@ def get_result_arrow( try: return ipc.open_stream(response).read_all() finally: - response.release_conn() + _release_conn(response) @contextmanager def stream_result_arrow( @@ -108,8 +124,10 @@ def stream_result_arrow( ) -> Iterator["pa.RecordBatchStreamReader"]: """Yield a :class:`pyarrow.RecordBatchStreamReader` for a ready result. - The HTTP connection is released when the context exits. Iterate the - reader to consume :class:`pyarrow.RecordBatch` messages, or call + The HTTP connection is drained and released when the context exits, so + exiting early (before the reader is exhausted) reads and discards the + remaining body to keep the connection reusable. Iterate the reader to + consume :class:`pyarrow.RecordBatch` messages, or call ``reader.read_all()`` for a full :class:`pyarrow.Table`. Example:: @@ -127,7 +145,7 @@ def stream_result_arrow( try: yield ipc.open_stream(response) finally: - response.release_conn() + _release_conn(response) def _call_arrow( self, diff --git a/tests/test_arrow.py b/tests/test_arrow.py index b67188d..3accdbc 100644 --- a/tests/test_arrow.py +++ b/tests/test_arrow.py @@ -45,7 +45,8 @@ class _FakeUrllib3Response(io.RawIOBase): pyarrow.ipc.open_stream wants a real file-like object (it checks ``closed`` and ``readable()``); the SDK's RESTResponse needs ``status``, - ``reason``, ``data``, and ``headers``. release_conn is recorded. + ``reason``, ``data``, and ``headers``. drain_conn / release_conn are + recorded so tests can assert the connection is drained before reuse. """ def __init__(self, status: int, body: bytes, headers: Dict[str, str]): @@ -56,6 +57,7 @@ def __init__(self, status: int, body: bytes, headers: Dict[str, str]): self._buf = io.BytesIO(body) self.headers = headers self.release_conn_called = False + self.drain_conn_called = False @property def data(self) -> bytes: @@ -71,6 +73,9 @@ def read(self, amt: Optional[int] = -1) -> bytes: def readable(self) -> bool: return True + def drain_conn(self) -> None: + self.drain_conn_called = True + def release_conn(self) -> None: self.release_conn_called = True @@ -146,6 +151,9 @@ def test_get_result_arrow_returns_table(monkeypatch: pytest.MonkeyPatch) -> None got = results.get_result_arrow("res_123") assert got.equals(table) + # Connection is drained (not just released) so a partially-read body can't + # poison the pool and break the next request with ResponseNotReady. + assert fake.drain_conn_called assert fake.release_conn_called # Single request was made. @@ -197,7 +205,8 @@ def test_stream_result_arrow_yields_reader( roundtrip = pa.Table.from_batches(batches, schema=reader.schema) assert roundtrip.equals(table) - # Connection is released after the context exits. + # Connection is drained and released after the context exits. + assert fake.drain_conn_called assert fake.release_conn_called