Skip to content

Commit 91cd0a6

Browse files
feat(kernel): wire retry/backoff params + verify metric-view on use_kernel (#820)
* feat(kernel): wire retry/backoff params on use_kernel + verify metric-view passthrough Consume the kernel's newly-configurable retry policy (kernel PR #117) from the use_kernel path, and confirm UC Metric View metadata works end-to-end now that the kernel allowlists its session conf. Bumps KERNEL_REV to the merged kernel main. Retry: - session.py forwards the connector's _retry_* kwargs into the kernel client (new retry_options, kernel-only; Thrift/SEA unaffected). - client._kernel_retry_kwargs maps them to the kernel Session retry kwargs: _retry_delay_min -> retry_min_wait_secs, _retry_delay_max -> retry_max_wait_secs, _retry_stop_after_attempts_count -> retry_max_attempts (1:1 total-attempts; the kernel does the retries-after-first conversion), _retry_stop_after_attempts_duration -> retry_overall_timeout_secs. Connector delays are float seconds; the kernel takes whole seconds, so we round, flooring any positive sub-second value to 1s (never collapse a configured backoff to 0). _retry_delay_default has no kernel equivalent (the kernel's no-Retry-After backoff is exponential from retry_min_wait). Metric View: - No connector wiring needed: enable_metric_view_metadata already injects spark.sql.thriftserver.metadata.metricview.enabled into session_configuration (backend-agnostic), which the kernel client passes through to session_conf. Kernel #117 now allowlists that conf verbatim, so it reaches the server on the use_kernel path. Added an e2e test confirming the round-trip. KERNEL_REV -> b4d88220cdfad8dba1cfa89892269342ae26feeb (kernel main with retry config + metric-view allowlist). Tests: unit tests for _kernel_retry_kwargs (rounding, sub-second floor, count 1:1, only-set-keys) and retry-options threading through session.py (wheel-independent, verified with the kernel import blocked); live e2e (retry params accepted + metric-view passthrough) green against dogfood. black + mypy clean. Co-authored-by: Isaac Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com> * test: skip retry-threading test when pyarrow absent test_retry_kwargs_threaded_into_kernel_client patches KernelDatabricksClient, which requires importing databricks.sql.backend.kernel.client — that module imports pyarrow at load. In the no-pyarrow CI tier the patch target can't be resolved (AttributeError: module '...kernel' has no attribute 'client'), failing the Unit Tests (non-PyArrow) jobs while the PyArrow jobs passed. Add pytest.importorskip("pyarrow") to the test (matching the other kernel tests) so it skips without pyarrow and runs where it's present. Corrected the stale class docstring (it patches the client directly, not via a _create_backend stub). Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com> --------- Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
1 parent 39dc0a7 commit 91cd0a6

6 files changed

Lines changed: 255 additions & 2 deletions

File tree

KERNEL_REV

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ec2288742cbac0cd9fab50da353e8405972eefe9
1+
b4d88220cdfad8dba1cfa89892269342ae26feeb

src/databricks/sql/backend/kernel/client.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ def __init__(
104104
# OAuth secret is consumed during ``auth_provider`` construction
105105
# and isn't recoverable from the built provider.
106106
self._auth_options = kwargs.get("auth_options") or {}
107+
# Connector retry-tuning kwargs (the ``_retry_*`` family),
108+
# forwarded so the kernel's own retry loop honours them. Mapped
109+
# to the kernel ``Session``'s ``retry_*`` kwargs in
110+
# ``open_session`` via ``_kernel_retry_kwargs``.
111+
self._retry_options = kwargs.get("retry_options") or {}
107112
self._catalog = catalog
108113
self._schema = schema
109114
# ``_use_arrow_native_complex_types`` is the connector-side
@@ -179,6 +184,9 @@ def open_session(
179184
# Translate the connector's SSLOptions into the kernel's
180185
# ``tls_*`` Session kwargs. Empty when TLS is left at defaults.
181186
tls_kwargs = _kernel_tls_kwargs(self._ssl_options)
187+
# Translate the connector's ``_retry_*`` kwargs into the kernel's
188+
# ``retry_*`` Session kwargs. Empty when retry is left at defaults.
189+
retry_kwargs = _kernel_retry_kwargs(self._retry_options)
182190
try:
183191
self._kernel_session = _kernel.Session(
184192
host=self._server_hostname,
@@ -199,6 +207,7 @@ def open_session(
199207
intervals_as_string=True,
200208
**auth_kwargs,
201209
**tls_kwargs,
210+
**retry_kwargs,
202211
)
203212
except Exception as exc:
204213
raise _wrap_kernel_exception("open_session", exc) from exc
@@ -729,6 +738,66 @@ def _kernel_tls_kwargs(ssl_options) -> Dict[str, Any]:
729738
return kwargs
730739

731740

741+
def _kernel_retry_kwargs(retry_options: Dict[str, Any]) -> Dict[str, Any]:
742+
"""Translate the connector's ``_retry_*`` tuning into the kernel
743+
``Session``'s ``retry_*`` kwargs.
744+
745+
Only knobs the caller actually set are emitted, so an untuned
746+
connection produces an empty dict (kernel keeps its default policy:
747+
1s/60s backoff, 6 total attempts, 900s budget).
748+
749+
Mappings (connector → kernel):
750+
751+
- ``retry_delay_min`` (float secs) → ``retry_min_wait_secs``
752+
- ``retry_delay_max`` (float secs) → ``retry_max_wait_secs``
753+
- ``retry_stop_after_attempts_count`` (int, **total** attempts) →
754+
``retry_max_attempts`` (1:1 — the kernel converts to its
755+
retries-after-first internally)
756+
- ``retry_stop_after_attempts_duration`` (float secs) →
757+
``retry_overall_timeout_secs``
758+
759+
The connector expresses delays/durations as **floats in seconds**;
760+
the kernel takes **whole seconds** (``u64``). We round to the
761+
nearest second, with a floor of 1s for any positive sub-second
762+
value so a configured delay never collapses to "no wait".
763+
764+
``_retry_delay_default`` has no kernel counterpart and is ignored:
765+
the kernel's no-``Retry-After`` backoff is exponential from
766+
``retry_min_wait``, which already plays that role.
767+
"""
768+
kwargs: Dict[str, Any] = {}
769+
770+
def _secs(value: Any) -> Optional[int]:
771+
if value is None:
772+
return None
773+
rounded = round(float(value))
774+
# Never round a positive delay down to 0 — that would turn a
775+
# configured backoff into a busy-retry. Floor at 1s.
776+
if rounded <= 0 and float(value) > 0:
777+
return 1
778+
return rounded
779+
780+
min_wait = _secs(retry_options.get("retry_delay_min"))
781+
if min_wait is not None:
782+
kwargs["retry_min_wait_secs"] = min_wait
783+
784+
max_wait = _secs(retry_options.get("retry_delay_max"))
785+
if max_wait is not None:
786+
kwargs["retry_max_wait_secs"] = max_wait
787+
788+
count = retry_options.get("retry_stop_after_attempts_count")
789+
if count is not None:
790+
# Total-attempts count, forwarded 1:1; the kernel converts to
791+
# its retries-after-first representation.
792+
kwargs["retry_max_attempts"] = int(count)
793+
794+
duration = _secs(retry_options.get("retry_stop_after_attempts_duration"))
795+
if duration is not None:
796+
kwargs["retry_overall_timeout_secs"] = duration
797+
798+
return kwargs
799+
800+
732801
def _read_pem_bytes(path: str, label: str) -> bytes:
733802
"""Read a PEM file into bytes, mapping IO errors to a clear
734803
``ProgrammingError`` that names the offending TLS option. An empty

src/databricks/sql/session.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,23 @@ def _create_backend(
174174
"oauth_scopes": kwargs.get("oauth_scopes"),
175175
"credentials_provider": kwargs.get("credentials_provider"),
176176
}
177+
# Forward the connector's retry-tuning kwargs so the kernel's
178+
# own retry policy honours them (the kernel owns the retry
179+
# loop on this path). Only the keys with a kernel counterpart
180+
# are passed; `_retry_delay_default` is intentionally omitted
181+
# (the kernel's no-Retry-After backoff is exponential from
182+
# its min-wait, so a flat default delay has no equivalent).
183+
# Kernel-only; Thrift / SEA are unaffected.
184+
kernel_retry_options = {
185+
"retry_delay_min": kwargs.get("_retry_delay_min"),
186+
"retry_delay_max": kwargs.get("_retry_delay_max"),
187+
"retry_stop_after_attempts_count": kwargs.get(
188+
"_retry_stop_after_attempts_count"
189+
),
190+
"retry_stop_after_attempts_duration": kwargs.get(
191+
"_retry_stop_after_attempts_duration"
192+
),
193+
}
177194
return KernelDatabricksClient(
178195
server_hostname=server_hostname,
179196
http_path=http_path,
@@ -185,6 +202,7 @@ def _create_backend(
185202
schema=kwargs.get("schema"),
186203
_use_arrow_native_complex_types=_use_arrow_native_complex_types,
187204
auth_options=kernel_auth_options,
205+
retry_options=kernel_retry_options,
188206
)
189207

190208
databricks_client_class: Type[DatabricksClient]

tests/e2e/test_kernel_backend.py

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import databricks.sql as sql
2727
from databricks.sql.exc import DatabaseError
2828

29-
3029
# Skip the whole module unless the kernel wheel is genuinely installed.
3130
# ``pytest.importorskip`` alone isn't enough: the kernel unit tests inject a
3231
# fake ``databricks_sql_kernel`` ModuleType into ``sys.modules`` so the
@@ -183,6 +182,46 @@ def test_session_configuration_round_trips(kernel_conn_params):
183182
assert kv.get("ANSI_MODE") == "false", f"got {rows!r}"
184183

185184

185+
def test_retry_params_accepted_end_to_end(kernel_conn_params):
186+
"""The connector's `_retry_*` tuning kwargs are translated to the
187+
kernel `Session`'s `retry_*` kwargs and accepted end-to-end. We
188+
can't easily force a retry against a live warehouse, so this is a
189+
smoke test: a connection configured with explicit retry params
190+
opens and runs a query successfully (proving the kwargs reach and
191+
are accepted by the kernel)."""
192+
params = dict(kernel_conn_params)
193+
params.update(
194+
_retry_delay_min=2,
195+
_retry_delay_max=30,
196+
_retry_stop_after_attempts_count=4,
197+
_retry_stop_after_attempts_duration=120,
198+
)
199+
with sql.connect(**params) as c:
200+
with c.cursor() as cur:
201+
cur.execute("SELECT 1 AS n")
202+
assert cur.fetchall()[0][0] == 1
203+
204+
205+
def test_enable_metric_view_metadata_lists_metric_view_table_type(kernel_conn_params):
206+
"""`enable_metric_view_metadata=True` injects the
207+
`spark.sql.thriftserver.metadata.metricview.enabled` session conf,
208+
which the kernel now passes through (verbatim) so the server
209+
surfaces `METRIC_VIEW` in `cursor.tables()`'s table-type column.
210+
211+
We assert the connection opens and `tables()` runs; the kernel
212+
already lists `METRIC_VIEW` among its table types, and the conf
213+
enables the server side. Not asserting a specific metric view
214+
exists in the catalog (workspace-dependent)."""
215+
params = dict(kernel_conn_params)
216+
params["enable_metric_view_metadata"] = True
217+
with sql.connect(**params) as c:
218+
with c.cursor() as cur:
219+
# Smoke: the conf was accepted (no SqlError on open) and a
220+
# metadata call works with it set.
221+
cur.tables()
222+
cur.fetchall()
223+
224+
186225
# ── Error mapping ─────────────────────────────────────────────────
187226

188227

tests/unit/test_kernel_client.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,3 +956,62 @@ def test_empty_ca_file_raises_programming_error(self, tmp_path):
956956
ca.write_bytes(b" \n")
957957
with pytest.raises(ProgrammingError, match="is empty"):
958958
kernel_client._kernel_tls_kwargs(SSLOptions(tls_trusted_ca_file=str(ca)))
959+
960+
961+
# ---------------------------------------------------------------------------
962+
# Retry translation: connector _retry_* -> kernel Session retry kwargs.
963+
# ---------------------------------------------------------------------------
964+
965+
966+
class TestKernelRetryKwargs:
967+
"""``_kernel_retry_kwargs`` maps the connector's ``_retry_*`` tuning
968+
onto the kernel ``Session``'s ``retry_*`` kwargs, rounding float
969+
seconds to whole seconds and forwarding the total-attempts count
970+
1:1 (the kernel does the retries-after-first conversion)."""
971+
972+
def test_empty_options_emit_no_kwargs(self):
973+
assert kernel_client._kernel_retry_kwargs({}) == {}
974+
975+
def test_all_options_mapped(self):
976+
out = kernel_client._kernel_retry_kwargs(
977+
{
978+
"retry_delay_min": 2.0,
979+
"retry_delay_max": 90.0,
980+
"retry_stop_after_attempts_count": 10,
981+
"retry_stop_after_attempts_duration": 600.0,
982+
}
983+
)
984+
assert out == {
985+
"retry_min_wait_secs": 2,
986+
"retry_max_wait_secs": 90,
987+
"retry_max_attempts": 10,
988+
"retry_overall_timeout_secs": 600,
989+
}
990+
991+
def test_count_forwarded_one_to_one(self):
992+
# Total-attempts count is passed verbatim; the kernel converts
993+
# to retries-after-first internally (so 1 means a single attempt).
994+
out = kernel_client._kernel_retry_kwargs({"retry_stop_after_attempts_count": 1})
995+
assert out == {"retry_max_attempts": 1}
996+
997+
def test_float_seconds_rounded(self):
998+
out = kernel_client._kernel_retry_kwargs(
999+
{"retry_delay_min": 2.4, "retry_delay_max": 2.6}
1000+
)
1001+
assert out == {"retry_min_wait_secs": 2, "retry_max_wait_secs": 3}
1002+
1003+
def test_subsecond_delay_floored_to_one(self):
1004+
# A positive sub-second delay (the connector allows 0.1) must
1005+
# not round down to 0 — that would turn backoff into busy-retry.
1006+
out = kernel_client._kernel_retry_kwargs({"retry_delay_min": 0.1})
1007+
assert out == {"retry_min_wait_secs": 1}
1008+
1009+
def test_only_set_keys_emitted(self):
1010+
out = kernel_client._kernel_retry_kwargs({"retry_delay_max": 30.0})
1011+
assert out == {"retry_max_wait_secs": 30}
1012+
1013+
def test_retry_delay_default_has_no_mapping(self):
1014+
# _retry_delay_default isn't forwarded by session.py and isn't a
1015+
# recognised key here — it has no kernel equivalent.
1016+
out = kernel_client._kernel_retry_kwargs({"retry_delay_default": 5.0})
1017+
assert out == {}

tests/unit/test_session.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,3 +408,71 @@ def test_use_kernel_pat_builds_minimal_access_token_provider(self):
408408
# PAT path: a minimal AccessTokenAuthProvider, not the
409409
# federation-wrapped connector provider.
410410
assert isinstance(sess.auth_provider, AccessTokenAuthProvider)
411+
412+
413+
class TestKernelRetryOptionsThreading:
414+
"""The connector's ``_retry_*`` kwargs must be forwarded into the
415+
kernel client's ``retry_options`` on the use_kernel path (the kernel
416+
owns the retry loop). Captures the kwargs session.py passes by
417+
patching ``KernelDatabricksClient`` and inspecting its call args.
418+
419+
Patching ``KernelDatabricksClient`` requires importing
420+
``databricks.sql.backend.kernel.client``, which imports pyarrow at
421+
module load — so this test is skipped when pyarrow is absent (the
422+
no-pyarrow CI tier), matching the other kernel tests. The Rust wheel
423+
is still faked via sys.modules so the kernel extension itself isn't
424+
needed.
425+
"""
426+
427+
PACKAGE = "databricks.sql"
428+
429+
def test_retry_kwargs_threaded_into_kernel_client(self):
430+
import sys
431+
import types
432+
433+
pytest.importorskip(
434+
"pyarrow",
435+
reason="kernel client module imports pyarrow at load",
436+
)
437+
438+
# The lazy ``from databricks.sql.backend.kernel.client import
439+
# KernelDatabricksClient`` triggers ``import databricks_sql_kernel``
440+
# at module load; the unit-test job has no Rust wheel, so inject
441+
# a fake module (scoped via patch.dict) before connect() runs.
442+
fake = types.ModuleType("databricks_sql_kernel")
443+
fake.KernelError = type("KernelError", (Exception,), {})
444+
fake.Session = MagicMock()
445+
446+
# Patch the kernel client class (imported lazily inside
447+
# _create_backend) and the provider builder; capture the kwargs
448+
# session.py passes to the kernel client.
449+
with patch.dict(sys.modules, {"databricks_sql_kernel": fake}), patch(
450+
"databricks.sql.backend.kernel.client.KernelDatabricksClient"
451+
) as mock_kernel_client, patch(
452+
"%s.session.get_python_sql_connector_auth_provider" % self.PACKAGE
453+
):
454+
instance = mock_kernel_client.return_value
455+
instance.open_session.return_value = SessionId(
456+
BackendType.SEA, "sess-id", None
457+
)
458+
459+
conn = databricks.sql.connect(
460+
server_hostname="foo",
461+
http_path="/sql/1.0/warehouses/abc",
462+
use_kernel=True,
463+
access_token="dapi-xyz",
464+
enable_telemetry=False,
465+
_retry_delay_min=2.0,
466+
_retry_delay_max=90.0,
467+
_retry_stop_after_attempts_count=10,
468+
_retry_stop_after_attempts_duration=600.0,
469+
)
470+
try:
471+
_, kwargs = mock_kernel_client.call_args
472+
opts = kwargs["retry_options"]
473+
assert opts["retry_delay_min"] == 2.0
474+
assert opts["retry_delay_max"] == 90.0
475+
assert opts["retry_stop_after_attempts_count"] == 10
476+
assert opts["retry_stop_after_attempts_duration"] == 600.0
477+
finally:
478+
conn.close()

0 commit comments

Comments
 (0)