diff --git a/hotdata/arrow.py b/hotdata/arrow.py index 3526f4b..5d3c5f8 100644 --- a/hotdata/arrow.py +++ b/hotdata/arrow.py @@ -50,6 +50,22 @@ def __init__(self, status: str, result_id: str) -> None: ) +def _release_conn(response: Any) -> None: + """Return a urllib3 connection to the pool without poisoning it. + + pyarrow stops reading at the Arrow end-of-stream marker, which can leave + bytes unconsumed at the HTTP layer (e.g. the terminating zero-length chunk + of a chunked response). ``release_conn`` on a partially-read connection + hands a tainted socket back to the pool — the next request to reuse it + fails with ``http.client.ResponseNotReady`` ("Request-sent"). Draining any + unread bytes first makes the connection safe to reuse. + """ + try: + response.drain_conn() + finally: + response.release_conn() + + def _import_pyarrow() -> Any: try: import pyarrow.ipc as ipc # type: ignore[import-untyped] @@ -95,7 +111,7 @@ def get_result_arrow( try: return ipc.open_stream(response).read_all() finally: - response.release_conn() + _release_conn(response) @contextmanager def stream_result_arrow( @@ -108,8 +124,10 @@ def stream_result_arrow( ) -> Iterator["pa.RecordBatchStreamReader"]: """Yield a :class:`pyarrow.RecordBatchStreamReader` for a ready result. - The HTTP connection is released when the context exits. Iterate the - reader to consume :class:`pyarrow.RecordBatch` messages, or call + The HTTP connection is drained and released when the context exits, so + exiting early (before the reader is exhausted) reads and discards the + remaining body to keep the connection reusable. Iterate the reader to + consume :class:`pyarrow.RecordBatch` messages, or call ``reader.read_all()`` for a full :class:`pyarrow.Table`. Example:: @@ -127,7 +145,7 @@ def stream_result_arrow( try: yield ipc.open_stream(response) finally: - response.release_conn() + _release_conn(response) def _call_arrow( self, diff --git a/test-requirements.txt b/test-requirements.txt index e98555c..57f8403 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -4,3 +4,7 @@ tox >= 3.9.0 flake8 >= 4.0.0 types-python-dateutil >= 2.8.19.14 mypy >= 1.5 +# pyarrow backs the `arrow` extra. Required here (not just an optional extra) so +# the arrow integration scenarios actually run in CI instead of silently +# skipping via importorskip. Keep the floor in sync with pyproject's extra. +pyarrow >= 14 diff --git a/tests/integration/test_managed_tables_lifecycle.py b/tests/integration/test_managed_tables_lifecycle.py index b3df299..4a5cb16 100644 --- a/tests/integration/test_managed_tables_lifecycle.py +++ b/tests/integration/test_managed_tables_lifecycle.py @@ -5,34 +5,41 @@ 1. declare a schema and a table on the database's default catalog connection, 2. upload a small parquet file, - 3. load it into the table (load_managed_table), - 4. read get_table_profile, - 5. refresh the catalog metadata, - 6. purge_table_cache, - 7. delete the managed table. + 3. load it into the table (load_managed_table) and verify the load response, + 4. delete the managed table. + +Notes on managed-catalog semantics (all confirmed against prod). A managed +catalog rejects the maintenance ops that apply to external catalogs, so this +scenario deliberately omits them: + + * No `refresh` step — rejected with 400 on a managed catalog ("use the loads + endpoint to update its data"); `load_managed_table` is itself the load. + * No `purge_table_cache` step — rejected with 400 ("purge not supported for + managed catalogs"). + * No `get_table_profile` step — the profile is not populated within a usable + window after a load, and no API call (refresh and purge are rejected; there + is no profile-build/scan endpoint) triggers it. The load is verified via the + `load_managed_table` response (row_count etc.), not a profile read. The scratch_database fixture tears the database (and its catalog) down, so the -test touches no seeded data. Skipped if pyarrow is unavailable (needed to author -the parquet payload). +test touches no seeded data. pyarrow is a hard test dependency (see +test-requirements.txt) and is imported directly — a missing pyarrow must fail +loudly, never silently skip this scenario in CI. """ from __future__ import annotations import io -import pytest - -pa = pytest.importorskip("pyarrow") -pq = pytest.importorskip("pyarrow.parquet") +import pyarrow as pa +import pyarrow.parquet as pq from hotdata.api.connections_api import ConnectionsApi from hotdata.api.databases_api import DatabasesApi -from hotdata.api.refresh_api import RefreshApi from hotdata.api.uploads_api import UploadsApi from hotdata.models.add_managed_schema_request import AddManagedSchemaRequest from hotdata.models.add_managed_table_request import AddManagedTableRequest from hotdata.models.load_managed_table_request import LoadManagedTableRequest -from hotdata.models.refresh_request import RefreshRequest def _parquet_bytes() -> bytes: @@ -46,7 +53,6 @@ def test_managed_tables_lifecycle( databases_api: DatabasesApi, connections_api: ConnectionsApi, uploads_api: UploadsApi, - refresh_api: RefreshApi, scratch_database: str, ) -> None: # The database's auto-provisioned default catalog is a managed catalog, @@ -77,15 +83,5 @@ def test_managed_tables_lifecycle( assert loaded.table_name == table_name assert loaded.row_count == 3 - profile = connections_api.get_table_profile(connection_id, schema_name, table_name) - assert profile.var_schema == schema_name - assert profile.table == table_name - assert profile.row_count == 3 - - # Refresh the catalog metadata for the managed connection. - refreshed = refresh_api.refresh(RefreshRequest(connection_id=connection_id)) - assert refreshed.actual_instance is not None - - # purge_table_cache and delete_managed_table both return None on success. - connections_api.purge_table_cache(connection_id, schema_name, table_name) + # delete_managed_table returns None on success. connections_api.delete_managed_table(connection_id, schema_name, table_name) diff --git a/tests/integration/test_results_arrow.py b/tests/integration/test_results_arrow.py index e1a2311..c2d561b 100644 --- a/tests/integration/test_results_arrow.py +++ b/tests/integration/test_results_arrow.py @@ -5,17 +5,18 @@ that Arrow IPC content negotiation works end-to-end and that the streaming variant yields the same data. -Skipped if pyarrow is not installed (the helper requires the ``arrow`` extra). +pyarrow is a hard test dependency (see test-requirements.txt), so this imports +it directly rather than via importorskip — a missing pyarrow must fail loudly, +never silently skip this scenario in CI. """ from __future__ import annotations import time +import pyarrow as pa import pytest -pa = pytest.importorskip("pyarrow") - from hotdata.api.query_api import QueryApi from hotdata.api.query_runs_api import QueryRunsApi from hotdata.arrow import ResultsApi @@ -52,7 +53,14 @@ def test_results_arrow( QueryRequest( var_async=True, async_after_ms=1000, - sql="SELECT 1 AS x, 'hello' AS msg UNION ALL SELECT 2, 'world'", + # ORDER BY so row order is deterministic — UNION ALL alone does not + # guarantee it, and the column/pagination asserts below depend on it. + sql=( + "SELECT x, msg FROM (" + "SELECT 1 AS x, 'hello' AS msg " + "UNION ALL SELECT 2, 'world'" + ") ORDER BY x" + ), ), x_database_id=database_id, ) diff --git a/tests/test_arrow.py b/tests/test_arrow.py index b67188d..3accdbc 100644 --- a/tests/test_arrow.py +++ b/tests/test_arrow.py @@ -45,7 +45,8 @@ class _FakeUrllib3Response(io.RawIOBase): pyarrow.ipc.open_stream wants a real file-like object (it checks ``closed`` and ``readable()``); the SDK's RESTResponse needs ``status``, - ``reason``, ``data``, and ``headers``. release_conn is recorded. + ``reason``, ``data``, and ``headers``. drain_conn / release_conn are + recorded so tests can assert the connection is drained before reuse. """ def __init__(self, status: int, body: bytes, headers: Dict[str, str]): @@ -56,6 +57,7 @@ def __init__(self, status: int, body: bytes, headers: Dict[str, str]): self._buf = io.BytesIO(body) self.headers = headers self.release_conn_called = False + self.drain_conn_called = False @property def data(self) -> bytes: @@ -71,6 +73,9 @@ def read(self, amt: Optional[int] = -1) -> bytes: def readable(self) -> bool: return True + def drain_conn(self) -> None: + self.drain_conn_called = True + def release_conn(self) -> None: self.release_conn_called = True @@ -146,6 +151,9 @@ def test_get_result_arrow_returns_table(monkeypatch: pytest.MonkeyPatch) -> None got = results.get_result_arrow("res_123") assert got.equals(table) + # Connection is drained (not just released) so a partially-read body can't + # poison the pool and break the next request with ResponseNotReady. + assert fake.drain_conn_called assert fake.release_conn_called # Single request was made. @@ -197,7 +205,8 @@ def test_stream_result_arrow_yields_reader( roundtrip = pa.Table.from_batches(batches, schema=reader.schema) assert roundtrip.equals(table) - # Connection is released after the context exits. + # Connection is drained and released after the context exits. + assert fake.drain_conn_called assert fake.release_conn_called