Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions hotdata/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,22 @@ def __init__(self, status: str, result_id: str) -> None:
)


def _release_conn(response: Any) -> None:
    """Drain ``response`` and then hand its connection back to the pool.

    Why the drain is needed: pyarrow stops reading once it sees the Arrow
    end-of-stream marker, so the HTTP layer may still hold unread bytes
    (for example the terminating zero-length chunk of a chunked response).
    Calling ``release_conn`` on such a partially-read connection returns a
    tainted socket to the pool, and the next request that reuses it fails
    with ``http.client.ResponseNotReady`` ("Request-sent"). Consuming the
    leftover bytes first leaves the connection safe to reuse.

    Args:
        response: Object exposing ``drain_conn()`` and ``release_conn()``
            (a urllib3 response in practice).
    """
    try:
        # Read and discard whatever body bytes pyarrow left behind.
        response.drain_conn()
    finally:
        # Release unconditionally, even when draining itself raised.
        response.release_conn()


def _import_pyarrow() -> Any:
try:
import pyarrow.ipc as ipc # type: ignore[import-untyped]
Expand Down Expand Up @@ -95,7 +111,7 @@ def get_result_arrow(
try:
return ipc.open_stream(response).read_all()
finally:
response.release_conn()
_release_conn(response)

@contextmanager
def stream_result_arrow(
Expand All @@ -108,8 +124,10 @@ def stream_result_arrow(
) -> Iterator["pa.RecordBatchStreamReader"]:
"""Yield a :class:`pyarrow.RecordBatchStreamReader` for a ready result.

The HTTP connection is released when the context exits. Iterate the
reader to consume :class:`pyarrow.RecordBatch` messages, or call
The HTTP connection is drained and released when the context exits, so
exiting early (before the reader is exhausted) reads and discards the
remaining body to keep the connection reusable. Iterate the reader to
consume :class:`pyarrow.RecordBatch` messages, or call
``reader.read_all()`` for a full :class:`pyarrow.Table`.

Example::
Expand All @@ -127,7 +145,7 @@ def stream_result_arrow(
try:
yield ipc.open_stream(response)
finally:
response.release_conn()
_release_conn(response)

def _call_arrow(
self,
Expand Down
4 changes: 4 additions & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ tox >= 3.9.0
flake8 >= 4.0.0
types-python-dateutil >= 2.8.19.14
mypy >= 1.5
# pyarrow backs the `arrow` extra. Required here (not just an optional extra) so
# the arrow integration scenarios actually run in CI instead of silently
# skipping via importorskip. Keep the floor in sync with pyproject's extra.
pyarrow >= 14
46 changes: 21 additions & 25 deletions tests/integration/test_managed_tables_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,41 @@

1. declare a schema and a table on the database's default catalog connection,
2. upload a small parquet file,
3. load it into the table (load_managed_table),
4. read get_table_profile,
5. refresh the catalog metadata,
6. purge_table_cache,
7. delete the managed table.
3. load it into the table (load_managed_table) and verify the load response,
4. delete the managed table.

Notes on managed-catalog semantics (all confirmed against prod). A managed
catalog rejects the maintenance ops that apply to external catalogs, so this
scenario deliberately omits them:

* No `refresh` step — rejected with 400 on a managed catalog ("use the loads
endpoint to update its data"); `load_managed_table` is itself the load.
* No `purge_table_cache` step — rejected with 400 ("purge not supported for
managed catalogs").
* No `get_table_profile` step — the profile is not populated within a usable
window after a load, and no API call (refresh and purge are rejected; there
is no profile-build/scan endpoint) triggers it. The load is verified via the
`load_managed_table` response (row_count etc.), not a profile read.

The scratch_database fixture tears the database (and its catalog) down, so the
test touches no seeded data. Skipped if pyarrow is unavailable (needed to author
the parquet payload).
test touches no seeded data. pyarrow is a hard test dependency (see
test-requirements.txt) and is imported directly — a missing pyarrow must fail
loudly, never silently skip this scenario in CI.
"""

from __future__ import annotations

import io

import pytest

pa = pytest.importorskip("pyarrow")
pq = pytest.importorskip("pyarrow.parquet")
import pyarrow as pa
import pyarrow.parquet as pq

from hotdata.api.connections_api import ConnectionsApi
from hotdata.api.databases_api import DatabasesApi
from hotdata.api.refresh_api import RefreshApi
from hotdata.api.uploads_api import UploadsApi
from hotdata.models.add_managed_schema_request import AddManagedSchemaRequest
from hotdata.models.add_managed_table_request import AddManagedTableRequest
from hotdata.models.load_managed_table_request import LoadManagedTableRequest
from hotdata.models.refresh_request import RefreshRequest


def _parquet_bytes() -> bytes:
Expand All @@ -46,7 +53,6 @@ def test_managed_tables_lifecycle(
databases_api: DatabasesApi,
connections_api: ConnectionsApi,
uploads_api: UploadsApi,
refresh_api: RefreshApi,
scratch_database: str,
) -> None:
# The database's auto-provisioned default catalog is a managed catalog,
Expand Down Expand Up @@ -77,15 +83,5 @@ def test_managed_tables_lifecycle(
assert loaded.table_name == table_name
assert loaded.row_count == 3

profile = connections_api.get_table_profile(connection_id, schema_name, table_name)
assert profile.var_schema == schema_name
assert profile.table == table_name
assert profile.row_count == 3

# Refresh the catalog metadata for the managed connection.
refreshed = refresh_api.refresh(RefreshRequest(connection_id=connection_id))
assert refreshed.actual_instance is not None

# purge_table_cache and delete_managed_table both return None on success.
connections_api.purge_table_cache(connection_id, schema_name, table_name)
# delete_managed_table returns None on success.
connections_api.delete_managed_table(connection_id, schema_name, table_name)
16 changes: 12 additions & 4 deletions tests/integration/test_results_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
that Arrow IPC content negotiation works end-to-end and that the streaming
variant yields the same data.

Skipped if pyarrow is not installed (the helper requires the ``arrow`` extra).
pyarrow is a hard test dependency (see test-requirements.txt), so this imports
it directly rather than via importorskip — a missing pyarrow must fail loudly,
never silently skip this scenario in CI.
"""

from __future__ import annotations

import time

import pyarrow as pa
import pytest

pa = pytest.importorskip("pyarrow")

from hotdata.api.query_api import QueryApi
from hotdata.api.query_runs_api import QueryRunsApi
from hotdata.arrow import ResultsApi
Expand Down Expand Up @@ -52,7 +53,14 @@ def test_results_arrow(
QueryRequest(
var_async=True,
async_after_ms=1000,
sql="SELECT 1 AS x, 'hello' AS msg UNION ALL SELECT 2, 'world'",
# ORDER BY so row order is deterministic — UNION ALL alone does not
# guarantee it, and the column/pagination asserts below depend on it.
sql=(
"SELECT x, msg FROM ("
"SELECT 1 AS x, 'hello' AS msg "
"UNION ALL SELECT 2, 'world'"
") ORDER BY x"
),
),
x_database_id=database_id,
)
Expand Down
13 changes: 11 additions & 2 deletions tests/test_arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class _FakeUrllib3Response(io.RawIOBase):

pyarrow.ipc.open_stream wants a real file-like object (it checks
``closed`` and ``readable()``); the SDK's RESTResponse needs ``status``,
``reason``, ``data``, and ``headers``. release_conn is recorded.
``reason``, ``data``, and ``headers``. drain_conn / release_conn are
recorded so tests can assert the connection is drained before reuse.
"""

def __init__(self, status: int, body: bytes, headers: Dict[str, str]):
Expand All @@ -56,6 +57,7 @@ def __init__(self, status: int, body: bytes, headers: Dict[str, str]):
self._buf = io.BytesIO(body)
self.headers = headers
self.release_conn_called = False
self.drain_conn_called = False

@property
def data(self) -> bytes:
Expand All @@ -71,6 +73,9 @@ def read(self, amt: Optional[int] = -1) -> bytes:
def readable(self) -> bool:
return True

def drain_conn(self) -> None:
self.drain_conn_called = True

def release_conn(self) -> None:
self.release_conn_called = True

Expand Down Expand Up @@ -146,6 +151,9 @@ def test_get_result_arrow_returns_table(monkeypatch: pytest.MonkeyPatch) -> None
got = results.get_result_arrow("res_123")

assert got.equals(table)
# Connection is drained (not just released) so a partially-read body can't
# poison the pool and break the next request with ResponseNotReady.
assert fake.drain_conn_called
assert fake.release_conn_called

# Single request was made.
Expand Down Expand Up @@ -197,7 +205,8 @@ def test_stream_result_arrow_yields_reader(
roundtrip = pa.Table.from_batches(batches, schema=reader.schema)
assert roundtrip.equals(table)

# Connection is released after the context exits.
# Connection is drained and released after the context exits.
assert fake.drain_conn_called
assert fake.release_conn_called


Expand Down