diff --git a/docs/index.rst b/docs/index.rst index cbe2c5ddb..e64ef885d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -117,6 +117,10 @@ application using OpenTelemetry. One call to ``enable_tracing()`` instruments query sessions, transactions, and connection pool operations — so you can visualize request flow in Jaeger, Grafana, or any OpenTelemetry-compatible backend. +The same page also covers client-side metrics. ``enable_metrics()`` exposes operation +latency, retry cost, and query session pool metrics through an OpenTelemetry +``MeterProvider``. + API Reference ------------- diff --git a/docs/opentelemetry.rst b/docs/opentelemetry.rst index c4eb810e8..b0d633146 100644 --- a/docs/opentelemetry.rst +++ b/docs/opentelemetry.rst @@ -1,14 +1,22 @@ -OpenTelemetry Tracing -===================== - -The SDK provides built-in distributed tracing via `OpenTelemetry `_. -When enabled, key YDB operations — such as session creation, query execution, transaction -commit/rollback, and driver initialization — produce OpenTelemetry spans. Trace -context is automatically propagated to the YDB server through gRPC metadata using the +OpenTelemetry +============= + +The SDK provides built-in distributed tracing and client-side metrics via +`OpenTelemetry `_. When tracing is enabled, key YDB +operations — such as session creation, query execution, transaction commit/rollback, +and driver initialization — produce OpenTelemetry spans. Trace context is automatically +propagated to the YDB server through gRPC metadata using the `W3C Trace Context `_ standard. -Tracing is **zero-cost when disabled**: the SDK uses no-op stubs by default, so there is -no overhead unless you explicitly opt in. +Metrics expose operation latency/failures, retry cost, and query session pool state. +Tracing and metrics are configured independently: enabling one does not require enabling +the other. + +Instrumentation is **zero-cost when disabled**: the SDK uses no-op tracing and +metrics registries by default, so importing the SDK does not import OpenTelemetry +or create metric instruments unless you explicitly opt in. ``enable_tracing()`` +loads the tracing plugin, while ``enable_metrics()`` loads the metrics plugin and +replaces the no-op metrics registry with an OpenTelemetry-backed registry. Installation @@ -22,7 +30,7 @@ OpenTelemetry packages are not included by default. Install the SDK with the pip install ydb[opentelemetry] This pulls in ``opentelemetry-api``. You will also need ``opentelemetry-sdk`` and an -exporter for your tracing backend, for example: +exporter for your tracing or metrics backend, for example: .. code-block:: sh @@ -73,6 +81,53 @@ Repeated calls to ``enable_tracing()`` do nothing until you call ``disable_traci which removes hooks so you can reconfigure or turn instrumentation off. +Enabling Metrics +---------------- + +Call ``enable_metrics()`` once, after configuring your OpenTelemetry meter provider +and before creating YDB drivers or query session pools: + +.. code-block:: python + + from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + from opentelemetry.sdk.resources import Resource + + import ydb + from ydb.opentelemetry import enable_metrics + + # 1. Set up OpenTelemetry + resource = Resource(attributes={"service.name": "my-service"}) + metric_reader = PeriodicExportingMetricReader( + OTLPMetricExporter(endpoint="http://localhost:4317"), + export_interval_millis=1000, + ) + meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) + + # 2. Enable YDB metrics + enable_metrics(meter_provider) + + # 3. Use the SDK as usual — metrics are recorded automatically + with ydb.Driver(endpoint="grpc://localhost:2136", database="/local") as driver: + driver.wait(timeout=5) + with ydb.QuerySessionPool(driver, name="main-pool") as pool: + pool.execute_with_retries("SELECT 1") + + meter_provider.shutdown() + +``enable_metrics()`` accepts an optional ``meter_provider`` argument. If omitted, the +SDK obtains a meter named ``"ydb.sdk"`` from the global meter provider. + +Repeated calls to ``enable_metrics()`` do nothing until you call +``disable_metrics()``, which clears the in-memory observable metric values and allows +metrics to be reconfigured. After disabling metrics, the SDK restores the no-op +metrics registry, so metric recording calls remain cheap no-ops. + +Metrics are independent from tracing. If both ``enable_tracing()`` and +``enable_metrics()`` are called, YDB client operations produce both spans and metrics. + + What Is Instrumented -------------------- @@ -171,6 +226,89 @@ On errors, the span also records: - ``db.response.status_code`` — the YDB status code name (e.g. ``"SCHEME_ERROR"``). +Metric Instruments +------------------ + +The SDK creates the following instruments with meter name ``"ydb.sdk"``: + +.. list-table:: + :header-rows: 1 + :widths: 30 15 15 40 + + * - Metric + - Instrument + - Unit + - Description + * - ``db.client.operation.duration`` + - Histogram + - ``s`` + - Latency of user-visible YDB client operations. + * - ``ydb.client.operation.failed`` + - Counter + - ``{command}`` + - Failed user-visible YDB client operations. + * - ``ydb.query.session.create_time`` + - Histogram + - ``s`` + - Time spent creating a query session. + * - ``ydb.query.session.pending_requests`` + - UpDownCounter + - ``{request}`` + - Requests currently waiting for a session from the pool. + * - ``ydb.query.session.timeouts`` + - Counter + - ``{connection}`` + - Session acquisition timeouts. + * - ``ydb.query.session.count`` + - ObservableUpDownCounter + - ``{connection}`` + - Current number of open query sessions by pool and state. + * - ``ydb.query.session.max`` + - ObservableUpDownCounter + - ``{connection}`` + - Maximum configured number of sessions for a query session pool. + * - ``ydb.query.session.min`` + - ObservableUpDownCounter + - ``{connection}`` + - Minimum configured number of sessions for a query session pool. The SDK does not configure + a pool minimum, so this metric is always reported as ``0``. + * - ``ydb.client.retry.duration`` + - Histogram + - ``s`` + - Total user-visible duration of a logical retried operation, including attempts and backoff. + * - ``ydb.client.retry.attempts`` + - Histogram + - ``{attempt}`` + - Number of attempts performed for one logical retried operation. + +Operation metrics use stable labels only: + +.. list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Attribute + - Description + * - ``database`` + - Database path. + * - ``endpoint`` + - Configured endpoint in ``host:port`` form. + * - ``operation.name`` + - SDK operation name without the ``ydb.`` prefix, for example ``"ExecuteQuery"``. + * - ``status_code`` + - Added only to ``ydb.client.operation.failed``. + +Operation metrics are recorded for ``ExecuteQuery``, ``Commit``, ``Rollback``, +``CreateSession``, and ``BeginTransaction``. + +Query session metrics use ``ydb.query.session.pool.name``. The pool name is generated +automatically, or can be set explicitly with ``QuerySessionPool(..., name="main-pool")`` +for both synchronous and asynchronous pools. ``ydb.query.session.count`` also includes +``ydb.query.session.state`` with values ``"idle"`` or ``"used"``. + +Retry metrics are recorded without attributes. + + Trace Context Propagation ------------------------- @@ -189,16 +327,17 @@ request path. Async Usage ----------- -Tracing works identically with the async driver. Call ``enable_tracing()`` once at -startup: +Tracing and metrics work identically with the async driver. Call +``enable_tracing()`` and/or ``enable_metrics()`` once at startup: .. code-block:: python import asyncio import ydb - from ydb.opentelemetry import enable_tracing + from ydb.opentelemetry import enable_metrics, enable_tracing enable_tracing() + enable_metrics() async def main(): async with ydb.aio.Driver( @@ -206,7 +345,7 @@ startup: database="/local", ) as driver: await driver.wait(timeout=5) - async with ydb.aio.QuerySessionPool(driver) as pool: + async with ydb.aio.QuerySessionPool(driver, name="async-main-pool") as pool: await pool.execute_with_retries("SELECT 1") asyncio.run(main()) @@ -229,12 +368,14 @@ To use a specific tracer instead of the global one: Running the Examples -------------------- -The runnable script is ``examples/opentelemetry/otel_example.py`` (bank table + concurrent -Serializable transactions and ``app_startup`` / ``example_tli`` application spans). **Start -Docker (YDB or the full stack) first**, then install and run on the host — see -``examples/opentelemetry/README.md`` for the full order of commands and environment variables. +The runnable script is ``examples/opentelemetry/otel_example.py``. It demonstrates both +tracing and metrics: bank table + concurrent Serializable transactions, +``app_startup`` / ``example_tli`` application spans, and SDK metrics exported through +OTLP. **Start Docker (YDB or the full stack) first**, then install and run on the host +— see ``examples/opentelemetry/README.md`` for the full order of commands and +environment variables. -**Full stack in one command** (YDB + OTLP + Tempo + Grafana; the ``otel-example`` service is built from ``examples/opentelemetry/Dockerfile`` and runs the script once): +**Full stack in one command** (YDB + OTLP + Tempo + Grafana + Prometheus; the ``otel-example`` service is built from ``examples/opentelemetry/Dockerfile`` and runs the script once): .. code-block:: sh @@ -250,4 +391,5 @@ The first run builds the ``otel-example`` image from the local SDK source; subse pip install -e '.[opentelemetry]' -r examples/opentelemetry/requirements.txt python examples/opentelemetry/otel_example.py -Open `http://localhost:3000 `_ (Grafana) to explore traces via Tempo. +Open `http://localhost:3000 `_ (Grafana) to explore traces via +Tempo and metrics through the configured Prometheus data source. diff --git a/examples/opentelemetry/Dockerfile b/examples/opentelemetry/Dockerfile index 326721a1c..041eb4abf 100644 --- a/examples/opentelemetry/Dockerfile +++ b/examples/opentelemetry/Dockerfile @@ -1,11 +1,13 @@ -# Isolated image for the OpenTelemetry demo. Build context is the repository root. +# Isolated image for the OpenTelemetry demo scripts. Build context is the repository root. # -# docker compose -f examples/opentelemetry/compose-e2e.yaml build otel-example +# docker compose -f examples/opentelemetry/compose-e2e.yaml build # # A separate ``.dockerignore`` at the repo root keeps the context small. FROM python:3.11-slim +ENV PYTHONUNBUFFERED=1 + WORKDIR /app # Dependency layer: copy only what setup.py needs so changes to the demo script do @@ -15,7 +17,6 @@ COPY ydb ./ydb COPY examples/opentelemetry/requirements.txt ./examples/opentelemetry/requirements.txt RUN pip install --no-cache-dir -e '.[opentelemetry]' -r examples/opentelemetry/requirements.txt -# Demo script. +# Demo scripts. COPY examples/opentelemetry/otel_example.py ./examples/opentelemetry/otel_example.py - -CMD ["python", "examples/opentelemetry/otel_example.py"] +COPY examples/opentelemetry/load_tank.py ./examples/opentelemetry/load_tank.py diff --git a/examples/opentelemetry/README.md b/examples/opentelemetry/README.md index 1af90f6d3..33fd68991 100644 --- a/examples/opentelemetry/README.md +++ b/examples/opentelemetry/README.md @@ -1,7 +1,15 @@ # OpenTelemetry example (YDB Python SDK) Async demo in [`otel_example.py`](otel_example.py): OTLP export, `enable_tracing()`, -`app_startup` and `example_tli` application spans, bank table, Serializable transactions (TLI-style load). +`enable_metrics()`, `app_startup` and `example_tli` application spans, SDK client +metrics, bank table, Serializable transactions (TLI-style load). + +[`load_tank.py`](load_tank.py) runs a small step-like load profile for the +metrics dashboard: + +```text +Peak RPS -> Medium RPS -> Min RPS -> Medium RPS -> repeat +``` Most steps assume the **repository root** as the current directory; the install step also shows the variant from this folder. @@ -17,7 +25,10 @@ docker compose up -d # wait until the ydb container is healthy / port 2136 is open, then continue ``` -**Full stack** (YDB + OTLP collector + Tempo + Grafana; the `otel-example` service is built from a `Dockerfile` and runs the script once inside Compose). The compose file is `compose-e2e.yaml` next to this README. +**Full stack** (YDB + OTLP collector + Tempo + Prometheus + Grafana; the +`otel-example` service runs the tracing/metrics demo once, and `load-generator` +runs the metrics load tank). The compose file is `compose-e2e.yaml` next to this +README. ```sh cd /path/to/ydb-python-sdk @@ -34,9 +45,29 @@ docker compose -f compose-e2e.yaml up --build The first run builds the `otel-example` image from the local SDK source (`Dockerfile` in this folder, `.dockerignore` at the repo root keeps the context small). Subsequent runs reuse the cached image; pass `--build` if you change the SDK or the demo script. Grafana: http://localhost:3000 +Prometheus: http://localhost:9090 + +Grafana is provisioned with the **YDB Python SDK Metrics** dashboard. It uses +Prometheus queries for SDK metrics such as `db_client_operation_duration`, +`ydb_client_operation_failed`, `ydb_query_session_count`, +`ydb_query_session_pending_requests`, `ydb_query_session_create_time`, and +`ydb_client_retry_duration`. Use Grafana Explore for ad-hoc traces through Tempo +and metrics through Prometheus. + +The SDK configures explicit OpenTelemetry histogram bucket boundaries for its +own duration and retry-attempt metrics. Duration values are recorded in seconds, +with sub-millisecond and millisecond-scale buckets so Grafana percentiles show +meaningful latency distributions for fast local YDB operations. + +Metrics are wired through a dedicated SDK metrics plugin. Until `enable_metrics()` +is called, the SDK uses a no-op metrics registry and does not import +OpenTelemetry metrics packages from the hot-path metric helpers. **Logs for `otel-example`:** the container name is prefixed (e.g. `opentelemetry-otel-example-1`); use `docker compose -f examples/opentelemetry/compose-e2e.yaml ps` or `docker ps -a` to find it. The service is one-shot (`restart: "no"`) — it may already have exited. +**Logs for `load-generator`:** the service is also one-shot. It runs for +`LOAD_TANK_TOTAL_TIME` seconds and then exits after flushing metrics. + ## 2. Install dependencies (on the host, for a local `python` run) **From the repository root** (editable SDK + pins from this example): @@ -63,12 +94,37 @@ pip install -e '../..[opentelemetry]' -r requirements.txt python examples/opentelemetry/otel_example.py ``` -Defaults: YDB `grpc://localhost:2136`, OTLP `http://localhost:4317` (for a local collector, if you use one). +Defaults: YDB `grpc://localhost:2136`, OTLP `http://localhost:4317` (for a local collector, if you use one). The same OTLP endpoint receives both traces and metrics. + +Run the load tank against an already running local stack: + +```sh +python examples/opentelemetry/load_tank.py +``` ## Environment (Docker / overrides) -| Variable | Meaning | -|----------|---------| -| `YDB_ENDPOINT` | e.g. `grpc://ydb:2136` inside the Compose network | -| `YDB_DATABASE` | default `/local` | -| `OTEL_EXPORTER_OTLP_ENDPOINT` | e.g. `http://otel-collector:4317` | +| Variable | Meaning | +|----------|----------------------------------------------------------| +| `YDB_ENDPOINT` | e.g. `grpc://ydb:2136` inside the Compose network | +| `YDB_DATABASE` | default `/local` | +| `OTEL_EXPORTER_OTLP_ENDPOINT` | e.g. `http://otel-collector:4317` | +| `OTEL_SERVICE_NAME` | service name attached to exported telemetry | +| `LOAD_TANK_TOTAL_TIME` | total load duration in seconds, default `6000` | +| `LOAD_TANK_WORKERS` | number of concurrent workers, default `40` | +| `LOAD_TANK_POOL_SIZE` | query session pool size, default `20` | +| `LOAD_TANK_PEAK_RPS` | peak phase target RPS, default `120` | +| `LOAD_TANK_MEDIUM_RPS` | medium phase target RPS, default `30` | +| `LOAD_TANK_MIN_RPS` | low phase target RPS, default `3` | +| `LOAD_TANK_ERROR_RPS` | failed query target RPS, default `1`; set `0` to disable | +| `LOAD_TANK_PRESSURE_POOL_SIZE` | pool size for session pressure metrics, default `1` | +| `LOAD_TANK_PRESSURE_WORKERS` | concurrent contenders for the pressure pool, default `8` | +| `LOAD_TANK_PRESSURE_HOLD_TIME` | seconds to hold the pressure-pool session, default `1.5` | +| `LOAD_TANK_PRESSURE_ACQUIRE_TIMEOUT` | short acquire timeout for timeout metrics, default `1.0` | +| `LOAD_TANK_PRESSURE_INTERVAL` | pause between pressure rounds, default `0.2` | +| `LOAD_TANK_SESSION_CHURN_INTERVAL` | interval for creating fresh sessions, default `2.0` | +| `LOAD_TANK_PEAK_DURATION` | peak phase duration in seconds, default `60` | +| `LOAD_TANK_MEDIUM_DURATION` | medium phase duration in seconds, default `90` | +| `LOAD_TANK_MIN_DURATION` | low phase duration in seconds, default `60` | +| `LOAD_TANK_QUERY` | query executed by workers, default `SELECT 1 AS value` | +| `LOAD_TANK_ERROR_QUERY` | query used to produce failed-operation metrics | diff --git a/examples/opentelemetry/compose-e2e.yaml b/examples/opentelemetry/compose-e2e.yaml index f8402d50f..e0bf9a5e4 100644 --- a/examples/opentelemetry/compose-e2e.yaml +++ b/examples/opentelemetry/compose-e2e.yaml @@ -1,5 +1,5 @@ # Full OpenTelemetry demo: YDB (server-side tracing config), collector, Tempo, Prometheus, Grafana, -# and a one-shot container that runs otel_example.py once. +# a one-shot container that runs otel_example.py once, and a load generator for live metrics. # # Run from this directory (paths below are relative to this file): # cd examples/opentelemetry && docker compose -f compose-e2e.yaml up @@ -78,6 +78,7 @@ services: build: context: ../.. dockerfile: examples/opentelemetry/Dockerfile + command: ["python", "examples/opentelemetry/otel_example.py"] environment: YDB_ENDPOINT: grpc://ydb:2136 YDB_DATABASE: /local @@ -89,3 +90,33 @@ services: otel-collector: condition: service_started restart: "no" + + load-generator: + build: + context: ../.. + dockerfile: examples/opentelemetry/Dockerfile + command: ["python", "examples/opentelemetry/load_tank.py"] + environment: + YDB_ENDPOINT: grpc://ydb:2136 + YDB_DATABASE: /local + OTEL_EXPORTER_OTLP_ENDPOINT: http://otel-collector:4317 + OTEL_SERVICE_NAME: ydb-python-load-tank + LOAD_TANK_TOTAL_TIME: "6000" + LOAD_TANK_WORKERS: "40" + LOAD_TANK_POOL_SIZE: "20" + LOAD_TANK_PEAK_RPS: "120" + LOAD_TANK_MEDIUM_RPS: "30" + LOAD_TANK_MIN_RPS: "3" + LOAD_TANK_ERROR_RPS: "1" + LOAD_TANK_PRESSURE_POOL_SIZE: "1" + LOAD_TANK_PRESSURE_WORKERS: "8" + LOAD_TANK_PRESSURE_HOLD_TIME: "1.5" + LOAD_TANK_PRESSURE_ACQUIRE_TIMEOUT: "1.0" + LOAD_TANK_PRESSURE_INTERVAL: "0.2" + LOAD_TANK_SESSION_CHURN_INTERVAL: "2.0" + depends_on: + ydb: + condition: service_healthy + otel-collector: + condition: service_started + restart: "no" diff --git a/examples/opentelemetry/grafana/dashboards/README.md b/examples/opentelemetry/grafana/dashboards/README.md index eb47493ad..365fd38bf 100644 --- a/examples/opentelemetry/grafana/dashboards/README.md +++ b/examples/opentelemetry/grafana/dashboards/README.md @@ -1,5 +1,7 @@ -This folder is intentionally left empty. +This folder contains Grafana dashboards provisioned by the local OpenTelemetry example. -Grafana is provisioned with Tempo + Prometheus datasources; use **Explore** to search traces. +`ydb-python-sdk-metrics.json` shows the YDB Python SDK metrics exported to Prometheus, +including client operation latency, failures, query session pool usage, pool min/max, +pending session requests, acquisition timeouts, and retry metrics. diff --git a/examples/opentelemetry/grafana/dashboards/ydb-python-sdk-metrics.json b/examples/opentelemetry/grafana/dashboards/ydb-python-sdk-metrics.json new file mode 100644 index 000000000..1a0706aa8 --- /dev/null +++ b/examples/opentelemetry/grafana/dashboards/ydb-python-sdk-metrics.json @@ -0,0 +1,191 @@ +{ + "title": "YDB Python SDK Metrics", + "uid": "ydb-python-sdk-metrics", + "schemaVersion": 38, + "timezone": "browser", + "refresh": "5s", + "tags": ["ydb", "python", "opentelemetry"], + "panels": [ + { + "title": "Request Rate (RPS)", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 }, + "fieldConfig": { "defaults": { "unit": "reqps" } }, + "options": { "legend": { "displayMode": "table", "placement": "right" } }, + "targets": [ + { + "expr": "sum by (ydb_operation_name) (rate(db_client_operation_duration_seconds_count[$__rate_interval]))", + "legendFormat": "{{ydb_operation_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + } + ] + }, + { + "title": "Error Rate (RPS)", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 }, + "fieldConfig": { "defaults": { "unit": "reqps" } }, + "options": { "legend": { "displayMode": "table", "placement": "right" } }, + "targets": [ + { + "expr": "sum by (db_response_status_code) (rate(ydb_client_operation_failed_total[$__rate_interval]))", + "legendFormat": "{{db_response_status_code}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + } + ] + }, + { + "title": "Query Session Pool (Used / Idle)", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 8 }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "options": { "legend": { "displayMode": "table", "placement": "right" } }, + "targets": [ + { + "expr": "sum by (ydb_query_session_pool_name) (ydb_query_session_count{ydb_query_session_state=\"used\"})", + "legendFormat": "used - {{ydb_query_session_pool_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + }, + { + "expr": "sum by (ydb_query_session_pool_name) (ydb_query_session_count{ydb_query_session_state=\"idle\"})", + "legendFormat": "idle - {{ydb_query_session_pool_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + }, + { + "expr": "sum by (ydb_query_session_pool_name) (ydb_query_session_max)", + "legendFormat": "max - {{ydb_query_session_pool_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + }, + { + "expr": "sum by (ydb_query_session_pool_name) (ydb_query_session_min)", + "legendFormat": "min - {{ydb_query_session_pool_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + } + ] + }, + { + "title": "Pending Session Requests", + "type": "timeseries", + "gridPos": { "h": 8, "w": 6, "x": 12, "y": 8 }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "options": { "legend": { "displayMode": "table", "placement": "right" } }, + "targets": [ + { + "expr": "sum by (ydb_query_session_pool_name) (ydb_query_session_pending_requests)", + "legendFormat": "{{ydb_query_session_pool_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + } + ] + }, + { + "title": "Session Acquire Timeouts", + "type": "timeseries", + "gridPos": { "h": 8, "w": 6, "x": 18, "y": 8 }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "options": { "legend": { "displayMode": "table", "placement": "right" } }, + "targets": [ + { + "expr": "sum by (ydb_query_session_pool_name) (increase(ydb_query_session_timeouts_total[$__rate_interval]))", + "legendFormat": "{{ydb_query_session_pool_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + } + ] + }, + { + "title": "Operation Latency Percentiles", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 16 }, + "fieldConfig": { "defaults": { "unit": "s" } }, + "options": { "legend": { "displayMode": "table", "placement": "right" } }, + "targets": [ + { + "expr": "histogram_quantile(0.50, sum by (le, ydb_operation_name) (rate(db_client_operation_duration_seconds_bucket[$__rate_interval])))", + "legendFormat": "p50 - {{ydb_operation_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + }, + { + "expr": "histogram_quantile(0.95, sum by (le, ydb_operation_name) (rate(db_client_operation_duration_seconds_bucket[$__rate_interval])))", + "legendFormat": "p95 - {{ydb_operation_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + }, + { + "expr": "histogram_quantile(0.99, sum by (le, ydb_operation_name) (rate(db_client_operation_duration_seconds_bucket[$__rate_interval])))", + "legendFormat": "p99 - {{ydb_operation_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + } + ] + }, + { + "title": "Session Create Time Percentiles", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 16 }, + "fieldConfig": { "defaults": { "unit": "s" } }, + "options": { "legend": { "displayMode": "table", "placement": "right" } }, + "targets": [ + { + "expr": "histogram_quantile(0.50, sum by (le, ydb_query_session_pool_name) (rate(ydb_query_session_create_time_seconds_bucket[$__rate_interval])))", + "legendFormat": "p50 - {{ydb_query_session_pool_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + }, + { + "expr": "histogram_quantile(0.95, sum by (le, ydb_query_session_pool_name) (rate(ydb_query_session_create_time_seconds_bucket[$__rate_interval])))", + "legendFormat": "p95 - {{ydb_query_session_pool_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + }, + { + "expr": "histogram_quantile(0.99, sum by (le, ydb_query_session_pool_name) (rate(ydb_query_session_create_time_seconds_bucket[$__rate_interval])))", + "legendFormat": "p99 - {{ydb_query_session_pool_name}}", + "datasource": { "type": "prometheus", "uid": "prometheus" } + } + ] + }, + { + "title": "Retry Duration Percentiles", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 0, "y": 24 }, + "fieldConfig": { "defaults": { "unit": "s" } }, + "options": { "legend": { "displayMode": "table", "placement": "right" } }, + "targets": [ + { + "expr": "histogram_quantile(0.50, sum by (le) (rate(ydb_client_retry_duration_seconds_bucket[$__rate_interval])))", + "legendFormat": "p50", + "datasource": { "type": "prometheus", "uid": "prometheus" } + }, + { + "expr": "histogram_quantile(0.95, sum by (le) (rate(ydb_client_retry_duration_seconds_bucket[$__rate_interval])))", + "legendFormat": "p95", + "datasource": { "type": "prometheus", "uid": "prometheus" } + }, + { + "expr": "histogram_quantile(0.99, sum by (le) (rate(ydb_client_retry_duration_seconds_bucket[$__rate_interval])))", + "legendFormat": "p99", + "datasource": { "type": "prometheus", "uid": "prometheus" } + } + ] + }, + { + "title": "Retry Attempts", + "type": "timeseries", + "gridPos": { "h": 8, "w": 12, "x": 12, "y": 24 }, + "fieldConfig": { "defaults": { "unit": "short" } }, + "options": { "legend": { "displayMode": "table", "placement": "right" } }, + "targets": [ + { + "expr": "sum(rate(ydb_client_retry_attempts_sum[$__rate_interval])) / sum(rate(ydb_client_retry_attempts_count[$__rate_interval]))", + "legendFormat": "avg attempts", + "datasource": { "type": "prometheus", "uid": "prometheus" } + }, + { + "expr": "sum(rate(ydb_client_retry_attempts_count[$__rate_interval])) - sum(rate(ydb_client_retry_attempts_bucket{le=\"1\"}[$__rate_interval]))", + "legendFormat": "retried operations rps", + "datasource": { "type": "prometheus", "uid": "prometheus" } + }, + { + "expr": "(sum(rate(ydb_client_retry_attempts_count[$__rate_interval])) - sum(rate(ydb_client_retry_attempts_bucket{le=\"1\"}[$__rate_interval]))) / sum(rate(ydb_client_retry_attempts_count[$__rate_interval]))", + "legendFormat": "retried operations ratio", + "datasource": { "type": "prometheus", "uid": "prometheus" } + } + ] + } + ] +} diff --git a/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml b/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml index 05ba5bd95..5898f3ce5 100644 --- a/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml +++ b/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml @@ -3,6 +3,7 @@ apiVersion: 1 datasources: - name: Prometheus type: prometheus + uid: prometheus access: proxy url: http://prometheus:9090 isDefault: true @@ -10,13 +11,14 @@ datasources: - name: Tempo type: tempo + uid: tempo access: proxy url: http://tempo:3200 editable: false jsonData: tracesToMetrics: - datasourceUid: Prometheus + datasourceUid: prometheus serviceMap: - datasourceUid: Prometheus + datasourceUid: prometheus diff --git a/examples/opentelemetry/load_tank.py b/examples/opentelemetry/load_tank.py new file mode 100644 index 000000000..801ddadf5 --- /dev/null +++ b/examples/opentelemetry/load_tank.py @@ -0,0 +1,287 @@ +"""Small OpenTelemetry load generator for the YDB Python SDK example.""" + +from __future__ import annotations + +import asyncio +import os +import random +import time +from dataclasses import dataclass +from typing import AsyncIterator, Tuple + +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader +from opentelemetry.sdk.resources import Resource + +import ydb +from ydb.opentelemetry import enable_metrics + + +@dataclass(frozen=True) +class LoadConfig: + endpoint: str + database: str + otlp_endpoint: str + service_name: str + pool_size: int + worker_count: int + peak_rps: int + medium_rps: int + min_rps: int + peak_duration: int + medium_duration: int + min_duration: int + total_time: int + query: str + error_rps: int + error_query: str + pressure_pool_size: int + pressure_workers: int + pressure_hold_time: float + pressure_acquire_timeout: float + pressure_interval: float + session_churn_interval: float + + +def _env(name: str, default: str) -> str: + value = os.environ.get(name) + return value if value is not None and value != "" else default + + +def _env_int(name: str, default: int) -> int: + return int(_env(name, str(default))) + + +def _env_float(name: str, default: float) -> float: + return float(_env(name, str(default))) + + +def _load_config() -> LoadConfig: + return LoadConfig( + endpoint=_env("YDB_ENDPOINT", "grpc://localhost:2136"), + database=_env("YDB_DATABASE", "/local"), + otlp_endpoint=_env("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"), + service_name=_env("OTEL_SERVICE_NAME", "ydb-python-load-tank"), + pool_size=_env_int("LOAD_TANK_POOL_SIZE", 20), + worker_count=_env_int("LOAD_TANK_WORKERS", 40), + peak_rps=_env_int("LOAD_TANK_PEAK_RPS", 120), + medium_rps=_env_int("LOAD_TANK_MEDIUM_RPS", 30), + min_rps=_env_int("LOAD_TANK_MIN_RPS", 3), + peak_duration=_env_int("LOAD_TANK_PEAK_DURATION", 60), + medium_duration=_env_int("LOAD_TANK_MEDIUM_DURATION", 90), + min_duration=_env_int("LOAD_TANK_MIN_DURATION", 60), + total_time=_env_int("LOAD_TANK_TOTAL_TIME", 300), + query=_env("LOAD_TANK_QUERY", "SELECT 1 AS value"), + error_rps=_env_int("LOAD_TANK_ERROR_RPS", 1), + error_query=_env("LOAD_TANK_ERROR_QUERY", "SELECT * FROM table_that_does_not_exist_for_metrics"), + pressure_pool_size=_env_int("LOAD_TANK_PRESSURE_POOL_SIZE", 1), + pressure_workers=_env_int("LOAD_TANK_PRESSURE_WORKERS", 8), + pressure_hold_time=_env_float("LOAD_TANK_PRESSURE_HOLD_TIME", 1.5), + pressure_acquire_timeout=_env_float("LOAD_TANK_PRESSURE_ACQUIRE_TIMEOUT", 1.0), + pressure_interval=_env_float("LOAD_TANK_PRESSURE_INTERVAL", 0.2), + session_churn_interval=_env_float("LOAD_TANK_SESSION_CHURN_INTERVAL", 2.0), + ) + + +async def _load_steps(config: LoadConfig) -> AsyncIterator[Tuple[int, str, int]]: + pattern = ( + (config.peak_rps, "Peak", config.peak_duration), + (config.medium_rps, "Medium down", config.medium_duration), + (config.min_rps, "Min", config.min_duration), + (config.medium_rps, "Medium up", config.medium_duration), + ) + deadline = time.monotonic() + config.total_time + + while time.monotonic() < deadline: + for rps, label, duration in pattern: + remaining = int(deadline - time.monotonic()) + if remaining <= 0: + return + yield rps, label, min(duration, remaining) + + +async def _worker( + pool: ydb.aio.QuerySessionPool, + queue: asyncio.Queue[object], + query: str, + stop: asyncio.Event, +) -> None: + while not stop.is_set(): + try: + await asyncio.wait_for(queue.get(), timeout=0.5) + except asyncio.TimeoutError: + continue + + try: + await pool.execute_with_retries(query) + except Exception as exc: + print("Load operation failed: %s" % exc) + finally: + queue.task_done() + + +async def _feed_phase(queue: asyncio.Queue[object], rps: int, duration: int) -> None: + interval = 1.0 / max(rps, 1) + deadline = time.monotonic() + duration + next_tick = time.monotonic() + + while time.monotonic() < deadline: + await queue.put(object()) + next_tick += interval + delay = next_tick - time.monotonic() + if delay > 0: + await asyncio.sleep(delay) + else: + await asyncio.sleep(0) + + +async def _error_worker(pool: ydb.aio.QuerySessionPool, config: LoadConfig, stop: asyncio.Event) -> None: + if config.error_rps <= 0: + return + + interval = 1.0 / config.error_rps + next_tick = time.monotonic() + + while not stop.is_set(): + try: + await pool.execute_with_retries(config.error_query) + except Exception: + pass + + next_tick += interval + delay = next_tick - time.monotonic() + if delay > 0: + try: + await asyncio.wait_for(stop.wait(), timeout=delay) + except asyncio.TimeoutError: + pass + else: + await asyncio.sleep(0) + + +async def _pressure_round(pool: ydb.aio.QuerySessionPool, config: LoadConfig) -> None: + async def holder() -> None: + async with pool.checkout(timeout=5): + await asyncio.sleep(config.pressure_hold_time) + + async def contender() -> None: + try: + async with pool.checkout(timeout=config.pressure_acquire_timeout): + pass + except Exception: + pass + + holder_task = asyncio.create_task(holder()) + await asyncio.sleep(0) + contenders = [asyncio.create_task(contender()) for _ in range(config.pressure_workers)] + await asyncio.gather(holder_task, *contenders, return_exceptions=True) + + +async def _pool_pressure_worker(driver: ydb.aio.Driver, config: LoadConfig, stop: asyncio.Event) -> None: + if config.pressure_workers <= 0 or config.pressure_pool_size <= 0: + return + + async with ydb.aio.QuerySessionPool( + driver, + size=config.pressure_pool_size, + name="pool-pressure", + ) as pool: + while not stop.is_set(): + await _pressure_round(pool, config) + try: + await asyncio.wait_for(stop.wait(), timeout=config.pressure_interval) + except asyncio.TimeoutError: + pass + + +async def _session_churn_worker(driver: ydb.aio.Driver, config: LoadConfig, stop: asyncio.Event) -> None: + if config.session_churn_interval <= 0: + return + + while not stop.is_set(): + async with ydb.aio.QuerySessionPool(driver, size=1, name="session-churn") as pool: + await pool.execute_with_retries("SELECT 1 AS value") + + try: + await asyncio.wait_for(stop.wait(), timeout=config.session_churn_interval) + except asyncio.TimeoutError: + pass + + +async def main() -> None: + config = _load_config() + + resource = Resource(attributes={"service.name": config.service_name}) + metric_reader = PeriodicExportingMetricReader( + OTLPMetricExporter(endpoint=config.otlp_endpoint), + export_interval_millis=2000, + ) + meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) + enable_metrics(meter_provider) + + print( + "=== YDB Python SDK load tank ===\n" + " total=%ss workers=%s pool_size=%s query=%r error_rps=%s\n" + " pressure_pool_size=%s pressure_workers=%s pressure_timeout=%ss session_churn_interval=%ss\n" + " pattern: Peak(%s rps, %ss) -> Medium(%s rps, %ss) -> " + "Min(%s rps, %ss) -> Medium -> repeat" + % ( + config.total_time, + config.worker_count, + config.pool_size, + config.query, + config.error_rps, + config.pressure_pool_size, + config.pressure_workers, + config.pressure_acquire_timeout, + config.session_churn_interval, + config.peak_rps, + config.peak_duration, + config.medium_rps, + config.medium_duration, + config.min_rps, + config.min_duration, + ) + ) + + async with ydb.aio.Driver( + endpoint=config.endpoint, + database=config.database, + disable_discovery=True, + ) as driver: + await driver.wait(timeout=60) + + async with ydb.aio.QuerySessionPool(driver, size=config.pool_size, name="load-tank") as pool: + queue: asyncio.Queue[object] = asyncio.Queue(maxsize=max(config.worker_count * 4, config.peak_rps)) + stop = asyncio.Event() + workers = [ + asyncio.create_task(_worker(pool, queue, config.query, stop)) for _ in range(config.worker_count) + ] + error_task = asyncio.create_task(_error_worker(pool, config, stop)) + pressure_task = asyncio.create_task(_pool_pressure_worker(driver, config, stop)) + churn_task = asyncio.create_task(_session_churn_worker(driver, config, stop)) + + try: + async for rps, label, duration in _load_steps(config): + print("[%s] Phase: %s (%s RPS for %ss)" % (time.strftime("%H:%M:%S"), label, rps, duration)) + await _feed_phase(queue, rps, duration) + await asyncio.sleep(random.random() / 10.0) + + await queue.join() + finally: + stop.set() + for worker in workers: + worker.cancel() + error_task.cancel() + pressure_task.cancel() + churn_task.cancel() + await asyncio.gather(*workers, error_task, pressure_task, churn_task, return_exceptions=True) + + print("Waiting 10s to flush metrics...") + await asyncio.sleep(10) + meter_provider.shutdown() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/opentelemetry/otel_example.py b/examples/opentelemetry/otel_example.py index 6ec0c5a84..3bdd6a6b8 100644 --- a/examples/opentelemetry/otel_example.py +++ b/examples/opentelemetry/otel_example.py @@ -8,8 +8,13 @@ import asyncio import os + +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader + import ydb -from ydb.opentelemetry import enable_tracing +from ydb.opentelemetry import enable_metrics, enable_tracing from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource @@ -45,13 +50,20 @@ async def main() -> None: otlp_endpoint = _env("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") resource = Resource(attributes={"service.name": _env("OTEL_SERVICE_NAME", "ydb-otel-example")}) - provider = TracerProvider(resource=resource) - provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))) - trace.set_tracer_provider(provider) + tracer_provider = TracerProvider(resource=resource) + tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))) + trace.set_tracer_provider(tracer_provider) tracer = trace.get_tracer(__name__) enable_tracing(tracer) + metric_reader = PeriodicExportingMetricReader( + OTLPMetricExporter(endpoint=otlp_endpoint), + export_interval_millis=1000, + ) + meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) + enable_metrics(meter_provider) + async with ydb.aio.Driver( endpoint=endpoint, database=database, @@ -84,8 +96,10 @@ async def concurrent_task(task_num: int) -> None: final_rows = await pool.execute_with_retries("SELECT amount FROM bank WHERE id = 1") amount = int(list(final_rows[0].rows)[0]["amount"]) print(f"Final amount (after serializable retries): {amount}") - - provider.shutdown() + print("Application will shut down in 15 seconds...") + await asyncio.sleep(15) + tracer_provider.shutdown() + meter_provider.shutdown() if __name__ == "__main__": diff --git a/tests/tracing/__init__.py b/tests/opentelemetry/__init__.py similarity index 100% rename from tests/tracing/__init__.py rename to tests/opentelemetry/__init__.py diff --git a/tests/tracing/conftest.py b/tests/opentelemetry/conftest.py similarity index 54% rename from tests/tracing/conftest.py rename to tests/opentelemetry/conftest.py index 26c39cef1..e28fe305a 100644 --- a/tests/tracing/conftest.py +++ b/tests/opentelemetry/conftest.py @@ -7,6 +7,10 @@ import pytest from opentelemetry import trace +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics import Counter, Histogram, ObservableUpDownCounter, UpDownCounter +from opentelemetry.sdk.metrics.export import AggregationTemporality +from opentelemetry.sdk.metrics.export import InMemoryMetricReader from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter @@ -18,7 +22,7 @@ @pytest.fixture() -def otel_setup(): +def tracing_setup(): """Enable SDK tracing, yield the exporter, then restore noop defaults. Each test gets a clean exporter (cleared before and after). @@ -37,6 +41,30 @@ def otel_setup(): _exporter.clear() +@pytest.fixture() +def metrics_setup(): + """Enable SDK metrics with an in-memory reader, then restore noop defaults.""" + from ydb.opentelemetry import disable_metrics, enable_metrics + + reader = InMemoryMetricReader( + preferred_temporality={ + Counter: AggregationTemporality.CUMULATIVE, + Histogram: AggregationTemporality.CUMULATIVE, + ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, + UpDownCounter: AggregationTemporality.CUMULATIVE, + } + ) + provider = MeterProvider(metric_readers=[reader]) + + disable_metrics() + enable_metrics(provider) + try: + yield reader + finally: + disable_metrics() + provider.shutdown() + + class FakeDriverConfig: def __init__(self, endpoint="test_endpoint:1337", database="/test_database"): self.endpoint = endpoint diff --git a/tests/opentelemetry/test_metrics.py b/tests/opentelemetry/test_metrics.py new file mode 100644 index 000000000..874f9ca60 --- /dev/null +++ b/tests/opentelemetry/test_metrics.py @@ -0,0 +1,545 @@ +import inspect +from unittest.mock import MagicMock + +import pytest +from opentelemetry.metrics import Meter + + +def _metrics_by_name(reader): + data = reader.get_metrics_data() + if data is None: + return {} + + return { + metric.name: metric + for resource_metrics in data.resource_metrics + for scope_metrics in resource_metrics.scope_metrics + for metric in scope_metrics.metrics + } + + +def _single_point(reader, name): + return _single_point_from_metrics(_metrics_by_name(reader), name) + + +def _single_point_from_metrics(metrics, name): + metric = metrics[name] + points = list(metric.data.data_points) + assert len(points) == 1 + return points[0] + + +def _points(reader, name): + metrics = _metrics_by_name(reader) + if name not in metrics: + return [] + return list(metrics[name].data.data_points) + + +def _histogram_sum(reader, name): + return _single_point(reader, name).sum + + +def _sum_value(reader, name): + return _single_point(reader, name).value + + +def _histogram_boundaries_advisory_supported(): + return "explicit_bucket_boundaries_advisory" in inspect.signature(Meter.create_histogram).parameters + + +def test_metrics_registry_records_all_instruments(metrics_setup, monkeypatch): + from ydb import issues + from ydb.opentelemetry.metrics import ( + CLIENT_OPERATION_DURATION, + CLIENT_OPERATION_FAILED, + QUERY_SESSION_COUNT, + QUERY_SESSION_CREATE_TIME, + QUERY_SESSION_MAX, + QUERY_SESSION_MIN, + QUERY_SESSION_PENDING_REQUESTS, + QUERY_SESSION_TIMEOUTS, + RETRY_ATTEMPTS, + RETRY_DURATION, + ATTEMPT_BUCKETS, + DURATION_BUCKETS_SECONDS, + RETRY_DURATION_BUCKETS_SECONDS, + create_metrics_operation, + record_query_session_count, + record_query_session_create_time, + record_query_session_max, + record_query_session_pending_requests, + record_query_session_timeout, + record_retry_metrics, + ) + + monkeypatch.setattr("ydb.opentelemetry.metrics.time.monotonic", MagicMock(side_effect=[1.0, 1.25])) + + with pytest.raises(issues.Unavailable): + with create_metrics_operation("ExecuteQuery"): + raise issues.Unavailable("transient") + + record_query_session_count(2, "main", "used") + record_query_session_create_time(0.5, "main") + record_query_session_max(100, "main") + record_query_session_pending_requests(1, "main") + record_query_session_timeout("main") + record_retry_metrics(0.75, 3) + + metrics = _metrics_by_name(metrics_setup) + + assert set(metrics) == { + CLIENT_OPERATION_DURATION, + CLIENT_OPERATION_FAILED, + QUERY_SESSION_COUNT, + QUERY_SESSION_CREATE_TIME, + QUERY_SESSION_MAX, + QUERY_SESSION_MIN, + QUERY_SESSION_PENDING_REQUESTS, + QUERY_SESSION_TIMEOUTS, + RETRY_ATTEMPTS, + RETRY_DURATION, + } + assert metrics[CLIENT_OPERATION_DURATION].unit == "s" + assert metrics[CLIENT_OPERATION_FAILED].unit == "{command}" + assert metrics[QUERY_SESSION_COUNT].unit == "{connection}" + assert metrics[QUERY_SESSION_CREATE_TIME].unit == "s" + assert metrics[QUERY_SESSION_MAX].unit == "{connection}" + assert metrics[QUERY_SESSION_MIN].unit == "{connection}" + assert metrics[QUERY_SESSION_PENDING_REQUESTS].unit == "{request}" + assert metrics[QUERY_SESSION_TIMEOUTS].unit == "{connection}" + assert metrics[RETRY_DURATION].unit == "s" + assert metrics[RETRY_ATTEMPTS].unit == "{attempt}" + if _histogram_boundaries_advisory_supported(): + assert ( + _single_point_from_metrics(metrics, CLIENT_OPERATION_DURATION).explicit_bounds == DURATION_BUCKETS_SECONDS + ) + assert ( + _single_point_from_metrics(metrics, QUERY_SESSION_CREATE_TIME).explicit_bounds == DURATION_BUCKETS_SECONDS + ) + assert _single_point_from_metrics(metrics, RETRY_DURATION).explicit_bounds == RETRY_DURATION_BUCKETS_SECONDS + assert _single_point_from_metrics(metrics, RETRY_ATTEMPTS).explicit_bounds == ATTEMPT_BUCKETS + + +def test_metrics_registry_supports_old_histogram_api(): + from ydb.opentelemetry.metrics_plugin import MetricsRegistry + + class FakeInstrument: + def add(self, value, attributes=None): + pass + + def record(self, value, attributes=None): + pass + + class FakeMeter: + def create_histogram(self, name, unit="", description="", **kwargs): + if "explicit_bucket_boundaries_advisory" in kwargs: + raise TypeError( + "create_histogram() got an unexpected keyword argument 'explicit_bucket_boundaries_advisory'" + ) + return FakeInstrument() + + def create_counter(self, name, unit="", description=""): + return FakeInstrument() + + def create_up_down_counter(self, name, unit="", description=""): + return FakeInstrument() + + def create_observable_up_down_counter(self, name, callbacks=None, unit="", description=""): + return FakeInstrument() + + MetricsRegistry(FakeMeter()) + + +def test_metrics_registry_is_noop_without_meter(): + from ydb.opentelemetry.metrics import ( + create_metrics_operation, + record_query_session_create_time, + record_query_session_pending_requests, + record_query_session_timeout, + record_retry_metrics, + ) + + record_query_session_create_time(1.0, "pool") + record_query_session_pending_requests(1, "pool") + record_query_session_timeout("pool") + record_retry_metrics(1.0, 2) + + with create_metrics_operation("test.operation"): + pass + + +def test_metrics_operation_records_duration_once(metrics_setup, monkeypatch): + from ydb.opentelemetry.metrics import CLIENT_OPERATION_DURATION, create_metrics_operation + + monotonic = MagicMock(side_effect=[10.0, 10.25, 11.0]) + monkeypatch.setattr("ydb.opentelemetry.metrics.time.monotonic", monotonic) + + operation = create_metrics_operation( + "ExecuteQuery", + { + "database": "/Root/test", + "endpoint": "localhost:2136", + }, + ) + operation.end() + operation.end() + + point = _single_point(metrics_setup, CLIENT_OPERATION_DURATION) + + assert point.sum == 0.25 + assert point.count == 1 + assert point.attributes == { + "database": "/Root/test", + "endpoint": "localhost:2136", + "operation.name": "ExecuteQuery", + } + + +def test_metrics_operation_records_ydb_error(metrics_setup, monkeypatch): + from ydb import issues + from ydb.opentelemetry.metrics import CLIENT_OPERATION_FAILED, create_metrics_operation + + monkeypatch.setattr("ydb.opentelemetry.metrics.time.monotonic", MagicMock(side_effect=[1.0, 1.1])) + + with pytest.raises(issues.Unavailable): + with create_metrics_operation("ExecuteQuery"): + raise issues.Unavailable("transient") + + point = _single_point(metrics_setup, CLIENT_OPERATION_FAILED) + + assert point.value == 1 + assert point.attributes["status_code"] == "UNAVAILABLE" + assert point.attributes["operation.name"] == "ExecuteQuery" + + +def test_metrics_operation_records_generic_error_status_code(metrics_setup): + from ydb.opentelemetry.metrics import CLIENT_OPERATION_FAILED, create_metrics_operation + + with pytest.raises(ValueError): + with create_metrics_operation("ExecuteQuery"): + raise ValueError("bad value") + + assert _single_point(metrics_setup, CLIENT_OPERATION_FAILED).attributes["status_code"] == "ValueError" + + +def test_metrics_operation_set_attribute(metrics_setup): + from ydb.opentelemetry.metrics import CLIENT_OPERATION_DURATION, create_metrics_operation + + operation = create_metrics_operation("ExecuteQuery") + operation.set_attribute("database", "/Root/test") + operation.end() + + assert _single_point(metrics_setup, CLIENT_OPERATION_DURATION).attributes["database"] == "/Root/test" + + +def test_metrics_operation_ignores_non_metric_attributes(metrics_setup): + from ydb.opentelemetry.metrics import CLIENT_OPERATION_DURATION, create_metrics_operation + + operation = create_metrics_operation("ExecuteQuery") + operation.set_attribute("db.namespace", "/Root/test") + operation.set_attribute("server.address", "localhost") + operation.set_attribute("server.port", 2136) + operation.set_attribute("ydb.operation.name", "ydb.Commit") + operation.set_attribute("network.peer.address", "node.example.net") + operation.set_attribute("network.peer.port", 2136) + operation.set_attribute("ydb.node.dc", "dc-a") + operation.set_attribute("ydb.node.id", 123) + operation.end() + + attrs = _single_point(metrics_setup, CLIENT_OPERATION_DURATION).attributes + + assert "db.namespace" not in attrs + assert "server.address" not in attrs + assert "server.port" not in attrs + assert "ydb.operation.name" not in attrs + assert "network.peer.address" not in attrs + assert "network.peer.port" not in attrs + assert "ydb.node.dc" not in attrs + assert "ydb.node.id" not in attrs + + +def test_metrics_operation_respects_end_on_exit_false(metrics_setup): + from ydb.opentelemetry.metrics import CLIENT_OPERATION_DURATION, create_metrics_operation + + operation = create_metrics_operation("ExecuteQuery") + with operation.attach_context(end_on_exit=False): + pass + + assert CLIENT_OPERATION_DURATION not in _metrics_by_name(metrics_setup) + + operation.end() + + point = _single_point(metrics_setup, CLIENT_OPERATION_DURATION) + assert point.count == 1 + assert point.sum >= 0 + + +def test_metrics_operation_ignores_unknown_operation_name(metrics_setup): + from ydb.opentelemetry.metrics import CLIENT_OPERATION_DURATION, create_metrics_operation + + with create_metrics_operation("ydb.Driver.Initialize"): + pass + + assert CLIENT_OPERATION_DURATION not in _metrics_by_name(metrics_setup) + + +def test_create_ydb_span_records_metrics_when_tracing_is_active(metrics_setup, tracing_setup): + from tests.opentelemetry.conftest import FakeDriverConfig + from ydb.opentelemetry.metrics import CLIENT_OPERATION_DURATION + from ydb.opentelemetry.tracing import create_ydb_span + + exporter = tracing_setup + + with create_ydb_span( + "ydb.ExecuteQuery", + FakeDriverConfig(), + node_id=123, + peer=("node.example.net", 2136, "dc-a"), + ).attach_context(): + pass + + spans = exporter.get_finished_spans() + assert len(spans) == 1 + span_attrs = dict(spans[0].attributes) + assert span_attrs["network.peer.address"] == "node.example.net" + assert span_attrs["network.peer.port"] == 2136 + assert span_attrs["ydb.node.dc"] == "dc-a" + assert span_attrs["ydb.node.id"] == 123 + + metric_attrs = _single_point(metrics_setup, CLIENT_OPERATION_DURATION).attributes + assert metric_attrs["database"] == "/test_database" + assert metric_attrs["endpoint"] == "test_endpoint:1337" + assert metric_attrs["operation.name"] == "ExecuteQuery" + assert "network.peer.address" not in metric_attrs + assert "network.peer.port" not in metric_attrs + assert "ydb.node.dc" not in metric_attrs + assert "ydb.node.id" not in metric_attrs + + +def test_create_ydb_span_records_metrics_when_tracing_is_disabled(metrics_setup): + from tests.opentelemetry.conftest import FakeDriverConfig + from ydb.opentelemetry.metrics import CLIENT_OPERATION_DURATION + from ydb.opentelemetry.tracing import _registry, create_ydb_span + + _registry.set_create_span(None) + + with create_ydb_span("ydb.ExecuteQuery", FakeDriverConfig()).attach_context(): + pass + + metric_attrs = _single_point(metrics_setup, CLIENT_OPERATION_DURATION).attributes + assert metric_attrs["database"] == "/test_database" + assert metric_attrs["endpoint"] == "test_endpoint:1337" + assert metric_attrs["operation.name"] == "ExecuteQuery" + + +def test_query_session_count_accumulates_by_attributes(metrics_setup): + from ydb.opentelemetry.metrics import QUERY_SESSION_COUNT, record_query_session_count + + record_query_session_count(1, "main", "used") + record_query_session_count(2, "main", "used") + record_query_session_count(1, None, "idle") + + metric = _metrics_by_name(metrics_setup)[QUERY_SESSION_COUNT] + values = {tuple(sorted(point.attributes.items())): point.value for point in metric.data.data_points} + + assert ( + values[ + ( + ("ydb.query.session.pool.name", "main"), + ("ydb.query.session.state", "used"), + ) + ] + == 3 + ) + assert ( + values[ + ( + ("ydb.query.session.pool.name", "unknown"), + ("ydb.query.session.state", "idle"), + ) + ] + == 1 + ) + + +def test_query_session_helpers_record_pool_attributes(metrics_setup): + from ydb.opentelemetry.metrics import ( + QUERY_SESSION_CREATE_TIME, + QUERY_SESSION_MAX, + QUERY_SESSION_MIN, + QUERY_SESSION_PENDING_REQUESTS, + QUERY_SESSION_TIMEOUTS, + record_query_session_create_time, + record_query_session_max, + record_query_session_pending_requests, + record_query_session_timeout, + ) + + record_query_session_create_time(0.5, "main") + record_query_session_max(100, "main") + record_query_session_pending_requests(1, None) + record_query_session_timeout("main") + + metrics = _metrics_by_name(metrics_setup) + create_time = _single_point_from_metrics(metrics, QUERY_SESSION_CREATE_TIME) + pending_requests = _single_point_from_metrics(metrics, QUERY_SESSION_PENDING_REQUESTS) + timeouts = _single_point_from_metrics(metrics, QUERY_SESSION_TIMEOUTS) + session_max = _single_point_from_metrics(metrics, QUERY_SESSION_MAX) + session_min = _single_point_from_metrics(metrics, QUERY_SESSION_MIN) + + assert create_time.sum == 0.5 + assert create_time.attributes == {"ydb.query.session.pool.name": "main"} + assert pending_requests.value == 1 + assert pending_requests.attributes == {"ydb.query.session.pool.name": "unknown"} + assert timeouts.value == 1 + assert timeouts.attributes == {"ydb.query.session.pool.name": "main"} + assert session_max.value == 100 + assert session_max.attributes == {"ydb.query.session.pool.name": "main"} + assert session_min.value == 0 + assert session_min.attributes == {"ydb.query.session.pool.name": "main"} + + +def test_sync_query_session_pool_records_max(metrics_setup): + from ydb.opentelemetry.metrics import QUERY_SESSION_MAX, QUERY_SESSION_MIN + from ydb.query.pool import QuerySessionPool + + QuerySessionPool(driver=object(), size=42, name="sync-pool") + + assert _single_point(metrics_setup, QUERY_SESSION_MAX).value == 42 + assert _single_point(metrics_setup, QUERY_SESSION_MAX).attributes == {"ydb.query.session.pool.name": "sync-pool"} + assert _single_point(metrics_setup, QUERY_SESSION_MIN).value == 0 + assert _single_point(metrics_setup, QUERY_SESSION_MIN).attributes == {"ydb.query.session.pool.name": "sync-pool"} + + +def test_sync_query_session_pool_stop_removes_observable_metrics(metrics_setup): + from ydb.opentelemetry.metrics import QUERY_SESSION_COUNT, QUERY_SESSION_MAX, QUERY_SESSION_MIN + from ydb.query.pool import QuerySessionPool + + pool = QuerySessionPool(driver=object(), size=42, name="sync-pool") + pool.stop() + + assert _points(metrics_setup, QUERY_SESSION_COUNT) == [] + assert _points(metrics_setup, QUERY_SESSION_MAX) == [] + assert _points(metrics_setup, QUERY_SESSION_MIN) == [] + + +def test_sync_query_session_pool_uses_endpoint_as_default_pool_name(metrics_setup): + from tests.opentelemetry.conftest import FakeDriverConfig + from ydb.opentelemetry.metrics import QUERY_SESSION_MAX + from ydb.query.pool import QuerySessionPool + + class FakeDriver: + _driver_config = FakeDriverConfig(endpoint="grpc://localhost:2136") + + QuerySessionPool(driver=FakeDriver(), size=42) + + assert _single_point(metrics_setup, QUERY_SESSION_MAX).value == 42 + assert _single_point(metrics_setup, QUERY_SESSION_MAX).attributes == { + "ydb.query.session.pool.name": "grpc://localhost:2136" + } + + +@pytest.mark.asyncio +async def test_async_query_session_pool_records_max(metrics_setup): + from ydb.aio.query.pool import QuerySessionPool + from ydb.opentelemetry.metrics import QUERY_SESSION_MAX, QUERY_SESSION_MIN + + QuerySessionPool(driver=object(), size=24, name="async-pool") + + assert _single_point(metrics_setup, QUERY_SESSION_MAX).value == 24 + assert _single_point(metrics_setup, QUERY_SESSION_MAX).attributes == {"ydb.query.session.pool.name": "async-pool"} + assert _single_point(metrics_setup, QUERY_SESSION_MIN).value == 0 + assert _single_point(metrics_setup, QUERY_SESSION_MIN).attributes == {"ydb.query.session.pool.name": "async-pool"} + + +@pytest.mark.asyncio +async def test_async_query_session_pool_stop_removes_observable_metrics(metrics_setup): + from ydb.aio.query.pool import QuerySessionPool + from ydb.opentelemetry.metrics import QUERY_SESSION_COUNT, QUERY_SESSION_MAX, QUERY_SESSION_MIN + + pool = QuerySessionPool(driver=object(), size=24, name="async-pool") + await pool.stop() + + assert _points(metrics_setup, QUERY_SESSION_COUNT) == [] + assert _points(metrics_setup, QUERY_SESSION_MAX) == [] + assert _points(metrics_setup, QUERY_SESSION_MIN) == [] + + +@pytest.mark.asyncio +async def test_async_query_session_pool_uses_endpoint_as_default_pool_name(metrics_setup): + from tests.opentelemetry.conftest import FakeDriverConfig + from ydb.aio.query.pool import QuerySessionPool + from ydb.opentelemetry.metrics import QUERY_SESSION_MAX + + class FakeDriver: + _driver_config = FakeDriverConfig(endpoint="grpc://localhost:2136") + + QuerySessionPool(driver=FakeDriver(), size=24) + + assert _single_point(metrics_setup, QUERY_SESSION_MAX).value == 24 + assert _single_point(metrics_setup, QUERY_SESSION_MAX).attributes == { + "ydb.query.session.pool.name": "grpc://localhost:2136" + } + + +@pytest.mark.asyncio +async def test_sync_and_async_query_session_pool_auto_names_do_not_collide(metrics_setup): + from ydb.aio.query.pool import QuerySessionPool as AsyncQuerySessionPool + from ydb.opentelemetry.metrics import QUERY_SESSION_MAX + from ydb.query.pool import QuerySessionPool + + QuerySessionPool(driver=object(), size=42) + AsyncQuerySessionPool(driver=object(), size=24) + + metric = _metrics_by_name(metrics_setup)[QUERY_SESSION_MAX] + values = {point.attributes["ydb.query.session.pool.name"]: point.value for point in metric.data.data_points} + + assert len(values) == 2 + assert sorted(values.values()) == [24, 42] + + +def test_retry_operation_sync_records_retry_metrics(metrics_setup): + from ydb import issues + from ydb.opentelemetry.metrics import RETRY_ATTEMPTS, RETRY_DURATION + from ydb.retries import RetrySettings, retry_operation_sync + + attempts = {"count": 0} + + def flaky(): + attempts["count"] += 1 + if attempts["count"] < 3: + raise issues.Aborted("retry") + return "ok" + + assert retry_operation_sync(flaky, RetrySettings(max_retries=5)) == "ok" + + metrics = _metrics_by_name(metrics_setup) + duration = _single_point_from_metrics(metrics, RETRY_DURATION) + retry_attempts = _single_point_from_metrics(metrics, RETRY_ATTEMPTS) + + assert duration.count == 1 + assert duration.sum >= 0 + assert duration.attributes == {} + assert retry_attempts.sum == 3 + + +async def _async_value(): + return "ok" + + +@pytest.mark.asyncio +async def test_retry_operation_async_records_retry_metrics(metrics_setup): + from ydb.opentelemetry.metrics import RETRY_ATTEMPTS, RETRY_DURATION + from ydb.retries import retry_operation_async + + assert await retry_operation_async(_async_value) == "ok" + + metrics = _metrics_by_name(metrics_setup) + duration = _single_point_from_metrics(metrics, RETRY_DURATION) + retry_attempts = _single_point_from_metrics(metrics, RETRY_ATTEMPTS) + + assert duration.count == 1 + assert duration.sum >= 0 + assert duration.attributes == {} + assert retry_attempts.sum == 1 diff --git a/tests/tracing/test_tracing_async.py b/tests/opentelemetry/test_tracing_async.py similarity index 92% rename from tests/tracing/test_tracing_async.py rename to tests/opentelemetry/test_tracing_async.py index 6b4e96ad1..1276fa69d 100644 --- a/tests/tracing/test_tracing_async.py +++ b/tests/opentelemetry/test_tracing_async.py @@ -70,8 +70,8 @@ def _make_fresh_async_tx(session, driver): class TestAsyncCreateSessionSpan: @pytest.mark.asyncio - async def test_create_session_emits_span(self, otel_setup): - exporter = otel_setup + async def test_create_session_emits_span(self, tracing_setup): + exporter = tracing_setup from ydb.aio.query.session import QuerySession @@ -95,8 +95,8 @@ async def test_create_session_emits_span(self, otel_setup): assert attrs["server.address"] == "test_endpoint" assert attrs["server.port"] == 1337 - def test_async_connection_peer_attributes_are_resolved(self, otel_setup): - exporter = otel_setup + def test_async_connection_peer_attributes_are_resolved(self, tracing_setup): + exporter = tracing_setup from ydb.aio.connection import Connection from ydb.connection import EndpointOptions @@ -140,8 +140,8 @@ def test_async_connection_peer_attributes_are_resolved(self, otel_setup): class TestAsyncExecuteQuerySpan: @pytest.mark.asyncio - async def test_session_execute_emits_span(self, otel_setup): - exporter = otel_setup + async def test_session_execute_emits_span(self, tracing_setup): + exporter = tracing_setup from ydb.aio.query.session import QuerySession @@ -169,8 +169,8 @@ async def test_session_execute_emits_span(self, otel_setup): assert "ydb.session.id" not in attrs @pytest.mark.asyncio - async def test_tx_execute_emits_span(self, otel_setup): - exporter = otel_setup + async def test_tx_execute_emits_span(self, tracing_setup): + exporter = tracing_setup session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_async_tx(session, driver) @@ -192,8 +192,8 @@ async def test_tx_execute_emits_span(self, otel_setup): class TestAsyncBeginTransactionSpan: @pytest.mark.asyncio - async def test_begin_emits_span(self, otel_setup): - exporter = otel_setup + async def test_begin_emits_span(self, tracing_setup): + exporter = tracing_setup session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_fresh_async_tx(session, driver) @@ -214,10 +214,10 @@ async def test_begin_emits_span(self, otel_setup): assert span.status.status_code == StatusCode.UNSET @pytest.mark.asyncio - async def test_begin_sets_error_status_on_failure(self, otel_setup): + async def test_begin_sets_error_status_on_failure(self, tracing_setup): from ydb import issues - exporter = otel_setup + exporter = tracing_setup session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_fresh_async_tx(session, driver) @@ -236,8 +236,8 @@ async def test_begin_sets_error_status_on_failure(self, otel_setup): class TestAsyncCommitSpan: @pytest.mark.asyncio - async def test_commit_emits_span(self, otel_setup): - exporter = otel_setup + async def test_commit_emits_span(self, tracing_setup): + exporter = tracing_setup session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_async_tx(session, driver) @@ -255,8 +255,8 @@ async def test_commit_emits_span(self, otel_setup): class TestAsyncRollbackSpan: @pytest.mark.asyncio - async def test_rollback_emits_span(self, otel_setup): - exporter = otel_setup + async def test_rollback_emits_span(self, tracing_setup): + exporter = tracing_setup session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_async_tx(session, driver) @@ -279,10 +279,10 @@ class TestAsyncCommitRollbackErrorRecording: """ @pytest.mark.asyncio - async def test_commit_records_exception_on_failure(self, otel_setup): + async def test_commit_records_exception_on_failure(self, tracing_setup): from ydb import issues - exporter = otel_setup + exporter = tracing_setup session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_async_tx(session, driver) @@ -299,10 +299,10 @@ async def test_commit_records_exception_on_failure(self, otel_setup): assert any(e.name == "exception" for e in span.events) @pytest.mark.asyncio - async def test_rollback_records_exception_on_failure(self, otel_setup): + async def test_rollback_records_exception_on_failure(self, tracing_setup): from ydb import issues - exporter = otel_setup + exporter = tracing_setup session, driver = _make_async_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_async_tx(session, driver) @@ -321,8 +321,8 @@ async def test_rollback_records_exception_on_failure(self, otel_setup): class TestAsyncErrorHandling: @pytest.mark.asyncio - async def test_error_sets_error_status_and_attributes(self, otel_setup): - exporter = otel_setup + async def test_error_sets_error_status_and_attributes(self, tracing_setup): + exporter = tracing_setup from ydb import issues @@ -353,10 +353,10 @@ async def test_error_sets_error_status_and_attributes(self, otel_setup): class TestAsyncRetryPolicySpans: @pytest.mark.asyncio - async def test_success_emits_single_try(self, otel_setup): + async def test_success_emits_single_try(self, tracing_setup): from ydb.retries import retry_operation_async - exporter = otel_setup + exporter = tracing_setup async def callee(): return 7 @@ -373,12 +373,12 @@ async def callee(): assert tries[0].status.status_code == StatusCode.UNSET @pytest.mark.asyncio - async def test_retry_failed_tries_set_error_status(self, otel_setup): + async def test_retry_failed_tries_set_error_status(self, tracing_setup): """Failed async attempts must set ``ydb.Try`` status to ERROR (not UNSET).""" from ydb import issues from ydb.retries import BackoffSettings, RetrySettings, retry_operation_async - exporter = otel_setup + exporter = tracing_setup counter = {"n": 0} async def flaky(): @@ -402,14 +402,14 @@ async def flaky(): assert tries[2].status.status_code == StatusCode.UNSET @pytest.mark.asyncio - async def test_context_cancel_during_backoff_records_exception(self, otel_setup): + async def test_context_cancel_during_backoff_records_exception(self, tracing_setup): """Inter-attempt sleep is outside ``ydb.Try``; cancellation during ``asyncio.sleep`` is recorded on ``ydb.RunWithRetry`` (``record_exception``). """ from ydb import issues from ydb.retries import BackoffSettings, RetrySettings, retry_operation_async - exporter = otel_setup + exporter = tracing_setup calls = {"n": 0} async def flaky(): @@ -442,7 +442,7 @@ async def flaky(): class TestAsyncRetrySpanNesting: @pytest.mark.asyncio - async def test_execute_query_is_child_of_try_under_run_with_retry(self, otel_setup): + async def test_execute_query_is_child_of_try_under_run_with_retry(self, tracing_setup): """``ydb.RunWithRetry`` -> ``ydb.Try`` -> ``ydb.ExecuteQuery`` (deep nesting). The previous implementation produced sibling spans because ``ydb.Try`` was @@ -452,7 +452,7 @@ async def test_execute_query_is_child_of_try_under_run_with_retry(self, otel_set from ydb.aio.query.session import QuerySession from ydb.retries import retry_operation_async - exporter = otel_setup + exporter = tracing_setup qs = QuerySession.__new__(QuerySession) cfg = FakeDriverConfig() @@ -485,9 +485,9 @@ async def callee(): class TestAsyncConcurrentSpansIsolation: @pytest.mark.asyncio - async def test_parallel_executes_do_not_become_parent_child(self, otel_setup): + async def test_parallel_executes_do_not_become_parent_child(self, tracing_setup): """Two concurrent execute calls must produce sibling spans, not parent-child.""" - exporter = otel_setup + exporter = tracing_setup from ydb.aio.query.session import QuerySession @@ -509,10 +509,10 @@ def _make_session(): async def do_execute(qs): fake_stream = _slow_async_iter() - with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, return_value=fake_stream): - result = await qs.execute("SELECT 1") - async for _ in result: - pass + qs._execute_call = AsyncMock(return_value=fake_stream) + result = await qs.execute("SELECT 1") + async for _ in result: + pass qs1 = _make_session() qs2 = _make_session() diff --git a/tests/tracing/test_tracing_sync.py b/tests/opentelemetry/test_tracing_sync.py similarity index 91% rename from tests/tracing/test_tracing_sync.py rename to tests/opentelemetry/test_tracing_sync.py index 9f8bbc421..1e9af1c78 100644 --- a/tests/tracing/test_tracing_sync.py +++ b/tests/opentelemetry/test_tracing_sync.py @@ -67,8 +67,8 @@ def _make_fresh_tx(session, driver): class TestCreateSessionSpan: - def test_create_session_emits_span(self, otel_setup): - exporter = otel_setup + def test_create_session_emits_span(self, tracing_setup): + exporter = tracing_setup from ydb.query.session import QuerySession @@ -95,8 +95,8 @@ def test_create_session_emits_span(self, otel_setup): class TestExecuteQuerySpan: - def test_session_execute_emits_span(self, otel_setup): - exporter = otel_setup + def test_session_execute_emits_span(self, tracing_setup): + exporter = tracing_setup from ydb.query.session import QuerySession @@ -130,8 +130,8 @@ def test_session_execute_emits_span(self, otel_setup): assert "ydb.session.id" not in attrs assert "ydb.tx.id" not in attrs - def test_tx_execute_emits_span(self, otel_setup): - exporter = otel_setup + def test_tx_execute_emits_span(self, tracing_setup): + exporter = tracing_setup session, driver = _make_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_tx(session, driver) @@ -152,8 +152,8 @@ def test_tx_execute_emits_span(self, otel_setup): class TestBeginTransactionSpan: - def test_begin_emits_span(self, otel_setup): - exporter = otel_setup + def test_begin_emits_span(self, tracing_setup): + exporter = tracing_setup session, driver = _make_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_fresh_tx(session, driver) @@ -173,10 +173,10 @@ def test_begin_emits_span(self, otel_setup): assert "ydb.tx.id" not in attrs assert span.status.status_code == StatusCode.UNSET - def test_begin_sets_error_status_on_failure(self, otel_setup): + def test_begin_sets_error_status_on_failure(self, tracing_setup): from ydb import issues - exporter = otel_setup + exporter = tracing_setup session, driver = _make_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_fresh_tx(session, driver) @@ -194,8 +194,8 @@ def test_begin_sets_error_status_on_failure(self, otel_setup): class TestCommitSpan: - def test_commit_emits_span(self, otel_setup): - exporter = otel_setup + def test_commit_emits_span(self, tracing_setup): + exporter = tracing_setup session, driver = _make_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_tx(session, driver) @@ -214,8 +214,8 @@ def test_commit_emits_span(self, otel_setup): class TestRollbackSpan: - def test_rollback_emits_span(self, otel_setup): - exporter = otel_setup + def test_rollback_emits_span(self, tracing_setup): + exporter = tracing_setup session, driver = _make_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_tx(session, driver) @@ -240,10 +240,10 @@ class TestCommitRollbackErrorRecording: - have the exception recorded as a span event (``record_exception``) """ - def test_commit_records_exception_on_failure(self, otel_setup): + def test_commit_records_exception_on_failure(self, tracing_setup): from ydb import issues - exporter = otel_setup + exporter = tracing_setup session, driver = _make_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_tx(session, driver) @@ -259,10 +259,10 @@ def test_commit_records_exception_on_failure(self, otel_setup): assert attrs["db.response.status_code"] == "ABORTED" assert any(e.name == "exception" for e in span.events) - def test_rollback_records_exception_on_failure(self, otel_setup): + def test_rollback_records_exception_on_failure(self, tracing_setup): from ydb import issues - exporter = otel_setup + exporter = tracing_setup session, driver = _make_session_mock(peer=("n1", 2136, "dc-a")) tx = _make_tx(session, driver) @@ -280,8 +280,8 @@ def test_rollback_records_exception_on_failure(self, otel_setup): class TestErrorHandling: - def test_error_sets_error_status_and_attributes(self, otel_setup): - exporter = otel_setup + def test_error_sets_error_status_and_attributes(self, tracing_setup): + exporter = tracing_setup from ydb import issues @@ -314,7 +314,7 @@ class TestNoSpansWhenDisabled: def test_no_spans_without_enable_tracing(self): """Without enable_tracing(), the registry uses noop — no spans are created.""" - from tests.tracing.conftest import _exporter + from tests.opentelemetry.conftest import _exporter _registry.set_create_span(None) _registry.set_metadata_hook(None) @@ -327,8 +327,8 @@ def test_no_spans_without_enable_tracing(self): class TestParentChildRelationship: - def test_sdk_span_is_child_of_user_span(self, otel_setup): - exporter = otel_setup + def test_sdk_span_is_child_of_user_span(self, tracing_setup): + exporter = tracing_setup tracer = trace.get_tracer("test.tracer") @@ -346,7 +346,7 @@ def test_sdk_span_is_child_of_user_span(self, otel_setup): class TestTraceMetadataInjection: - def test_get_trace_metadata_returns_traceparent(self, otel_setup): + def test_get_trace_metadata_returns_traceparent(self, tracing_setup): from ydb.opentelemetry.tracing import get_trace_metadata tracer = trace.get_tracer("test.tracer") @@ -359,8 +359,8 @@ def test_get_trace_metadata_returns_traceparent(self, otel_setup): class TestDriverInitializeSpan: - def test_driver_initialize_emits_internal_span(self, otel_setup): - exporter = otel_setup + def test_driver_initialize_emits_internal_span(self, tracing_setup): + exporter = tracing_setup cfg = FakeDriverConfig() @@ -383,8 +383,8 @@ class TestCommonAttributes: ("[::1]:2136", "[::1]", 2136), ], ) - def test_endpoint_parsing(self, otel_setup, endpoint, expected_host, expected_port): - exporter = otel_setup + def test_endpoint_parsing(self, tracing_setup, endpoint, expected_host, expected_port): + exporter = tracing_setup cfg = FakeDriverConfig(endpoint=endpoint, database="/mydb") with create_ydb_span("ydb.Test", cfg).attach_context(): @@ -396,8 +396,8 @@ def test_endpoint_parsing(self, otel_setup, endpoint, expected_host, expected_po assert attrs["server.port"] == expected_port assert attrs["db.namespace"] == "/mydb" - def test_peer_attributes_are_optional(self, otel_setup): - exporter = otel_setup + def test_peer_attributes_are_optional(self, tracing_setup): + exporter = tracing_setup cfg = FakeDriverConfig() with create_ydb_span("ydb.Test", cfg).attach_context(): @@ -408,8 +408,8 @@ def test_peer_attributes_are_optional(self, otel_setup): assert "network.peer.address" not in attrs assert "network.peer.port" not in attrs - def test_peer_attributes_emitted_when_known(self, otel_setup): - exporter = otel_setup + def test_peer_attributes_emitted_when_known(self, tracing_setup): + exporter = tracing_setup cfg = FakeDriverConfig() with create_ydb_span("ydb.Test", cfg, peer=("peer.example.com", 2137, "dc-west")).attach_context(): @@ -423,7 +423,7 @@ def test_peer_attributes_emitted_when_known(self, otel_setup): class TestPeerFromEndpointMap: - def test_wrapper_create_session_pulls_peer_from_store(self, otel_setup): + def test_wrapper_create_session_pulls_peer_from_store(self, tracing_setup): """wrapper_create_session must resolve peer (host, port, dc) via the driver's connections_by_node_id cache, not via the grpc target string of the rpc call. """ @@ -454,10 +454,10 @@ def test_wrapper_create_session_pulls_peer_from_store(self, otel_setup): class TestRetryPolicySpans: - def test_success_on_first_try_emits_single_try(self, otel_setup): + def test_success_on_first_try_emits_single_try(self, tracing_setup): from ydb.retries import retry_operation_sync - exporter = otel_setup + exporter = tracing_setup def callee(): return 42 @@ -474,12 +474,12 @@ def callee(): assert "ydb.retry.backoff_ms" not in dict(tries[0].attributes) assert tries[0].parent.span_id == run.context.span_id - def test_retry_backoff_ms_on_each_try(self, otel_setup): + def test_retry_backoff_ms_on_each_try(self, tracing_setup): from ydb import issues from ydb.retries import retry_operation_sync from ydb.retries import RetrySettings, BackoffSettings - exporter = otel_setup + exporter = tracing_setup counter = {"n": 0} def flaky(): @@ -509,7 +509,7 @@ def flaky(): assert tries[1].status.status_code == StatusCode.ERROR assert tries[2].status.status_code == StatusCode.UNSET - def test_backoff_ms_attribute_matches_actual_sleep(self, otel_setup, monkeypatch): + def test_backoff_ms_attribute_matches_actual_sleep(self, tracing_setup, monkeypatch): """Pin the closure: ``ydb.retry.backoff_ms`` on the n-th ``ydb.Try`` equals the sleep that preceded it, regardless of which retry attempt triggered it. @@ -528,7 +528,7 @@ def test_backoff_ms_attribute_matches_actual_sleep(self, otel_setup, monkeypatch sleeps = [] monkeypatch.setattr("time.sleep", sleeps.append) - exporter = otel_setup + exporter = tracing_setup counter = {"n": 0} def flaky(): @@ -553,12 +553,12 @@ def flaky(): assert dict(tries[2].attributes)["ydb.retry.backoff_ms"] == expected_ms assert sleeps == [expected_ms / 1000.0, expected_ms / 1000.0] - def test_skip_backoff_errors_still_emit_one_try_per_attempt(self, otel_setup): + def test_skip_backoff_errors_still_emit_one_try_per_attempt(self, tracing_setup): """Aborted/BadSession path skips the inter-attempt sleep but must still rotate ydb.Try spans.""" from ydb import issues from ydb.retries import RetrySettings, retry_operation_sync - exporter = otel_setup + exporter = tracing_setup counter = {"n": 0} def flaky(): @@ -581,11 +581,11 @@ def flaky(): assert dict(tries[1].attributes)["ydb.retry.backoff_ms"] == 0 assert dict(tries[2].attributes)["ydb.retry.backoff_ms"] == 0 - def test_non_retryable_error_propagates_to_run_span(self, otel_setup): + def test_non_retryable_error_propagates_to_run_span(self, tracing_setup): from ydb import issues from ydb.retries import retry_operation_sync - exporter = otel_setup + exporter = tracing_setup def broken(): raise issues.SchemeError("boom") @@ -603,12 +603,12 @@ def broken(): assert attrs["error.type"] == "ydb_error" assert attrs["db.response.status_code"] == "SCHEME_ERROR" - def test_execute_query_is_child_of_try_under_run_with_retry(self, otel_setup): + def test_execute_query_is_child_of_try_under_run_with_retry(self, tracing_setup): """``ydb.RunWithRetry`` -> ``ydb.Try`` -> ``ydb.ExecuteQuery`` (sync path).""" from ydb.query.session import QuerySession from ydb.retries import retry_operation_sync - exporter = otel_setup + exporter = tracing_setup qs = QuerySession.__new__(QuerySession) cfg = FakeDriverConfig() diff --git a/ydb/aio/query/pool.py b/ydb/aio/query/pool.py index a0d9d93c9..16c44b842 100644 --- a/ydb/aio/query/pool.py +++ b/ydb/aio/query/pool.py @@ -2,6 +2,7 @@ import asyncio import logging +import time from typing import ( Callable, Optional, @@ -22,6 +23,15 @@ from ...query.base import QueryClientSettings from ... import convert from ... import issues +from ...opentelemetry.metrics import ( + query_session_pool_name, + record_query_session_count, + record_query_session_create_time, + record_query_session_max, + record_query_session_pending_requests, + record_query_session_timeout, + remove_query_session_pool_metrics, +) from ..._grpc.grpcwrapper import common_utils from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public @@ -38,11 +48,13 @@ def __init__( *, query_client_settings: Optional[QueryClientSettings] = None, loop: Optional[asyncio.AbstractEventLoop] = None, + name: Optional[str] = None, ): """ :param driver: A driver instance :param size: Size of session pool :param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior + :param name: Optional session pool name for OpenTelemetry metrics. """ self._driver = driver @@ -52,10 +64,17 @@ def __init__( self._current_size = 0 self._loop = asyncio.get_running_loop() if loop is None else loop self._query_client_settings = query_client_settings + driver_config = getattr(driver, "_driver_config", None) + self._metrics_pool_name = query_session_pool_name(name, getattr(driver_config, "endpoint", None)) + record_query_session_max(self._size, self._metrics_pool_name) async def _create_new_session(self): session = QuerySession(self._driver, settings=self._query_client_settings) + session._metrics_pool_name = self._metrics_pool_name + session._metrics_state = "used" + start_time = time.monotonic() await session.create() + record_query_session_create_time(time.monotonic() - start_time, self._metrics_pool_name) logger.debug(f"New session was created for pool. Session id: {session.session_id}") return session @@ -81,6 +100,7 @@ async def acquire(self, timeout: Optional[float] = None) -> QuerySession: pass if session is None and self._current_size == self._size: + record_query_session_pending_requests(1, self._metrics_pool_name) queue_get = asyncio.ensure_future(self._queue.get()) task_stop = asyncio.ensure_future(self._should_stop.wait()) task_timeout = ( @@ -97,6 +117,8 @@ async def acquire(self, timeout: Optional[float] = None) -> QuerySession: if not cancelled and not queue_get.exception(): await self.release(queue_get.result()) raise + finally: + record_query_session_pending_requests(-1, self._metrics_pool_name) task_stop.cancel() if task_timeout is not None: @@ -110,12 +132,16 @@ async def acquire(self, timeout: Optional[float] = None) -> QuerySession: cancelled = queue_get.cancel() if not cancelled and not queue_get.exception(): await self.release(queue_get.result()) + record_query_session_timeout(self._metrics_pool_name) raise issues.SessionPoolEmpty("Timeout on acquire session") session = queue_get.result() if session is not None: if session.is_active: + record_query_session_count(-1, self._metrics_pool_name, "idle") + session._metrics_state = "used" + record_query_session_count(1, self._metrics_pool_name, "used") logger.debug(f"Acquired active session from queue: {session.session_id}") return session else: @@ -137,6 +163,9 @@ async def acquire(self, timeout: Optional[float] = None) -> QuerySession: async def release(self, session: QuerySession) -> None: """Release a session back to Session Pool.""" + record_query_session_count(-1, self._metrics_pool_name, "used") + session._metrics_state = "idle" + record_query_session_count(1, self._metrics_pool_name, "idle") self._queue.put_nowait(session) logger.debug("Session returned to queue: %s", session.session_id) @@ -268,6 +297,7 @@ async def stop(self): await asyncio.gather(*tasks) logger.debug("All session were deleted.") + remove_query_session_pool_metrics(self._metrics_pool_name) async def __aenter__(self): return self diff --git a/ydb/aio/query/pool_test.py b/ydb/aio/query/pool_test.py index de33a8e02..ad62f30e1 100644 --- a/ydb/aio/query/pool_test.py +++ b/ydb/aio/query/pool_test.py @@ -19,6 +19,7 @@ def _make_pool(size=1): pool._current_size = 0 pool._loop = asyncio.get_event_loop() pool._query_client_settings = None + pool._metrics_pool_name = "test-query-session-pool" return pool diff --git a/ydb/aio/query/session.py b/ydb/aio/query/session.py index b776b6382..081ad2baa 100644 --- a/ydb/aio/query/session.py +++ b/ydb/aio/query/session.py @@ -13,6 +13,7 @@ from .transaction import QueryTxContext from .. import _utilities from ... import issues +from ...opentelemetry.metrics import record_query_session_count from ...settings import BaseRequestSettings from ..._grpc.grpcwrapper import common_utils from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public @@ -110,6 +111,13 @@ async def create(self, settings: Optional[BaseRequestSettings] = None) -> "Query await self._create_call(settings=settings) set_peer_attributes(span, self._peer) await self._attach() + if not getattr(self, "_metrics_counted", False): + record_query_session_count( + 1, + pool_name=getattr(self, "_metrics_pool_name", None), + state=getattr(self, "_metrics_state", "used"), + ) + self._metrics_counted = True return self diff --git a/ydb/opentelemetry/__init__.py b/ydb/opentelemetry/__init__.py index fc058d0d8..844c397e3 100644 --- a/ydb/opentelemetry/__init__.py +++ b/ydb/opentelemetry/__init__.py @@ -13,7 +13,7 @@ def enable_tracing(tracer=None): ``ydb.sdk`` from the global tracer provider will be used. """ try: - from ydb.opentelemetry.plugin import _enable_tracing + from ydb.opentelemetry.tracing_plugin import _enable_tracing except ImportError: raise ImportError( "OpenTelemetry packages are required for tracing support. " @@ -26,11 +26,48 @@ def enable_tracing(tracer=None): def disable_tracing(): """Disable YDB OpenTelemetry hooks and allow :func:`enable_tracing` to run again.""" try: - from ydb.opentelemetry.plugin import _disable_tracing + from ydb.opentelemetry.tracing_plugin import _disable_tracing except ImportError: return _disable_tracing() -__all__ = ["disable_tracing", "enable_tracing"] +def enable_metrics(meter_provider=None): + """Enable OpenTelemetry metrics collection for YDB SDK client metrics. + + This call is **idempotent**: if metrics are already enabled, later calls do nothing + (including passing a different ``meter_provider``). Call :func:`disable_metrics` + first to reconfigure or turn instrumentation off. + + Args: + meter_provider: Optional OTel meter provider to use. If not provided, the + default meter named ``ydb.sdk`` from the global meter provider will be used. + """ + try: + from ydb.opentelemetry.metrics_plugin import _enable_metrics + except ImportError: + raise ImportError( + "OpenTelemetry packages are required for metrics support. " + "Install them with: pip install ydb[opentelemetry]" + ) from None + + _enable_metrics(meter_provider) + + +def disable_metrics(): + """Disable YDB OpenTelemetry metrics collection and allow :func:`enable_metrics` to run again.""" + try: + from ydb.opentelemetry.metrics_plugin import _disable_metrics + except ImportError: + return + + _disable_metrics() + + +__all__ = [ + "disable_tracing", + "enable_tracing", + "disable_metrics", + "enable_metrics", +] diff --git a/ydb/opentelemetry/_endpoint.py b/ydb/opentelemetry/_endpoint.py new file mode 100644 index 000000000..a88b19bc3 --- /dev/null +++ b/ydb/opentelemetry/_endpoint.py @@ -0,0 +1,21 @@ +from typing import Optional, Tuple + + +def split_endpoint(endpoint: Optional[str]) -> Tuple[str, int]: + ep = endpoint or "" + if ep.startswith("grpcs://"): + ep = ep[len("grpcs://") :] + elif ep.startswith("grpc://"): + ep = ep[len("grpc://") :] + + if ep.startswith("["): + close = ep.find("]") + if close != -1 and len(ep) > close + 1 and ep[close + 1] == ":": + host = ep[: close + 1] + port_s = ep[close + 2 :] + return host, int(port_s) if port_s.isdigit() else 0 + + host, sep, port_s = ep.rpartition(":") + if not sep: + return ep, 0 + return host, int(port_s) if port_s.isdigit() else 0 diff --git a/ydb/opentelemetry/metrics.py b/ydb/opentelemetry/metrics.py new file mode 100644 index 000000000..7afec52d2 --- /dev/null +++ b/ydb/opentelemetry/metrics.py @@ -0,0 +1,313 @@ +"""No-op-safe helpers for YDB OpenTelemetry client metrics. + +The SDK records metrics only after :func:`ydb.opentelemetry.enable_metrics` +installs the OpenTelemetry-backed registry from ``metrics_plugin``. Until then +every helper delegates to a no-op registry, which keeps metrics independent from +tracing and safe to call from hot paths. +""" + +import time +import threading +import itertools +from typing import Any, Dict, Optional + +from ydb.opentelemetry._endpoint import split_endpoint + +CLIENT_OPERATION_DURATION = "db.client.operation.duration" +CLIENT_OPERATION_FAILED = "ydb.client.operation.failed" +QUERY_SESSION_COUNT = "ydb.query.session.count" +QUERY_SESSION_CREATE_TIME = "ydb.query.session.create_time" +QUERY_SESSION_PENDING_REQUESTS = "ydb.query.session.pending_requests" +QUERY_SESSION_TIMEOUTS = "ydb.query.session.timeouts" +QUERY_SESSION_MAX = "ydb.query.session.max" +QUERY_SESSION_MIN = "ydb.query.session.min" +RETRY_ATTEMPTS = "ydb.client.retry.attempts" +RETRY_DURATION = "ydb.client.retry.duration" + +DURATION_BUCKETS_SECONDS = ( + 0.001, + 0.005, + 0.01, + 0.05, + 0.1, + 0.5, + 1, + 5, + 10, +) +RETRY_DURATION_BUCKETS_SECONDS = ( + 0.001, + 0.005, + 0.01, + 0.05, + 0.1, + 0.5, + 1, + 2, + 5, + 10, + 30, +) +ATTEMPT_BUCKETS = (1, 2, 3, 4, 5, 7, 10, 20) +_UNKNOWN_POOL = "unknown" +_pool_name_counter = itertools.count(1) +_pool_name_lock = threading.Lock() +_OPERATION_ATTR_KEYS = frozenset( + { + "database", + "endpoint", + "operation.name", + } +) +_CLIENT_OPERATION_NAMES = frozenset( + { + "ExecuteQuery", + "Commit", + "Rollback", + "CreateSession", + "BeginTransaction", + } +) +_CLIENT_OPERATION_NAME_BY_INPUT = { + "ydb.ExecuteQuery": "ExecuteQuery", + "ExecuteQuery": "ExecuteQuery", + "ydb.Commit": "Commit", + "Commit": "Commit", + "ydb.Rollback": "Rollback", + "Rollback": "Rollback", + "ydb.CreateSession": "CreateSession", + "CreateSession": "CreateSession", + "ydb.BeginTransaction": "BeginTransaction", + "BeginTransaction": "BeginTransaction", +} + + +class MetricRegistry: + """No-op metric registry used until the OpenTelemetry metrics plugin is enabled.""" + + enabled = False + + def create_metrics_operation(self, name: str, attributes: Optional[Dict[str, Any]] = None): + return _NOOP_METRICS_OPERATION + + def clear(self) -> None: + pass + + def add(self, name: str, value: int, attributes: Optional[Dict[str, Any]] = None) -> None: + pass + + def record(self, name: str, value: float, attributes: Optional[Dict[str, Any]] = None) -> None: + pass + + def add_query_session_count(self, value: int, attributes: Optional[Dict[str, Any]] = None) -> None: + pass + + def set_query_session_max(self, value: int, attributes: Optional[Dict[str, Any]] = None) -> None: + pass + + def remove_query_session_pool(self, attributes: Optional[Dict[str, Any]] = None) -> None: + pass + + +class _NoopMetricsOperation: + def set_error(self, exception: BaseException) -> None: + pass + + def set_attribute(self, key: str, value: Any) -> None: + pass + + def attach_context(self, end_on_exit=True) -> "_NoopMetricsOperationContext": + return _NoopMetricsOperationContext(self) + + def end(self) -> None: + pass + + def __enter__(self) -> "_NoopMetricsOperation": + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return False + + +class _NoopMetricsOperationContext: + def __init__(self, operation: _NoopMetricsOperation) -> None: + self._operation = operation + + def __enter__(self) -> _NoopMetricsOperation: + return self._operation + + def __exit__(self, exc_type, exc_val, exc_tb): + return False + + +_NOOP_METRICS_OPERATION = _NoopMetricsOperation() +_NOOP_METRICS_REGISTRY = MetricRegistry() +_metrics_registry: MetricRegistry = _NOOP_METRICS_REGISTRY + + +def is_metrics_enabled() -> bool: + return _metrics_registry.enabled + + +def next_query_session_pool_name() -> str: + """Return a process-unique default query session pool name for metric labels.""" + return "query-session-pool-%d" % next(_pool_name_counter) + + +def query_session_pool_name(name: Optional[str], endpoint: Optional[str]) -> str: + return name or endpoint or next_query_session_pool_name() + + +def _set_metrics_registry(metrics_registry: MetricRegistry) -> None: + global _metrics_registry + + _metrics_registry = metrics_registry + + +def _reset_metrics_registry() -> None: + global _metrics_registry + + _metrics_registry.clear() + _metrics_registry = _NOOP_METRICS_REGISTRY + + +def _pool_attrs(pool_name: Optional[str]) -> Dict[str, Any]: + return {"ydb.query.session.pool.name": pool_name or _UNKNOWN_POOL} + + +def _build_ydb_metrics_attrs(driver_config) -> Dict[str, Any]: + host, port = split_endpoint(getattr(driver_config, "endpoint", None)) + endpoint = "%s:%d" % (host, port) if port else host + return { + "database": getattr(driver_config, "database", None) or "", + "endpoint": endpoint, + } + + +def _operation_name(operation_name: str) -> str: + return _CLIENT_OPERATION_NAME_BY_INPUT.get(operation_name, operation_name) + + +def _operation_attrs(operation_name: str, attributes: Dict[str, Any]) -> Dict[str, Any]: + name = _operation_name(operation_name) + return { + "database": attributes.get("database", ""), + "endpoint": attributes.get("endpoint", ""), + "operation.name": name, + } + + +def _response_status_code(exception: BaseException) -> str: + status = getattr(exception, "status", None) + if status is not None: + return getattr(status, "name", str(status)) + return type(exception).__qualname__ + + +class MetricsOperation: + """Metric lifecycle object for one user-visible YDB client operation. + + ``MetricsOperation`` mirrors the small span-like interface used by tracing + so both can be composed by ``create_ydb_span``. It records operation + duration once, records a failed-operation counter when an exception is + attached, and accepts only stable operation labels. + """ + + def __init__(self, name: str, attributes: Optional[Dict[str, Any]] = None) -> None: + self._name = name + self._attributes = _operation_attrs(name, attributes or {}) + self._start_time = time.monotonic() + self._exception: Optional[BaseException] = None + self._ended = False + self._end_lock = threading.Lock() + + def set_error(self, exception: BaseException) -> None: + """Remember the operation exception for the failed-operation metric.""" + self._exception = exception + + def set_attribute(self, key: str, value: Any) -> None: + """Set a metric label only when it is part of the operation metric contract.""" + if key in _OPERATION_ATTR_KEYS: + self._attributes[key] = value + + def attach_context(self, end_on_exit=True) -> "_MetricsOperationContext": + return _MetricsOperationContext(self, end_on_exit) + + def end(self) -> None: + with self._end_lock: + if self._ended: + return + self._ended = True + + duration = time.monotonic() - self._start_time + _metrics_registry.record(CLIENT_OPERATION_DURATION, duration, self._attributes) + + if self._exception is not None: + attrs = dict(self._attributes) + attrs["status_code"] = _response_status_code(self._exception) + _metrics_registry.add(CLIENT_OPERATION_FAILED, 1, attrs) + + def __enter__(self) -> "MetricsOperation": + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_val is not None: + self.set_error(exc_val) + self.end() + return False + + +class _MetricsOperationContext: + """Context manager that optionally leaves ``end()`` to a streaming result iterator.""" + + def __init__(self, operation: MetricsOperation, end_on_exit: bool) -> None: + self._operation = operation + self._end_on_exit = end_on_exit + + def __enter__(self) -> MetricsOperation: + return self._operation + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_val is not None: + self._operation.set_error(exc_val) + self._operation.end() + elif self._end_on_exit: + self._operation.end() + return False + + +def create_metrics_operation(name: str, attributes: Optional[Dict[str, Any]] = None): + if _operation_name(name) not in _CLIENT_OPERATION_NAMES: + return _NOOP_METRICS_OPERATION + return _metrics_registry.create_metrics_operation(name, attributes) + + +def record_query_session_count(delta: int, pool_name: Optional[str] = None, state: str = "used") -> None: + attrs = _pool_attrs(pool_name) + attrs["ydb.query.session.state"] = state + _metrics_registry.add_query_session_count(delta, attrs) + + +def record_query_session_create_time(duration: float, pool_name: Optional[str]) -> None: + _metrics_registry.record(QUERY_SESSION_CREATE_TIME, duration, _pool_attrs(pool_name)) + + +def record_query_session_pending_requests(delta: int, pool_name: Optional[str]) -> None: + _metrics_registry.add(QUERY_SESSION_PENDING_REQUESTS, delta, _pool_attrs(pool_name)) + + +def record_query_session_timeout(pool_name: Optional[str]) -> None: + _metrics_registry.add(QUERY_SESSION_TIMEOUTS, 1, _pool_attrs(pool_name)) + + +def record_query_session_max(value: int, pool_name: Optional[str]) -> None: + _metrics_registry.set_query_session_max(value, _pool_attrs(pool_name)) + + +def remove_query_session_pool_metrics(pool_name: Optional[str]) -> None: + _metrics_registry.remove_query_session_pool(_pool_attrs(pool_name)) + + +def record_retry_metrics(duration: float, attempts: int) -> None: + _metrics_registry.record(RETRY_DURATION, duration) + _metrics_registry.record(RETRY_ATTEMPTS, attempts) diff --git a/ydb/opentelemetry/metrics_plugin.py b/ydb/opentelemetry/metrics_plugin.py new file mode 100644 index 000000000..9968cefb5 --- /dev/null +++ b/ydb/opentelemetry/metrics_plugin.py @@ -0,0 +1,231 @@ +"""OpenTelemetry metrics bridge for YDB.""" + +import threading +from typing import Any, Dict, Iterable, Optional, Union + +from opentelemetry import metrics as otel_metrics +from opentelemetry.metrics import ( + CallbackOptions, + Counter, + Histogram, + Meter, + MeterProvider, + ObservableUpDownCounter, + Observation, + UpDownCounter, +) + +from ydb.opentelemetry.metrics import ( + CLIENT_OPERATION_DURATION, + CLIENT_OPERATION_FAILED, + QUERY_SESSION_COUNT, + QUERY_SESSION_CREATE_TIME, + QUERY_SESSION_MAX, + QUERY_SESSION_MIN, + QUERY_SESSION_PENDING_REQUESTS, + QUERY_SESSION_TIMEOUTS, + RETRY_ATTEMPTS, + RETRY_DURATION, + ATTEMPT_BUCKETS, + DURATION_BUCKETS_SECONDS, + RETRY_DURATION_BUCKETS_SECONDS, + MetricRegistry as NoOpMetricRegistry, + MetricsOperation, + _reset_metrics_registry, + _set_metrics_registry, +) + +_MetricInstrument = Union[Counter, Histogram, ObservableUpDownCounter, UpDownCounter] + +_meter: Optional[Meter] = None + + +class MetricsRegistry(NoOpMetricRegistry): + """Process-wide OpenTelemetry metric instrument registry.""" + + enabled = True + + def __init__(self, meter: Meter) -> None: + self._query_session_count_values: Dict[Any, int] = {} + self._query_session_max_values: Dict[Any, int] = {} + self._observable_values_lock = threading.Lock() + self._instruments: Dict[str, _MetricInstrument] = { + CLIENT_OPERATION_DURATION: _create_histogram( + meter, + CLIENT_OPERATION_DURATION, + unit="s", + description="Duration of YDB client operations.", + bucket_boundaries=DURATION_BUCKETS_SECONDS, + ), + CLIENT_OPERATION_FAILED: meter.create_counter( + CLIENT_OPERATION_FAILED, + unit="{command}", + description="Number of failed YDB client operations.", + ), + QUERY_SESSION_COUNT: meter.create_observable_up_down_counter( + QUERY_SESSION_COUNT, + callbacks=[self._observe_query_session_count], + unit="{connection}", + description="Number of open YDB query sessions.", + ), + QUERY_SESSION_CREATE_TIME: _create_histogram( + meter, + QUERY_SESSION_CREATE_TIME, + unit="s", + description="Duration of YDB query session creation.", + bucket_boundaries=DURATION_BUCKETS_SECONDS, + ), + QUERY_SESSION_PENDING_REQUESTS: meter.create_up_down_counter( + QUERY_SESSION_PENDING_REQUESTS, + unit="{request}", + description="Number of requests waiting for a YDB query session.", + ), + QUERY_SESSION_TIMEOUTS: meter.create_counter( + QUERY_SESSION_TIMEOUTS, + unit="{connection}", + description="Number of YDB query session acquisition timeouts.", + ), + QUERY_SESSION_MAX: meter.create_observable_up_down_counter( + QUERY_SESSION_MAX, + callbacks=[self._observe_query_session_max], + unit="{connection}", + description="Maximum configured number of YDB query sessions.", + ), + QUERY_SESSION_MIN: meter.create_observable_up_down_counter( + QUERY_SESSION_MIN, + callbacks=[self._observe_query_session_min], + unit="{connection}", + description="Minimum configured number of YDB query sessions.", + ), + RETRY_DURATION: _create_histogram( + meter, + RETRY_DURATION, + unit="s", + description=( + "Total user-visible duration of a logical operation executed through the retry policy, " + "including all attempts and back-off delays." + ), + bucket_boundaries=RETRY_DURATION_BUCKETS_SECONDS, + ), + RETRY_ATTEMPTS: _create_histogram( + meter, + RETRY_ATTEMPTS, + unit="{attempt}", + description=( + "Total number of attempts performed by the retry policy for one logical operation. " + "A value of 1 means the operation succeeded on the first try." + ), + bucket_boundaries=ATTEMPT_BUCKETS, + ), + } + + def create_metrics_operation(self, name: str, attributes: Optional[Dict[str, Any]] = None) -> MetricsOperation: + return MetricsOperation(name, attributes) + + def clear(self) -> None: + self._instruments = {} + with self._observable_values_lock: + self._query_session_count_values = {} + self._query_session_max_values = {} + + def add(self, name: str, value: int, attributes: Optional[Dict[str, Any]] = None) -> None: + """Add ``value`` to a counter-like instrument if metrics are enabled.""" + instrument = self._instruments.get(name) + if instrument is not None: + instrument.add(value, attributes=attributes or {}) + + def record(self, name: str, value: float, attributes: Optional[Dict[str, Any]] = None) -> None: + """Record ``value`` in a histogram-like instrument if metrics are enabled.""" + instrument = self._instruments.get(name) + if instrument is not None: + instrument.record(value, attributes=attributes or {}) + + def add_query_session_count(self, value: int, attributes: Optional[Dict[str, Any]] = None) -> None: + attrs = tuple(sorted((attributes or {}).items())) + + with self._observable_values_lock: + new_value = self._query_session_count_values.get(attrs, 0) + value + + self._query_session_count_values.pop(attrs, None) + self._query_session_count_values[attrs] = new_value + + def set_query_session_max(self, value: int, attributes: Optional[Dict[str, Any]] = None) -> None: + attrs = tuple(sorted((attributes or {}).items())) + + with self._observable_values_lock: + self._query_session_max_values[attrs] = value + + def remove_query_session_pool(self, attributes: Optional[Dict[str, Any]] = None) -> None: + base_attrs = list((attributes or {}).items()) + attrs = tuple(sorted(base_attrs)) + idle_attrs = tuple(sorted(base_attrs + [("ydb.query.session.state", "idle")])) + used_attrs = tuple(sorted(base_attrs + [("ydb.query.session.state", "used")])) + + with self._observable_values_lock: + self._query_session_count_values.pop(idle_attrs, None) + self._query_session_count_values.pop(used_attrs, None) + self._query_session_max_values.pop(attrs, None) + + def _observe_query_session_count(self, _: CallbackOptions) -> Iterable[Observation]: + return self._observe(self._query_session_count_values) + + def _observe_query_session_max(self, _: CallbackOptions) -> Iterable[Observation]: + return self._observe(self._query_session_max_values) + + def _observe_query_session_min(self, _: CallbackOptions) -> Iterable[Observation]: + with self._observable_values_lock: + return [Observation(0, attributes=dict(attrs)) for attrs in self._query_session_max_values] + + def _observe(self, values: Dict[Any, int]) -> Iterable[Observation]: + with self._observable_values_lock: + return [Observation(value, attributes=dict(attrs)) for attrs, value in values.items()] + + +def _enable_metrics(meter_provider: Optional[MeterProvider]) -> None: + """Create SDK metric instruments from an OTel MeterProvider and enable recording.""" + global _meter + + if _meter is not None: + return + + if meter_provider is None: + _meter = otel_metrics.get_meter("ydb.sdk") + elif hasattr(meter_provider, "get_meter"): + _meter = meter_provider.get_meter("ydb.sdk") + else: + raise TypeError("meter_provider must be an OpenTelemetry MeterProvider") + + registry = MetricsRegistry(_meter) + _set_metrics_registry(registry) + + +def _disable_metrics() -> None: + global _meter + + _reset_metrics_registry() + _meter = None + + +def _create_histogram( + meter: Meter, + name: str, + unit: str, + description: str, + bucket_boundaries, +) -> Histogram: + """Create a histogram with bucket advice when the installed OpenTelemetry SDK supports it.""" + try: + return meter.create_histogram( + name, + unit=unit, + description=description, + explicit_bucket_boundaries_advisory=bucket_boundaries, + ) + except TypeError as e: + if "explicit_bucket_boundaries_advisory" not in str(e): + raise + return meter.create_histogram( + name, + unit=unit, + description=description, + ) diff --git a/ydb/opentelemetry/tracing.py b/ydb/opentelemetry/tracing.py index 1d0995df8..2fdd74b3c 100644 --- a/ydb/opentelemetry/tracing.py +++ b/ydb/opentelemetry/tracing.py @@ -1,7 +1,9 @@ -"""Internal SDK tracing helpers and registry.""" +"""Internal SDK tracing helpers and telemetry facade.""" import enum -from typing import Optional, Tuple + +from ydb.opentelemetry._endpoint import split_endpoint +from ydb.opentelemetry.metrics import _build_ydb_metrics_attrs, create_metrics_operation, is_metrics_enabled class SpanName(str, enum.Enum): @@ -31,7 +33,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): class _NoopSpan: - """Returned by create_ydb_span when tracing is disabled.""" + """Span-compatible object used when tracing is disabled.""" def set_error(self, exception): pass @@ -49,6 +51,52 @@ def attach_context(self, end_on_exit=True): _NOOP_SPAN = _NoopSpan() +class _TelemetryContext: + """Attach tracing context and metrics lifecycle for one SDK operation.""" + + def __init__(self, telemetry, span_context, metrics_context): + self._telemetry = telemetry + self._span_context = span_context + self._metrics_context = metrics_context + + def __enter__(self): + self._metrics_context.__enter__() + self._span_context.__enter__() + return self._telemetry + + def __exit__(self, exc_type, exc_val, exc_tb): + span_result = self._span_context.__exit__(exc_type, exc_val, exc_tb) + metrics_result = self._metrics_context.__exit__(exc_type, exc_val, exc_tb) + return bool(span_result or metrics_result) + + +class _TelemetryOperation: + """Span-like facade that forwards lifecycle events to tracing and metrics.""" + + def __init__(self, span, metrics): + self._span = span + self._metrics = metrics + + def set_error(self, exception): + self._span.set_error(exception) + self._metrics.set_error(exception) + + def set_attribute(self, key, value): + self._span.set_attribute(key, value) + self._metrics.set_attribute(key, value) + + def end(self): + self._span.end() + self._metrics.end() + + def attach_context(self, end_on_exit=True): + return _TelemetryContext( + self, + self._span.attach_context(end_on_exit=end_on_exit), + self._metrics.attach_context(end_on_exit=end_on_exit), + ) + + class OtelTracingRegistry: """Singleton registry for OpenTelemetry tracing. @@ -87,28 +135,8 @@ def get_trace_metadata(): return _registry.get_trace_metadata() -def _split_endpoint(endpoint: Optional[str]) -> Tuple[str, int]: - ep = endpoint or "" - if ep.startswith("grpcs://"): - ep = ep[len("grpcs://") :] - elif ep.startswith("grpc://"): - ep = ep[len("grpc://") :] - - if ep.startswith("["): - close = ep.find("]") - if close != -1 and len(ep) > close + 1 and ep[close + 1] == ":": - host = ep[: close + 1] - port_s = ep[close + 2 :] - return host, int(port_s) if port_s.isdigit() else 0 - - host, sep, port_s = ep.rpartition(":") - if not sep: - return ep, 0 - return host, int(port_s) if port_s.isdigit() else 0 - - -def _build_ydb_attrs(driver_config, node_id=None, peer=None): - host, port = _split_endpoint(getattr(driver_config, "endpoint", None)) +def _build_ydb_tracing_attrs(driver_config, node_id=None, peer=None): + host, port = split_endpoint(getattr(driver_config, "endpoint", None)) attrs = { "db.system.name": "ydb", "db.namespace": getattr(driver_config, "database", None) or "", @@ -134,11 +162,15 @@ def create_span(name, attributes=None, kind="internal"): def create_ydb_span(name, driver_config, node_id=None, kind=None, peer=None): - """Create a span pre-filled with standard YDB attributes.""" - if not _registry.is_active(): - return _NOOP_SPAN - attrs = _build_ydb_attrs(driver_config, node_id, peer) - return _registry.create_span(name, attributes=attrs, kind=kind) + """Create telemetry for one user-visible YDB client operation. + + Tracing receives full operation context, including peer/node details. Metrics + receive only the stable labels defined for client operation metrics. + """ + metrics_attrs = _build_ydb_metrics_attrs(driver_config) if is_metrics_enabled() else None + tracing_attrs = _build_ydb_tracing_attrs(driver_config, node_id, peer) + metrics = create_metrics_operation(name, metrics_attrs) + return _TelemetryOperation(_registry.create_span(name, attributes=tracing_attrs, kind=kind), metrics) def set_peer_attributes(span, peer): diff --git a/ydb/opentelemetry/plugin.py b/ydb/opentelemetry/tracing_plugin.py similarity index 88% rename from ydb/opentelemetry/plugin.py rename to ydb/opentelemetry/tracing_plugin.py index 76942789f..59cabde1f 100644 --- a/ydb/opentelemetry/plugin.py +++ b/ydb/opentelemetry/tracing_plugin.py @@ -7,7 +7,7 @@ from ydb import issues from ydb.issues import StatusCode as YdbStatusCode -from ydb.opentelemetry.tracing import _registry +from ydb.opentelemetry.tracing import _registry as _tracing_registry # YDB client transport StatusCode values (401xxx band) -> OTel error.type transport_error. _TRANSPORT_STATUSES = frozenset( @@ -21,7 +21,7 @@ ) _tracer = None -_enabled = False +_tracing_enabled = False _KIND_MAP = { "client": trace.SpanKind.CLIENT, @@ -113,22 +113,22 @@ def _create_span(name, attributes=None, kind=None): def _enable_tracing(tracer=None): - global _enabled, _tracer + global _tracing_enabled, _tracer - if _enabled: + if _tracing_enabled: return _tracer = tracer if tracer is not None else trace.get_tracer("ydb.sdk") - _enabled = True - _registry.set_metadata_hook(_otel_metadata_hook) - _registry.set_create_span(_create_span) + _tracing_enabled = True + _tracing_registry.set_metadata_hook(_otel_metadata_hook) + _tracing_registry.set_create_span(_create_span) def _disable_tracing(): """Clear hooks and tracer; after this, :func:`~ydb.opentelemetry.enable_tracing` may be called again.""" - global _enabled, _tracer + global _tracing_enabled, _tracer - _registry.set_create_span(None) - _registry.set_metadata_hook(None) - _enabled = False + _tracing_registry.set_create_span(None) + _tracing_registry.set_metadata_hook(None) + _tracing_enabled = False _tracer = None diff --git a/ydb/query/pool.py b/ydb/query/pool.py index 44d4d34af..c4b5a24d4 100644 --- a/ydb/query/pool.py +++ b/ydb/query/pool.py @@ -27,6 +27,15 @@ from .. import issues from .. import convert from ..settings import BaseRequestSettings +from ..opentelemetry.metrics import ( + query_session_pool_name, + record_query_session_count, + record_query_session_create_time, + record_query_session_max, + record_query_session_pending_requests, + record_query_session_timeout, + remove_query_session_pool_metrics, +) from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public if TYPE_CHECKING: @@ -47,12 +56,14 @@ def __init__( *, query_client_settings: Optional[QueryClientSettings] = None, workers_threads_count: int = 4, + name: Optional[str] = None, ): """ :param driver: A driver instance. :param size: Max size of Session Pool. :param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior :param workers_threads_count: A number of threads in executor used for ``*_async`` methods + :param name: Optional session pool name for OpenTelemetry metrics. """ self._driver = driver @@ -63,10 +74,17 @@ def __init__( self._should_stop = threading.Event() self._lock = threading.RLock() self._query_client_settings = query_client_settings + driver_config = getattr(driver, "_driver_config", None) + self._metrics_pool_name = query_session_pool_name(name, getattr(driver_config, "endpoint", None)) + record_query_session_max(self._size, self._metrics_pool_name) def _create_new_session(self, timeout: Optional[float]): session = QuerySession(self._driver, settings=self._query_client_settings) + session._metrics_pool_name = self._metrics_pool_name + session._metrics_state = "used" + start_time = time.monotonic() session.create(settings=BaseRequestSettings().with_timeout(timeout)) + record_query_session_create_time(time.monotonic() - start_time, self._metrics_pool_name) logger.debug(f"New session was created for pool. Session id: {session.session_id}") return session @@ -95,17 +113,24 @@ def acquire(self, timeout: Optional[float] = None) -> QuerySession: pass finish = time.monotonic() - timeout = timeout - (finish - start) if timeout is not None else None + timeout = max(0, timeout - (finish - start)) if timeout is not None else None start = time.monotonic() if session is None and self._current_size == self._size: + record_query_session_pending_requests(1, self._metrics_pool_name) try: session = self._queue.get(block=True, timeout=timeout) except queue.Empty: + record_query_session_timeout(self._metrics_pool_name) raise issues.SessionPoolEmpty("Timeout on acquire session") + finally: + record_query_session_pending_requests(-1, self._metrics_pool_name) if session is not None: if session.is_active: + record_query_session_count(-1, self._metrics_pool_name, "idle") + session._metrics_state = "used" + record_query_session_count(1, self._metrics_pool_name, "used") logger.debug(f"Acquired active session from queue: {session.session_id}") return session else: @@ -114,7 +139,7 @@ def acquire(self, timeout: Optional[float] = None) -> QuerySession: logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.") finish = time.monotonic() - time_left = timeout - (finish - start) if timeout is not None else None + time_left = max(0, timeout - (finish - start)) if timeout is not None else None session = self._create_new_session(time_left) self._current_size += 1 @@ -125,6 +150,9 @@ def acquire(self, timeout: Optional[float] = None) -> QuerySession: def release(self, session: QuerySession) -> None: """Release a session back to Session Pool.""" + record_query_session_count(-1, self._metrics_pool_name, "used") + session._metrics_state = "idle" + record_query_session_count(1, self._metrics_pool_name, "idle") self._queue.put_nowait(session) logger.debug("Session returned to queue: %s", session.session_id) @@ -317,6 +345,7 @@ def stop(self, timeout=None): break logger.debug("All session were deleted.") + remove_query_session_pool_metrics(self._metrics_pool_name) finally: if acquired: self._lock.release() diff --git a/ydb/query/session.py b/ydb/query/session.py index a9c1b4a50..78d229ab2 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -19,6 +19,7 @@ from .. import _apis, issues, _utilities from ..opentelemetry.tracing import SpanName, create_ydb_span, set_peer_attributes, span_finish_callback +from ..opentelemetry.metrics import record_query_session_count from ..settings import BaseRequestSettings from ..connection import _RpcState as RpcState, EndpointKey from .._grpc.grpcwrapper import common_utils @@ -94,6 +95,9 @@ class BaseQuerySession(abc.ABC, Generic[DriverT]): _peer: Optional[tuple] = None _closed: bool = False _invalidated: bool = False + _metrics_counted: bool = False + _metrics_pool_name: Optional[str] = None + _metrics_state: str = "used" def __init__(self, driver: DriverT, settings: Optional[base.QueryClientSettings] = None): self._driver = driver @@ -106,6 +110,9 @@ def __init__(self, driver: DriverT, settings: Optional[base.QueryClientSettings] ) self._last_query_stats = None + self._metrics_counted = False + self._metrics_pool_name = None + self._metrics_state = "used" @property def _driver_config(self) -> Optional["DriverConfig"]: @@ -159,6 +166,13 @@ def _check_session_ready_to_use(self) -> None: def _close_session(self, invalidate: bool = False) -> None: if self._closed: return + if self._metrics_counted: + record_query_session_count( + -1, + pool_name=getattr(self, "_metrics_pool_name", None), + state=getattr(self, "_metrics_state", "used"), + ) + self._metrics_counted = False if invalidate: self._invalidated = True self._closed = True @@ -418,10 +432,17 @@ def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySessio if self._closed: raise RuntimeError("Session is already closed.") - with create_ydb_span(SpanName.CREATE_SESSION, self._driver_config).attach_context() as span: + with create_ydb_span("ydb.CreateSession", self._driver_config).attach_context() as span: self._create_call(settings=settings) set_peer_attributes(span, self._peer) self._attach() + if not getattr(self, "_metrics_counted", False): + record_query_session_count( + 1, + pool_name=getattr(self, "_metrics_pool_name", None), + state=getattr(self, "_metrics_state", "used"), + ) + self._metrics_counted = True return self diff --git a/ydb/retries.py b/ydb/retries.py index 4b7c137f3..5765d50e7 100644 --- a/ydb/retries.py +++ b/ydb/retries.py @@ -7,6 +7,7 @@ from . import issues from ._errors import check_retriable_error +from .opentelemetry.metrics import record_retry_metrics from .opentelemetry.tracing import SpanName, create_span as _create_span @@ -164,21 +165,28 @@ def retry_operation_sync( **kwargs: Any, ) -> Any: backoff_ms: Optional[int] = None + attempts = 0 + start_time = time.monotonic() @functools.wraps(callee) def traced_callee(*a: Any, **kw: Any) -> Any: + nonlocal attempts + attempts += 1 with _create_span(SpanName.TRY, _try_span_attrs(backoff_ms)): return callee(*a, **kw) - with _create_span(SpanName.RUN_WITH_RETRY): - for next_opt in retry_operation_impl(traced_callee, retry_settings, *args, **kwargs): - if isinstance(next_opt, YdbRetryOperationSleepOpt): - backoff_ms = int(next_opt.timeout * 1000) - if next_opt.timeout > 0: - time.sleep(next_opt.timeout) - else: - return next_opt.result - return None + try: + with _create_span(SpanName.RUN_WITH_RETRY): + for next_opt in retry_operation_impl(traced_callee, retry_settings, *args, **kwargs): + if isinstance(next_opt, YdbRetryOperationSleepOpt): + backoff_ms = int(next_opt.timeout * 1000) + if next_opt.timeout > 0: + time.sleep(next_opt.timeout) + else: + return next_opt.result + return None + finally: + record_retry_metrics(time.monotonic() - start_time, attempts) async def retry_operation_async( # pylint: disable=W1113 @@ -200,20 +208,31 @@ async def retry_operation_async( # pylint: disable=W1113 Returns awaitable result of coroutine. If retries are not succussful exception is raised. """ backoff_ms: Optional[int] = None - with _create_span(SpanName.RUN_WITH_RETRY): - for next_opt in retry_operation_impl(callee, retry_settings, *args, **kwargs): - if isinstance(next_opt, YdbRetryOperationSleepOpt): - backoff_ms = int(next_opt.timeout * 1000) - if next_opt.timeout > 0: - await asyncio.sleep(next_opt.timeout) - else: - with _create_span(SpanName.TRY, _try_span_attrs(backoff_ms)) as try_span: + attempts = 0 + start_time = time.monotonic() + + @functools.wraps(callee) + async def traced_callee(*a: Any, **kw: Any) -> Any: + nonlocal attempts + attempts += 1 + with _create_span(SpanName.TRY, _try_span_attrs(backoff_ms)): + return await callee(*a, **kw) + + try: + with _create_span(SpanName.RUN_WITH_RETRY): + for next_opt in retry_operation_impl(traced_callee, retry_settings, *args, **kwargs): + if isinstance(next_opt, YdbRetryOperationSleepOpt): + backoff_ms = int(next_opt.timeout * 1000) + if next_opt.timeout > 0: + await asyncio.sleep(next_opt.timeout) + else: try: return await next_opt.result except BaseException as e: # pylint: disable=W0703 - try_span.set_error(e) next_opt.set_exception(e) - return None + return None + finally: + record_retry_metrics(time.monotonic() - start_time, attempts) def ydb_retry(