Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions temporalio/bridge/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class OpenTelemetryConfig:
metric_temporality_delta: bool
durations_as_seconds: bool
http: bool
histogram_bucket_overrides: Mapping[str, Sequence[float]] | None = None


@dataclass(frozen=True)
Expand Down
6 changes: 6 additions & 0 deletions temporalio/bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct OpenTelemetryConfig {
metric_temporality_delta: bool,
durations_as_seconds: bool,
http: bool,
histogram_bucket_overrides: Option<HashMap<String, Vec<f64>>>,
}

#[derive(FromPyObject)]
Expand Down Expand Up @@ -357,6 +358,11 @@ impl TryFrom<MetricsConfig> for Arc<dyn CoreMeter> {
} else {
None
})
.maybe_histogram_bucket_overrides(otel_conf.histogram_bucket_overrides.map(
|overrides| temporalio_common::telemetry::HistogramBucketOverrides {
overrides,
},
))
.build();
Ok(Arc::new(build_otlp_metric_exporter(otel_options).map_err(
|err| PyValueError::new_err(format!("Failed building OTel exporter: {err}")),
Expand Down
6 changes: 6 additions & 0 deletions temporalio/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ class OpenTelemetryConfig:
When enabled, the ``url`` should point to the HTTP endpoint
(e.g. ``"http://localhost:4318/v1/metrics"``).
Defaults to ``False`` (gRPC).
histogram_bucket_overrides: Override the default histogram bucket
boundaries for specific metrics. Keys are metric names and
values are sequences of bucket boundaries (e.g.
``{"workflow_task_schedule_to_start_latency": [0.01, 0.05, 0.1, 0.5, 1.0, 5.0]}``).
"""

url: str
Expand All @@ -345,6 +349,7 @@ class OpenTelemetryConfig:
)
durations_as_seconds: bool = False
http: bool = False
histogram_bucket_overrides: Mapping[str, Sequence[float]] | None = None

def _to_bridge_config(self) -> temporalio.bridge.runtime.OpenTelemetryConfig:
return temporalio.bridge.runtime.OpenTelemetryConfig(
Expand All @@ -360,6 +365,7 @@ def _to_bridge_config(self) -> temporalio.bridge.runtime.OpenTelemetryConfig:
),
durations_as_seconds=self.durations_as_seconds,
http=self.http,
histogram_bucket_overrides=self.histogram_bucket_overrides,
)


Expand Down
104 changes: 103 additions & 1 deletion tests/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@
from urllib.request import urlopen

import pytest

from temporalio import workflow
from temporalio.client import Client
from temporalio.runtime import (
LogForwardingConfig,
LoggingConfig,
OpenTelemetryConfig,
PrometheusConfig,
Runtime,
TelemetryConfig,
TelemetryFilter,
_RuntimeRef,
)
from temporalio.worker import Worker

from tests.helpers import (
LogHandler,
assert_eq_eventually,
Expand Down Expand Up @@ -269,6 +270,107 @@ async def check_metrics() -> None:
await assert_eventually(check_metrics)


async def test_opentelemetry_histogram_bucket_overrides(client: Client):
    """Verify ``histogram_bucket_overrides`` reach the OTLP exporter.

    Spins up an in-process OTLP/HTTP collector, points a ``Runtime``'s
    OpenTelemetry metrics exporter at it with custom bucket boundaries for one
    built-in histogram and one custom histogram, records to both, then asserts
    the exported ``explicit_bounds`` match the configured overrides.
    """
    # Set up an OpenTelemetry configuration with custom histogram bucket overrides
    import threading
    from http.server import BaseHTTPRequestHandler, HTTPServer

    # OTLP metrics-service protobuf messages used by the fake collector below.
    from opentelemetry.proto.collector.metrics.v1.metrics_service_pb2 import (
        ExportMetricsServiceRequest,
        ExportMetricsServiceResponse,
    )

    # Distinctive boundary value so default bucket layouts cannot accidentally
    # match the asserted bounds.
    special_value = float(1234.5678)
    histogram_overrides = {
        "temporal_long_request_latency": [special_value / 2, special_value],
        "custom_histogram": [special_value / 2, special_value],
    }

    # Metric name -> explicit bucket bounds seen in the most recent export.
    captured: dict[str, list[float]] = {}
    # Guards `captured`: the HTTP server handles POSTs on its own thread while
    # the test body reads the dict from the event-loop thread.
    lock = threading.Lock()

    class Handler(BaseHTTPRequestHandler):
        # Minimal OTLP/HTTP metrics collector: decodes each export request and
        # records every histogram's explicit bucket boundaries.

        def log_message(self, *_args):
            pass  # silence default stderr logging

        def do_POST(self):
            # Read exactly Content-Length bytes and parse the protobuf payload.
            length = int(self.headers.get("Content-Length", "0"))
            req = ExportMetricsServiceRequest()
            req.ParseFromString(self.rfile.read(length))
            with lock:
                for rm in req.resource_metrics:
                    for sm in rm.scope_metrics:
                        for m in sm.metrics:
                            if m.HasField("histogram"):
                                for dp in m.histogram.data_points:
                                    # Later exports overwrite earlier ones; the
                                    # test only cares about the bounds, which
                                    # are constant per metric.
                                    captured[m.name] = list(dp.explicit_bounds)
            # Reply with an empty, well-formed OTLP response so the exporter
            # treats the export as successful.
            body = ExportMetricsServiceResponse().SerializeToString()
            self.send_response(200)
            self.send_header("Content-Type", "application/x-protobuf")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    otel_port = find_free_port()
    server = HTTPServer(("127.0.0.1", otel_port), Handler)
    # Daemon thread so a failing test cannot hang interpreter shutdown.
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        # Short metric_periodicity so exports arrive quickly; http=True routes
        # the exporter at the OTLP/HTTP endpoint served above.
        runtime = Runtime(
            telemetry=TelemetryConfig(
                metrics=OpenTelemetryConfig(
                    url=f"http://127.0.0.1:{otel_port}/v1/metrics",
                    http=True,
                    metric_periodicity=timedelta(milliseconds=100),
                    durations_as_seconds=False,
                    histogram_bucket_overrides=histogram_overrides,
                ),
            ),
        )

        # Create and record to a custom histogram
        custom_histogram = runtime.metric_meter.create_histogram(
            "custom_histogram", "Custom histogram", "ms"
        )
        custom_histogram.record(600)

        # Run a workflow so built-in histograms (e.g. temporal_long_request_latency)
        # are recorded and exported.
        client_with_overrides = await Client.connect(
            client.service_client.config.target_host,
            namespace=client.namespace,
            runtime=runtime,
        )
        task_queue = f"task-queue-{uuid.uuid4()}"
        async with Worker(
            client_with_overrides,
            task_queue=task_queue,
            workflows=[HelloWorkflow],
        ):
            assert "Hello, World!" == await client_with_overrides.execute_workflow(
                HelloWorkflow.run,
                "World",
                id=f"workflow-{uuid.uuid4()}",
                task_queue=task_queue,
            )

        async def check_metrics() -> None:
            # Snapshot under the lock, then assert outside it so assertion
            # failures don't hold the lock.
            with lock:
                snapshot = dict(captured)
            for key, buckets in histogram_overrides.items():
                assert (
                    key in snapshot
                ), f"Missing {key} in captured metrics: {list(snapshot)}"
                # approx: bounds round-trip through f64 protobuf fields.
                assert snapshot[key] == pytest.approx(
                    buckets
                ), f"Bucket mismatch for {key}: got {snapshot[key]} expected {buckets}"

        # Export is periodic, so poll until the expected bounds show up.
        await assert_eventually(check_metrics)
    finally:
        # Stop the serve_forever loop, then release the listening socket.
        server.shutdown()
        server.server_close()


def test_runtime_options_invalid_heartbeat() -> None:
with pytest.raises(ValueError):
Runtime(
Expand Down