From 96852c23523c569df1a7140962d6c253272b36ea Mon Sep 17 00:00:00 2001 From: Lorenzo Ronzani Date: Fri, 22 May 2026 08:02:24 +0000 Subject: [PATCH 01/11] Implementing first version of metrics for gRPC server --- .../instrumentation/grpc/__init__.py | 15 +- .../instrumentation/grpc/_server.py | 70 +++++- .../tests/test_server_interceptor_metrics.py | 214 ++++++++++++++++++ 3 files changed, 296 insertions(+), 3 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 3b0ef08703..85dfad241a 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -285,6 +285,7 @@ async def serve(): from opentelemetry.instrumentation.grpc.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.metrics import get_meter # pylint:disable=import-outside-toplevel # pylint:disable=import-self @@ -590,7 +591,7 @@ def client_interceptor( ) -def server_interceptor(tracer_provider=None, filter_=None): +def server_interceptor(tracer_provider=None, filter_=None, meter_provider=None): """Create a gRPC server interceptor. Args: @@ -600,6 +601,8 @@ def server_interceptor(tracer_provider=None, filter_=None): matches the condition. Default is None and intercept all requests. + meter_provider: The meter provider to use for metrics. + Returns: A service-side interceptor object. """ @@ -612,7 +615,15 @@ def server_interceptor(tracer_provider=None, filter_=None): schema_url="https://opentelemetry.io/schemas/1.11.0", ) - return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_) + meter = get_meter( + __name__, + __version__, + meter_provider, + ) + + return _server.OpenTelemetryServerInterceptor( + tracer, filter_=filter_, meter=meter + ) def aio_client_interceptors( diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index e55e6c0994..4c41799602 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -11,6 +11,7 @@ """ import logging +import time from contextlib import contextmanager from urllib.parse import unquote @@ -19,6 +20,9 @@ from opentelemetry import trace from opentelemetry.context import attach, detach from opentelemetry.propagate import extract +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) from opentelemetry.semconv._incubating.attributes.net_attributes import ( NET_PEER_IP, NET_PEER_NAME, @@ -27,14 +31,34 @@ from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( RPC_GRPC_STATUS_CODE, RPC_METHOD, + RPC_RESPONSE_STATUS_CODE, RPC_SERVICE, RPC_SYSTEM, + RPC_SYSTEM_NAME, + RpcSystemNameValues, +) +from opentelemetry.semconv._incubating.metrics.rpc_metrics import ( + RPC_SERVER_CALL_DURATION, ) from ._utilities import _server_status logger = logging.getLogger(__name__) +_RPC_DURATION_BUCKET_BOUNDARIES = ( + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, + 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, +) + + +def _create_duration_histogram(meter): + return meter.create_histogram( + name=RPC_SERVER_CALL_DURATION, + description="Measures the duration of an incoming Remote Procedure Call (RPC).", + unit="s", + explicit_bucket_boundaries_advisory=_RPC_DURATION_BUCKET_BOUNDARIES, + ) + # wrap an RPC call # see https://github.com/grpc/grpc/issues/18191 @@ -184,9 +208,12 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor): """ - def __init__(self, tracer, filter_=None): + def __init__(self, tracer, filter_=None, meter=None): self._tracer = tracer self._filter = filter_ + self._duration_histogram = ( + _create_duration_histogram(meter) if meter else None + ) @contextmanager def _set_remote_context(self, servicer_context): @@ -267,6 +294,29 @@ def _start_span( set_status_on_exception=set_status_on_exception, ) + def _build_metric_attributes(self, handler_call_details, status_code): + method = handler_call_details.method + full_method = method.lstrip("/") if method else "_OTHER" + attrs = { + RPC_SYSTEM_NAME: RpcSystemNameValues.GRPC.value, + RPC_METHOD: full_method, + RPC_RESPONSE_STATUS_CODE: status_code.name, + } + if status_code != grpc.StatusCode.OK: + attrs[ERROR_TYPE] = status_code.name + return attrs + + def _record_duration( + self, handler_call_details, start_time, status_code + ): + if self._duration_histogram is None: + return + elapsed = time.perf_counter() - start_time + attrs = self._build_metric_attributes( + handler_call_details, status_code + ) + self._duration_histogram.record(elapsed, attributes=attrs) + def intercept_service(self, continuation, handler_call_details): if self._filter is not None and not self._filter(handler_call_details): return continuation(handler_call_details) @@ -282,6 +332,8 @@ def telemetry_interceptor(request_or_iterator, context): context, ) + start_time = time.perf_counter() + with self._set_remote_context(context): with self._start_span( handler_call_details, @@ -304,6 +356,13 @@ def telemetry_interceptor(request_or_iterator, context): span.record_exception(error) raise error + finally: + self._record_duration( + handler_call_details, + start_time, + context._code, + ) + return telemetry_interceptor return _wrap_rpc_behavior( @@ -316,6 +375,8 @@ def telemetry_interceptor(request_or_iterator, context): def _intercept_server_stream( self, behavior, handler_call_details, request_or_iterator, context ): + start_time = time.perf_counter() + with self._set_remote_context(context): with self._start_span( handler_call_details, context, set_status_on_exception=False @@ -330,3 +391,10 @@ def _intercept_server_stream( if type(error) != Exception: # noqa: E721 span.record_exception(error) raise error + + finally: + self._record_duration( + handler_call_details, + start_time, + context._code, + ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py new file mode 100644 index 0000000000..ed675dbc8f --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py @@ -0,0 +1,214 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +import contextlib +from concurrent import futures + +import grpc + +from opentelemetry.instrumentation.grpc import server_interceptor +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) +from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( + RPC_METHOD, + RPC_RESPONSE_STATUS_CODE, + RPC_SYSTEM_NAME, + RpcSystemNameValues, +) +from opentelemetry.semconv._incubating.metrics.rpc_metrics import ( + RPC_SERVER_CALL_DURATION, +) +from opentelemetry.test.test_base import TestBase + +from .protobuf.test_server_pb2 import Request, Response +from .protobuf.test_server_pb2_grpc import ( + GRPCTestServerServicer, + add_GRPCTestServerServicer_to_server, +) + + +class Servicer(GRPCTestServerServicer): + def SimpleMethod(self, request, context): + return Response( + server_id=request.client_id, + response_data=request.request_data, + ) + + +class TestServerInterceptorMetrics(TestBase): + @contextlib.contextmanager + def server(self, max_workers=1, interceptors=None): + with futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=interceptors or [], + ) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + yield server, channel + + def test_unary_call_records_duration_metric(self): + """A unary server RPC produces an rpc.server.call.duration histogram.""" + interceptor = server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + with self.server( + max_workers=1, interceptors=[interceptor] + ) as (server, channel): + add_GRPCTestServerServicer_to_server(Servicer(), server) + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_SERVER_CALL_DURATION}' not found. " + f"Got: {[m.name for m in metrics]}", + ) + self.assertEqual(duration_metric.unit, "s") + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + self.assertEqual( + point.explicit_bounds, + (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10), + ) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") + self.assertNotIn(ERROR_TYPE, attrs) + + def test_error_call_records_status_in_metric(self): + """Server metric records the gRPC error status code on abort.""" + + class ErrorServicer(GRPCTestServerServicer): + def SimpleMethod(self, request, context): + context.abort(grpc.StatusCode.INTERNAL, "test failure") + + interceptor = server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + with self.server( + max_workers=1, interceptors=[interceptor] + ) as (server, channel): + add_GRPCTestServerServicer_to_server(ErrorServicer(), server) + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + with self.assertRaises(Exception): + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "INTERNAL") + self.assertEqual(attrs[ERROR_TYPE], "INTERNAL") + + def test_streaming_call_records_duration_metric(self): + """Server interceptor records metric on a streaming RPC.""" + + class StreamingServicer(GRPCTestServerServicer): + def ServerStreamingMethod(self, request, context): + for data in ("one", "two", "three"): + yield Response( + server_id=request.client_id, + response_data=data, + ) + + interceptor = server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + with self.server( + max_workers=1, interceptors=[interceptor] + ) as (server, channel): + add_GRPCTestServerServicer_to_server( + StreamingServicer(), server + ) + + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + list(channel.unary_stream(rpc_call)(msg)) + finally: + server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_SERVER_CALL_DURATION}' not found " + f"for streaming RPC. Got: {[m.name for m in metrics]}", + ) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual( + attrs[RPC_METHOD], "GRPCTestServer/ServerStreamingMethod" + ) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") From f1c46a00c0662e78600e6681f45b6567561d1aa7 Mon Sep 17 00:00:00 2001 From: Lorenzo Ronzani Date: Fri, 22 May 2026 08:49:48 +0000 Subject: [PATCH 02/11] created a constant --- .../src/opentelemetry/instrumentation/grpc/_server.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 4c41799602..686d5e2f00 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -45,6 +45,8 @@ logger = logging.getLogger(__name__) +_DEFAULT_RPC_METHOD = "_OTHER" + _RPC_DURATION_BUCKET_BOUNDARIES = ( 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, @@ -296,7 +298,7 @@ def _start_span( def _build_metric_attributes(self, handler_call_details, status_code): method = handler_call_details.method - full_method = method.lstrip("/") if method else "_OTHER" + full_method = method.lstrip("/") if method else _DEFAULT_RPC_METHOD attrs = { RPC_SYSTEM_NAME: RpcSystemNameValues.GRPC.value, RPC_METHOD: full_method, From 543acb5a7083b2318100b003d149b85b14249f64 Mon Sep 17 00:00:00 2001 From: Lorenzo Ronzani Date: Fri, 22 May 2026 09:05:38 +0000 Subject: [PATCH 03/11] Implemented metrics into client --- .../instrumentation/grpc/__init__.py | 33 +++- .../instrumentation/grpc/_client.py | 91 ++++++++++- .../tests/test_client_interceptor_metrics.py | 144 ++++++++++++++++++ 3 files changed, 258 insertions(+), 10 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 85dfad241a..7f25290bd7 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -458,6 +458,8 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs): self._request_hook = kwargs.get("request_hook") self._response_hook = kwargs.get("response_hook") + self._tracer_provider = kwargs.get("tracer_provider") + self._meter_provider = kwargs.get("meter_provider") for ctype in self._which_channel(kwargs): _wrap( "grpc", @@ -471,16 +473,16 @@ def _uninstrument(self, **kwargs): def wrapper_fn(self, original_func, instance, args, kwargs): channel = original_func(*args, **kwargs) - tracer_provider = kwargs.get("tracer_provider") - request_hook = self._request_hook - response_hook = self._response_hook + target = args[0] if args else None return intercept_channel( channel, client_interceptor( - tracer_provider=tracer_provider, + tracer_provider=self._tracer_provider, filter_=self._filter, - request_hook=request_hook, - response_hook=response_hook, + request_hook=self._request_hook, + response_hook=self._response_hook, + meter_provider=self._meter_provider, + target=target, ), ) @@ -560,7 +562,12 @@ def _uninstrument(self, **kwargs): def client_interceptor( - tracer_provider=None, filter_=None, request_hook=None, response_hook=None + tracer_provider=None, + filter_=None, + request_hook=None, + response_hook=None, + meter_provider=None, + target=None, ): """Create a gRPC client channel interceptor. @@ -571,6 +578,10 @@ def client_interceptor( matches the condition. Default is None and intercept all requests. + meter_provider: The meter provider to use for metrics. + + target: The target address of the channel (e.g. "host:port"). + Returns: An invocation-side interceptor object. """ @@ -583,11 +594,19 @@ def client_interceptor( schema_url="https://opentelemetry.io/schemas/1.11.0", ) + meter = get_meter( + __name__, + __version__, + meter_provider, + ) + return _client.OpenTelemetryClientInterceptor( tracer, filter_=filter_, request_hook=request_hook, response_hook=response_hook, + meter=meter, + target=target, ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 832a5d5835..d796e7fe5f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -9,6 +9,7 @@ """Implementation of the invocation-side open-telemetry interceptor.""" import logging +import time from collections import OrderedDict from typing import Callable, MutableMapping @@ -20,16 +21,57 @@ from opentelemetry.instrumentation.utils import is_instrumentation_enabled from opentelemetry.propagate import inject from opentelemetry.propagators.textmap import Setter +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( RPC_GRPC_STATUS_CODE, RPC_METHOD, + RPC_RESPONSE_STATUS_CODE, RPC_SERVICE, RPC_SYSTEM, + RPC_SYSTEM_NAME, + RpcSystemNameValues, +) +from opentelemetry.semconv._incubating.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) +from opentelemetry.semconv._incubating.metrics.rpc_metrics import ( + RPC_CLIENT_CALL_DURATION, ) from opentelemetry.trace.status import Status, StatusCode logger = logging.getLogger(__name__) +_DEFAULT_RPC_METHOD = "_OTHER" + +_RPC_DURATION_BUCKET_BOUNDARIES = ( + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, + 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, +) + + +def _create_duration_histogram(meter): + return meter.create_histogram( + name=RPC_CLIENT_CALL_DURATION, + description="Measures the duration of an outgoing Remote Procedure Call (RPC).", + unit="s", + explicit_bucket_boundaries_advisory=_RPC_DURATION_BUCKET_BOUNDARIES, + ) + + +def _parse_target(target): + if not target: + return None, None + host, _, port = target.rpartition(":") + if host and port: + try: + return host, int(port) + except ValueError: + pass + return target, None + class _CarrierSetter(Setter): """We use a custom setter in order to be able to lower case @@ -77,12 +119,17 @@ class OpenTelemetryClientInterceptor( grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor ): def __init__( - self, tracer, filter_=None, request_hook=None, response_hook=None + self, tracer, filter_=None, request_hook=None, response_hook=None, + meter=None, target=None, ): self._tracer = tracer self._filter = filter_ self._request_hook = request_hook self._response_hook = response_hook + self._duration_histogram = ( + _create_duration_histogram(meter) if meter else None + ) + self._server_address, self._server_port = _parse_target(target) def _start_span(self, method, **kwargs): service, meth = method.lstrip("/").split("/", 1) @@ -122,6 +169,28 @@ def _trace_result(self, span, rpc_info, result): span.end() return result + def _build_metric_attributes(self, method, status_code): + full_method = method.lstrip("/") if method else _DEFAULT_RPC_METHOD + attrs = { + RPC_SYSTEM_NAME: RpcSystemNameValues.GRPC.value, + RPC_METHOD: full_method, + RPC_RESPONSE_STATUS_CODE: status_code.name, + } + if self._server_address: + attrs[SERVER_ADDRESS] = self._server_address + if self._server_port is not None: + attrs[SERVER_PORT] = self._server_port + if status_code != grpc.StatusCode.OK: + attrs[ERROR_TYPE] = status_code.name + return attrs + + def _record_duration(self, method, start_time, status_code): + if self._duration_histogram is None: + return + elapsed = time.perf_counter() - start_time + attrs = self._build_metric_attributes(method, status_code) + self._duration_histogram.record(elapsed, attributes=attrs) + def _intercept(self, request, metadata, client_info, invoker): if not is_instrumentation_enabled(): return invoker(request, metadata) @@ -130,6 +199,10 @@ def _intercept(self, request, metadata, client_info, invoker): mutable_metadata = OrderedDict() else: mutable_metadata = OrderedDict(metadata) + + start_time = time.perf_counter() + status_code = grpc.StatusCode.OK + with self._start_span( client_info.full_method, end_on_exit=False, @@ -152,9 +225,10 @@ def _intercept(self, request, metadata, client_info, invoker): result = invoker(request, metadata) except Exception as exc: if isinstance(exc, grpc.RpcError): + status_code = exc.code() span.set_attribute( RPC_GRPC_STATUS_CODE, - exc.code().value[0], + status_code.value[0], ) span.set_status( Status( @@ -167,6 +241,9 @@ def _intercept(self, request, metadata, client_info, invoker): finally: if result is None: span.end() + self._record_duration( + client_info.full_method, start_time, status_code + ) return self._trace_result(span, rpc_info, result) def _call_request_hook(self, span, request): @@ -195,6 +272,9 @@ def _intercept_server_stream( else: mutable_metadata = OrderedDict(metadata) + start_time = time.perf_counter() + status_code = grpc.StatusCode.OK + with self._start_span(client_info.full_method) as span: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) @@ -210,9 +290,14 @@ def _intercept_server_stream( try: yield from invoker(request_or_iterator, metadata) except grpc.RpcError as err: + status_code = err.code() span.set_status(Status(StatusCode.ERROR)) - span.set_attribute(RPC_GRPC_STATUS_CODE, err.code().value[0]) + span.set_attribute(RPC_GRPC_STATUS_CODE, status_code.value[0]) raise err + finally: + self._record_duration( + client_info.full_method, start_time, status_code + ) def intercept_stream( self, request_or_iterator, metadata, client_info, invoker diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py new file mode 100644 index 0000000000..c7f6403fe0 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py @@ -0,0 +1,144 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +import grpc + +from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) +from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( + RPC_METHOD, + RPC_RESPONSE_STATUS_CODE, + RPC_SYSTEM_NAME, + RpcSystemNameValues, +) +from opentelemetry.semconv._incubating.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) +from opentelemetry.semconv._incubating.metrics.rpc_metrics import ( + RPC_CLIENT_CALL_DURATION, +) +from opentelemetry.test.test_base import TestBase + +from ._client import server_streaming_method, simple_method +from ._server import create_test_server +from .protobuf import test_server_pb2_grpc + + +class TestClientInterceptorMetrics(TestBase): + def setUp(self): + super().setUp() + GrpcInstrumentorClient().instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + self.server = create_test_server(25565) + self.server.start() + self.channel = grpc.insecure_channel("localhost:25565") + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) + + def tearDown(self): + super().tearDown() + GrpcInstrumentorClient().uninstrument() + self.server.stop(None) + self.channel.close() + + def test_unary_call_records_duration_metric(self): + """A unary client RPC produces an rpc.client.call.duration histogram.""" + simple_method(self._stub) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_CLIENT_CALL_DURATION}' not found. " + f"Got: {[m.name for m in metrics]}", + ) + self.assertEqual(duration_metric.unit, "s") + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + self.assertEqual( + point.explicit_bounds, + (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10), + ) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") + self.assertEqual(attrs[SERVER_ADDRESS], "localhost") + self.assertEqual(attrs[SERVER_PORT], 25565) + self.assertNotIn(ERROR_TYPE, attrs) + + def test_error_call_records_status_in_metric(self): + """Client metric records the gRPC error status code on failure.""" + with self.assertRaises(grpc.RpcError): + simple_method(self._stub, error=True) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "INVALID_ARGUMENT") + self.assertEqual(attrs[ERROR_TYPE], "INVALID_ARGUMENT") + + def test_server_streaming_records_duration_metric(self): + """Client metric is recorded for a server-streaming RPC.""" + server_streaming_method(self._stub) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_CLIENT_CALL_DURATION}' not found " + f"for streaming RPC. Got: {[m.name for m in metrics]}", + ) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual( + attrs[RPC_METHOD], "GRPCTestServer/ServerStreamingMethod" + ) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") From e8a48c99f95e7d3b609078879f38dcf421103d19 Mon Sep 17 00:00:00 2001 From: Lorenzo Ronzani Date: Fri, 22 May 2026 09:19:58 +0000 Subject: [PATCH 04/11] Added aio clients and servers --- .../instrumentation/grpc/__init__.py | 104 +++++----- .../instrumentation/grpc/_aio_client.py | 65 ++++-- .../instrumentation/grpc/_aio_server.py | 20 ++ .../instrumentation/grpc/_client.py | 2 + .../test_aio_client_interceptor_metrics.py | 155 ++++++++++++++ .../test_aio_server_interceptor_metrics.py | 190 ++++++++++++++++++ 6 files changed, 473 insertions(+), 63 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_metrics.py create mode 100644 instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_metrics.py diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 7f25290bd7..d238353246 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -514,25 +514,21 @@ def __init__(self, filter_=None): def instrumentation_dependencies(self) -> Collection[str]: return _instruments - def _add_interceptors(self, tracer_provider, kwargs): + def _add_interceptors(self, tracer_provider, meter_provider, target, kwargs): + interceptors = aio_client_interceptors( + tracer_provider=tracer_provider, + filter_=self._filter, + request_hook=self._request_hook, + response_hook=self._response_hook, + meter_provider=meter_provider, + target=target, + ) if "interceptors" in kwargs and kwargs["interceptors"]: - kwargs["interceptors"] = list(kwargs["interceptors"]) - kwargs["interceptors"] = ( - aio_client_interceptors( - tracer_provider=tracer_provider, - filter_=self._filter, - request_hook=self._request_hook, - response_hook=self._response_hook, - ) - + kwargs["interceptors"] + kwargs["interceptors"] = interceptors + list( + kwargs["interceptors"] ) else: - kwargs["interceptors"] = aio_client_interceptors( - tracer_provider=tracer_provider, - filter_=self._filter, - request_hook=self._request_hook, - response_hook=self._response_hook, - ) + kwargs["interceptors"] = interceptors return kwargs @@ -542,15 +538,20 @@ def _instrument(self, **kwargs): self._request_hook = kwargs.get("request_hook") self._response_hook = kwargs.get("response_hook") tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") def insecure(*args, **kwargs): - kwargs = self._add_interceptors(tracer_provider, kwargs) - + target = args[0] if args else None + kwargs = self._add_interceptors( + tracer_provider, meter_provider, target, kwargs + ) return self._original_insecure(*args, **kwargs) def secure(*args, **kwargs): - kwargs = self._add_interceptors(tracer_provider, kwargs) - + target = args[0] if args else None + kwargs = self._add_interceptors( + tracer_provider, meter_provider, target, kwargs + ) return self._original_secure(*args, **kwargs) grpc.aio.insecure_channel = insecure @@ -646,13 +647,22 @@ def server_interceptor(tracer_provider=None, filter_=None, meter_provider=None): def aio_client_interceptors( - tracer_provider=None, filter_=None, request_hook=None, response_hook=None + tracer_provider=None, + filter_=None, + request_hook=None, + response_hook=None, + meter_provider=None, + target=None, ): """Create a gRPC client channel interceptor. Args: tracer: The tracer to use to create client-side spans. + meter_provider: The meter provider to use for metrics. + + target: The target address of the channel (e.g. "host:port"). + Returns: An invocation-side interceptor object. """ @@ -665,40 +675,38 @@ def aio_client_interceptors( schema_url="https://opentelemetry.io/schemas/1.11.0", ) + meter = get_meter( + __name__, + __version__, + meter_provider, + ) + + common_kwargs = { + "filter_": filter_, + "request_hook": request_hook, + "response_hook": response_hook, + "meter": meter, + "target": target, + } + return [ - _aio_client.UnaryUnaryAioClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, - ), - _aio_client.UnaryStreamAioClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, - ), - _aio_client.StreamUnaryAioClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, - ), + _aio_client.UnaryUnaryAioClientInterceptor(tracer, **common_kwargs), + _aio_client.UnaryStreamAioClientInterceptor(tracer, **common_kwargs), + _aio_client.StreamUnaryAioClientInterceptor(tracer, **common_kwargs), _aio_client.StreamStreamAioClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, + tracer, **common_kwargs ), ] -def aio_server_interceptor(tracer_provider=None, filter_=None): +def aio_server_interceptor(tracer_provider=None, filter_=None, meter_provider=None): """Create a gRPC aio server interceptor. Args: tracer: The tracer to use to create server-side spans. + meter_provider: The meter provider to use for metrics. + Returns: A service-side interceptor object. """ @@ -711,8 +719,14 @@ def aio_server_interceptor(tracer_provider=None, filter_=None): schema_url="https://opentelemetry.io/schemas/1.11.0", ) + meter = get_meter( + __name__, + __version__, + meter_provider, + ) + return _aio_server.OpenTelemetryAioServerInterceptor( - tracer, filter_=filter_ + tracer, filter_=filter_, meter=meter ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py index cd08d8300a..e556834c59 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py @@ -3,6 +3,7 @@ import functools import logging +import time import grpc from grpc.aio import ClientCallDetails, Metadata @@ -92,7 +93,10 @@ def _start_interceptor_span(self, method): set_status_on_exception=False, ) - async def _wrap_unary_response(self, continuation, span): + async def _wrap_unary_response( + self, continuation, span, method=None, start_time=None + ): + status_code = grpc.StatusCode.OK try: call = await continuation() @@ -103,6 +107,8 @@ async def _wrap_unary_response(self, continuation, span): code = await call.code() details = await call.details() + status_code = code + call.add_done_callback( _unary_done_callback( span, code, details, self._call_response_hook @@ -111,20 +117,31 @@ async def _wrap_unary_response(self, continuation, span): return call except grpc.aio.AioRpcError as exc: + status_code = exc.code() self.add_error_details_to_span(span, exc) raise exc + finally: + if start_time is not None: + self._record_duration(method, start_time, status_code) - async def _wrap_stream_response(self, span, call): + async def _wrap_stream_response( + self, span, call, method=None, start_time=None + ): + status_code = grpc.StatusCode.OK try: async for response in call: if self._response_hook: self._call_response_hook(span, response) yield response except Exception as exc: + if isinstance(exc, grpc.aio.AioRpcError): + status_code = exc.code() self.add_error_details_to_span(span, exc) raise exc finally: span.end() + if start_time is not None: + self._record_duration(method, start_time, status_code) def tracing_skipped(self, client_call_details): return ( @@ -146,9 +163,10 @@ async def intercept_unary_unary( if self.tracing_skipped(client_call_details): return await continuation(client_call_details, request) - with self._start_interceptor_span( - client_call_details.method, - ) as span: + start_time = time.perf_counter() + method = client_call_details.method + + with self._start_interceptor_span(method) as span: new_details = self.propagate_trace_in_details(client_call_details) if self._request_hook: @@ -158,7 +176,8 @@ async def intercept_unary_unary( continuation, new_details, request ) return await self._wrap_unary_response( - continuation_with_args, span + continuation_with_args, span, + method=method, start_time=start_time, ) @@ -172,15 +191,19 @@ async def intercept_unary_stream( if self.tracing_skipped(client_call_details): return await continuation(client_call_details, request) - with self._start_interceptor_span( - client_call_details.method, - ) as span: + start_time = time.perf_counter() + method = client_call_details.method + + with self._start_interceptor_span(method) as span: new_details = self.propagate_trace_in_details(client_call_details) resp = await continuation(new_details, request) if self._request_hook: self._call_request_hook(span, request) - return self._wrap_stream_response(span, resp) + return self._wrap_stream_response( + span, resp, + method=method, start_time=start_time, + ) class StreamUnaryAioClientInterceptor( @@ -193,16 +216,18 @@ async def intercept_stream_unary( if self.tracing_skipped(client_call_details): return await continuation(client_call_details, request_iterator) - with self._start_interceptor_span( - client_call_details.method, - ) as span: + start_time = time.perf_counter() + method = client_call_details.method + + with self._start_interceptor_span(method) as span: new_details = self.propagate_trace_in_details(client_call_details) continuation_with_args = functools.partial( continuation, new_details, request_iterator ) return await self._wrap_unary_response( - continuation_with_args, span + continuation_with_args, span, + method=method, start_time=start_time, ) @@ -216,11 +241,15 @@ async def intercept_stream_stream( if self.tracing_skipped(client_call_details): return await continuation(client_call_details, request_iterator) - with self._start_interceptor_span( - client_call_details.method, - ) as span: + start_time = time.perf_counter() + method = client_call_details.method + + with self._start_interceptor_span(method) as span: new_details = self.propagate_trace_in_details(client_call_details) resp = await continuation(new_details, request_iterator) - return self._wrap_stream_response(span, resp) + return self._wrap_stream_response( + span, resp, + method=method, start_time=start_time, + ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py index c7e073a4bc..4907916f3c 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py @@ -1,6 +1,8 @@ # Copyright The OpenTelemetry Authors # SPDX-License-Identifier: Apache-2.0 +import time + import grpc import grpc.aio @@ -93,6 +95,8 @@ def telemetry_wrapper(behavior, request_streaming, response_streaming): def _intercept_aio_server_unary(self, behavior, handler_call_details): async def _unary_interceptor(request_or_iterator, context): + start_time = time.perf_counter() + with self._set_remote_context(context): with self._start_span( handler_call_details, @@ -115,10 +119,19 @@ async def _unary_interceptor(request_or_iterator, context): span.record_exception(error) raise error + finally: + self._record_duration( + handler_call_details, + start_time, + context._self_code, + ) + return _unary_interceptor def _intercept_aio_server_stream(self, behavior, handler_call_details): async def _stream_interceptor(request_or_iterator, context): + start_time = time.perf_counter() + with self._set_remote_context(context): with self._start_span( handler_call_details, @@ -139,4 +152,11 @@ async def _stream_interceptor(request_or_iterator, context): span.record_exception(error) raise error + finally: + self._record_duration( + handler_call_details, + start_time, + context._self_code, + ) + return _stream_interceptor diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index d796e7fe5f..7b752a0ecb 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -170,6 +170,8 @@ def _trace_result(self, span, rpc_info, result): return result def _build_metric_attributes(self, method, status_code): + if isinstance(method, bytes): + method = method.decode() full_method = method.lstrip("/") if method else _DEFAULT_RPC_METHOD attrs = { RPC_SYSTEM_NAME: RpcSystemNameValues.GRPC.value, diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_metrics.py new file mode 100644 index 0000000000..83d21958b9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_metrics.py @@ -0,0 +1,155 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +from unittest import IsolatedAsyncioTestCase + +import grpc + +from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorClient +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) +from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( + RPC_METHOD, + RPC_RESPONSE_STATUS_CODE, + RPC_SYSTEM_NAME, + RpcSystemNameValues, +) +from opentelemetry.semconv._incubating.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) +from opentelemetry.semconv._incubating.metrics.rpc_metrics import ( + RPC_CLIENT_CALL_DURATION, +) +from opentelemetry.test.test_base import TestBase + +from ._aio_client import server_streaming_method, simple_method +from ._server import create_test_server +from .protobuf import test_server_pb2_grpc + + +class TestAioClientInterceptorMetrics(TestBase, IsolatedAsyncioTestCase): + def setUp(self): + super().setUp() + GrpcAioInstrumentorClient().instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + self.server = create_test_server(25565) + self.server.start() + + def tearDown(self): + super().tearDown() + GrpcAioInstrumentorClient().uninstrument() + self.server.stop(None) + + async def test_unary_call_records_duration_metric(self): + """Aio client RPC produces an rpc.client.call.duration histogram.""" + channel = grpc.aio.insecure_channel("localhost:25565") + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + + try: + await simple_method(stub) + finally: + await channel.close() + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_CLIENT_CALL_DURATION}' not found. " + f"Got: {[m.name for m in metrics]}", + ) + self.assertEqual(duration_metric.unit, "s") + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") + self.assertEqual(attrs[SERVER_ADDRESS], "localhost") + self.assertEqual(attrs[SERVER_PORT], 25565) + self.assertNotIn(ERROR_TYPE, attrs) + + async def test_error_call_records_status_in_metric(self): + """Aio client metric records the gRPC error status code on failure.""" + channel = grpc.aio.insecure_channel("localhost:25565") + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + + try: + with self.assertRaises(grpc.aio.AioRpcError): + await simple_method(stub, error=True) + finally: + await channel.close() + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "INVALID_ARGUMENT") + self.assertEqual(attrs[ERROR_TYPE], "INVALID_ARGUMENT") + + async def test_server_streaming_records_duration_metric(self): + """Aio client metric is recorded for a server-streaming RPC.""" + channel = grpc.aio.insecure_channel("localhost:25565") + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + + try: + async for _ in server_streaming_method(stub): + pass + finally: + await channel.close() + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_CLIENT_CALL_DURATION}' not found " + f"for streaming RPC. Got: {[m.name for m in metrics]}", + ) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual( + attrs[RPC_METHOD], "GRPCTestServer/ServerStreamingMethod" + ) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_metrics.py new file mode 100644 index 0000000000..879a0a4771 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_metrics.py @@ -0,0 +1,190 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +from unittest import IsolatedAsyncioTestCase + +import grpc +import grpc.aio + +from opentelemetry.instrumentation.grpc import aio_server_interceptor +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) +from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( + RPC_METHOD, + RPC_RESPONSE_STATUS_CODE, + RPC_SYSTEM_NAME, + RpcSystemNameValues, +) +from opentelemetry.semconv._incubating.metrics.rpc_metrics import ( + RPC_SERVER_CALL_DURATION, +) +from opentelemetry.test.test_base import TestBase + +from .protobuf.test_server_pb2 import Request, Response +from .protobuf.test_server_pb2_grpc import ( + GRPCTestServerServicer, + add_GRPCTestServerServicer_to_server, +) + + +class Servicer(GRPCTestServerServicer): + async def SimpleMethod(self, request, context): + return Response( + server_id=request.client_id, + response_data=request.request_data, + ) + + async def ServerStreamingMethod(self, request, context): + for data in ("one", "two", "three"): + yield Response( + server_id=request.client_id, + response_data=data, + ) + + +class TestAioServerInterceptorMetrics(TestBase, IsolatedAsyncioTestCase): + async def test_unary_call_records_duration_metric(self): + """Aio server interceptor records rpc.server.call.duration on unary RPC.""" + interceptor = aio_server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + server = grpc.aio.server(interceptors=[interceptor]) + add_GRPCTestServerServicer_to_server(Servicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.aio.insecure_channel(f"localhost:{port:d}") + + await server.start() + try: + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + await channel.unary_unary(rpc_call)(msg) + finally: + await channel.close() + await server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_SERVER_CALL_DURATION}' not found. " + f"Got: {[m.name for m in metrics]}", + ) + self.assertEqual(duration_metric.unit, "s") + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") + self.assertNotIn(ERROR_TYPE, attrs) + + async def test_error_call_records_status_in_metric(self): + """Aio server metric records gRPC error status code on abort.""" + + class ErrorServicer(GRPCTestServerServicer): + async def SimpleMethod(self, request, context): + await context.abort(grpc.StatusCode.INTERNAL, "test failure") + + interceptor = aio_server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + server = grpc.aio.server(interceptors=[interceptor]) + add_GRPCTestServerServicer_to_server(ErrorServicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.aio.insecure_channel(f"localhost:{port:d}") + + await server.start() + try: + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + with self.assertRaises(grpc.aio.AioRpcError): + await channel.unary_unary(rpc_call)(msg) + finally: + await channel.close() + await server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "INTERNAL") + self.assertEqual(attrs[ERROR_TYPE], "INTERNAL") + + async def test_streaming_call_records_duration_metric(self): + """Aio server interceptor records metric on a streaming RPC.""" + interceptor = aio_server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + server = grpc.aio.server(interceptors=[interceptor]) + add_GRPCTestServerServicer_to_server(Servicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.aio.insecure_channel(f"localhost:{port:d}") + + await server.start() + try: + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + async for _ in channel.unary_stream(rpc_call)(msg): + pass + finally: + await channel.close() + await server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual( + attrs[RPC_METHOD], "GRPCTestServer/ServerStreamingMethod" + ) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") From 53e4b081fc8b87e45d86fff4d7ae3d480222bcf1 Mon Sep 17 00:00:00 2001 From: Lorenzo Ronzani Date: Fri, 22 May 2026 09:26:58 +0000 Subject: [PATCH 05/11] Edited client --- .../src/opentelemetry/instrumentation/grpc/_aio_client.py | 8 ++++++++ .../src/opentelemetry/instrumentation/grpc/_client.py | 2 -- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py index e556834c59..4f6b942774 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py @@ -165,6 +165,8 @@ async def intercept_unary_unary( start_time = time.perf_counter() method = client_call_details.method + if isinstance(method, bytes): + method = method.decode() with self._start_interceptor_span(method) as span: new_details = self.propagate_trace_in_details(client_call_details) @@ -193,6 +195,8 @@ async def intercept_unary_stream( start_time = time.perf_counter() method = client_call_details.method + if isinstance(method, bytes): + method = method.decode() with self._start_interceptor_span(method) as span: new_details = self.propagate_trace_in_details(client_call_details) @@ -218,6 +222,8 @@ async def intercept_stream_unary( start_time = time.perf_counter() method = client_call_details.method + if isinstance(method, bytes): + method = method.decode() with self._start_interceptor_span(method) as span: new_details = self.propagate_trace_in_details(client_call_details) @@ -243,6 +249,8 @@ async def intercept_stream_stream( start_time = time.perf_counter() method = client_call_details.method + if isinstance(method, bytes): + method = method.decode() with self._start_interceptor_span(method) as span: new_details = self.propagate_trace_in_details(client_call_details) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 7b752a0ecb..d796e7fe5f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -170,8 +170,6 @@ def _trace_result(self, span, rpc_info, result): return result def _build_metric_attributes(self, method, status_code): - if isinstance(method, bytes): - method = method.decode() full_method = method.lstrip("/") if method else _DEFAULT_RPC_METHOD attrs = { RPC_SYSTEM_NAME: RpcSystemNameValues.GRPC.value, From 55dd9c0d51336c52c1ea1d8fcacc0ef062b86450 Mon Sep 17 00:00:00 2001 From: Lorenzo Ronzani Date: Fri, 22 May 2026 09:45:26 +0000 Subject: [PATCH 06/11] Removed None --- .../instrumentation/grpc/_aio_client.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py index 4f6b942774..6bcc93bc80 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py @@ -94,7 +94,7 @@ def _start_interceptor_span(self, method): ) async def _wrap_unary_response( - self, continuation, span, method=None, start_time=None + self, continuation, span, method, start_time ): status_code = grpc.StatusCode.OK try: @@ -121,12 +121,9 @@ async def _wrap_unary_response( self.add_error_details_to_span(span, exc) raise exc finally: - if start_time is not None: - self._record_duration(method, start_time, status_code) + self._record_duration(method, start_time, status_code) - async def _wrap_stream_response( - self, span, call, method=None, start_time=None - ): + async def _wrap_stream_response(self, span, call, method, start_time): status_code = grpc.StatusCode.OK try: async for response in call: @@ -140,8 +137,7 @@ async def _wrap_stream_response( raise exc finally: span.end() - if start_time is not None: - self._record_duration(method, start_time, status_code) + self._record_duration(method, start_time, status_code) def tracing_skipped(self, client_call_details): return ( From cf6e4b331f9ec0b7c57d8bf1e21d5907595bd015 Mon Sep 17 00:00:00 2001 From: Lorenzo Ronzani Date: Fri, 22 May 2026 09:54:42 +0000 Subject: [PATCH 07/11] Added changelog edit --- .changelog/4621.added | 1 + 1 file changed, 1 insertion(+) create mode 100644 .changelog/4621.added diff --git a/.changelog/4621.added b/.changelog/4621.added new file mode 100644 index 0000000000..d17f9f9b99 --- /dev/null +++ b/.changelog/4621.added @@ -0,0 +1 @@ +`opentelemetry-instrumentation-grpc`: add rpc.client.call.duration and rpc.server.call.duration metrics for both sync and async components \ No newline at end of file From 519fec7ba6a92059ccacb18aac94a564d780ea13 Mon Sep 17 00:00:00 2001 From: Lorenzo Ronzani Date: Wed, 27 May 2026 06:33:56 +0000 Subject: [PATCH 08/11] Integrated first 3 comments --- .../instrumentation/grpc/_aio_server.py | 4 ++ .../instrumentation/grpc/_client.py | 28 +++++++++--- .../instrumentation/grpc/_server.py | 4 ++ .../test_aio_server_interceptor_metrics.py | 44 +++++++++++++++++++ .../tests/test_client_interceptor_metrics.py | 26 ++++++++++- .../tests/test_server_interceptor_metrics.py | 43 ++++++++++++++++++ 6 files changed, 142 insertions(+), 7 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py index 4907916f3c..0209cc612b 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py @@ -117,6 +117,8 @@ async def _unary_interceptor(request_or_iterator, context): # pylint:disable=unidiomatic-typecheck if type(error) != Exception: # noqa: E721 span.record_exception(error) + if context._self_code == grpc.StatusCode.OK: + context._self_code = grpc.StatusCode.UNKNOWN raise error finally: @@ -150,6 +152,8 @@ async def _stream_interceptor(request_or_iterator, context): # pylint:disable=unidiomatic-typecheck if type(error) != Exception: # noqa: E721 span.record_exception(error) + if context._self_code == grpc.StatusCode.OK: + context._self_code = grpc.StatusCode.UNKNOWN raise error finally: diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index d796e7fe5f..6c302b0ad9 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -148,13 +148,16 @@ def _start_span(self, method, **kwargs): ) # pylint:disable=no-self-use - def _trace_result(self, span, rpc_info, result): + def _trace_result(self, span, rpc_info, result, method, start_time): # If the RPC is called asynchronously, add a callback to end the span # when the future is done, else end the span immediately if isinstance(result, grpc.Future): result.add_done_callback( _make_future_done_callback(span, rpc_info) ) + result.add_done_callback( + self._make_future_duration_callback(method, start_time) + ) return result response = result # Handle the case when the RPC is initiated via the with_call @@ -167,6 +170,7 @@ def _trace_result(self, span, rpc_info, result): if self._response_hook: self._call_response_hook(span, response) span.end() + self._record_duration(method, start_time, grpc.StatusCode.OK) return result def _build_metric_attributes(self, method, status_code): @@ -191,6 +195,14 @@ def _record_duration(self, method, start_time, status_code): attrs = self._build_metric_attributes(method, status_code) self._duration_histogram.record(elapsed, attributes=attrs) + def _make_future_duration_callback(self, method, start_time): + def callback(response_future): + code = response_future.code() + status_code = code if code is not None else grpc.StatusCode.OK + self._record_duration(method, start_time, status_code) + + return callback + def _intercept(self, request, metadata, client_info, invoker): if not is_instrumentation_enabled(): return invoker(request, metadata) @@ -201,7 +213,6 @@ def _intercept(self, request, metadata, client_info, invoker): mutable_metadata = OrderedDict(metadata) start_time = time.perf_counter() - status_code = grpc.StatusCode.OK with self._start_span( client_info.full_method, @@ -230,6 +241,8 @@ def _intercept(self, request, metadata, client_info, invoker): RPC_GRPC_STATUS_CODE, status_code.value[0], ) + else: + status_code = grpc.StatusCode.UNKNOWN span.set_status( Status( status_code=StatusCode.ERROR, @@ -237,14 +250,17 @@ def _intercept(self, request, metadata, client_info, invoker): ) ) span.record_exception(exc) + self._record_duration( + client_info.full_method, start_time, status_code + ) raise exc finally: if result is None: span.end() - self._record_duration( - client_info.full_method, start_time, status_code - ) - return self._trace_result(span, rpc_info, result) + return self._trace_result( + span, rpc_info, result, + client_info.full_method, start_time, + ) def _call_request_hook(self, span, request): if not callable(self._request_hook): diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 686d5e2f00..94cf2897cd 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -356,6 +356,8 @@ def telemetry_interceptor(request_or_iterator, context): # pylint:disable=unidiomatic-typecheck if type(error) != Exception: # noqa: E721 span.record_exception(error) + if context._code == grpc.StatusCode.OK: + context._code = grpc.StatusCode.UNKNOWN raise error finally: @@ -392,6 +394,8 @@ def _intercept_server_stream( # pylint:disable=unidiomatic-typecheck if type(error) != Exception: # noqa: E721 span.record_exception(error) + if context._code == grpc.StatusCode.OK: + context._code = grpc.StatusCode.UNKNOWN raise error finally: diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_metrics.py index 879a0a4771..3e37e5d012 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_metrics.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_metrics.py @@ -142,6 +142,50 @@ async def SimpleMethod(self, request, context): self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "INTERNAL") self.assertEqual(attrs[ERROR_TYPE], "INTERNAL") + async def test_uncaught_exception_records_unknown_status(self): + """Uncaught handler exception records UNKNOWN status in metric.""" + + class CrashingServicer(GRPCTestServerServicer): + async def SimpleMethod(self, request, context): + raise RuntimeError("unexpected crash") + + interceptor = aio_server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + server = grpc.aio.server(interceptors=[interceptor]) + add_GRPCTestServerServicer_to_server(CrashingServicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.aio.insecure_channel(f"localhost:{port:d}") + + await server.start() + try: + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + with self.assertRaises(grpc.aio.AioRpcError): + await channel.unary_unary(rpc_call)(msg) + finally: + await channel.close() + await server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + attrs = dict(point.attributes) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "UNKNOWN") + self.assertEqual(attrs[ERROR_TYPE], "UNKNOWN") + async def test_streaming_call_records_duration_metric(self): """Aio server interceptor records metric on a streaming RPC.""" interceptor = aio_server_interceptor( diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py index c7f6403fe0..6b0249c236 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py @@ -22,7 +22,7 @@ ) from opentelemetry.test.test_base import TestBase -from ._client import server_streaming_method, simple_method +from ._client import server_streaming_method, simple_method, simple_method_future from ._server import create_test_server from .protobuf import test_server_pb2_grpc @@ -142,3 +142,27 @@ def test_server_streaming_records_duration_metric(self): attrs[RPC_METHOD], "GRPCTestServer/ServerStreamingMethod" ) self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") + + def test_future_call_records_correct_status(self): + """Future-based call defers metric recording until completion.""" + future = simple_method_future(self._stub, error=True) + with self.assertRaises(grpc.RpcError): + future.result() + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + + attrs = dict(point.attributes) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "INVALID_ARGUMENT") + self.assertEqual(attrs[ERROR_TYPE], "INVALID_ARGUMENT") diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py index ed675dbc8f..b1a6f35276 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py @@ -153,6 +153,49 @@ def SimpleMethod(self, request, context): self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "INTERNAL") self.assertEqual(attrs[ERROR_TYPE], "INTERNAL") + def test_uncaught_exception_records_unknown_status(self): + """Uncaught handler exception records UNKNOWN status in metric.""" + + class CrashingServicer(GRPCTestServerServicer): + def SimpleMethod(self, request, context): + raise RuntimeError("unexpected crash") + + interceptor = server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + with self.server( + max_workers=1, interceptors=[interceptor] + ) as (server, channel): + add_GRPCTestServerServicer_to_server(CrashingServicer(), server) + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + with self.assertRaises(Exception): + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + attrs = dict(point.attributes) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "UNKNOWN") + self.assertEqual(attrs[ERROR_TYPE], "UNKNOWN") + def test_streaming_call_records_duration_metric(self): """Server interceptor records metric on a streaming RPC.""" From 2702b3fba2a7e95912ef621bc700d763f8d68ae6 Mon Sep 17 00:00:00 2001 From: Lorenzo Ronzani Date: Wed, 27 May 2026 06:34:37 +0000 Subject: [PATCH 09/11] ruff --- .../tests/test_client_interceptor_metrics.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py index 6b0249c236..e41de2ba7e 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py @@ -22,7 +22,11 @@ ) from opentelemetry.test.test_base import TestBase -from ._client import server_streaming_method, simple_method, simple_method_future +from ._client import ( + server_streaming_method, + simple_method, + simple_method_future, +) from ._server import create_test_server from .protobuf import test_server_pb2_grpc From 48a90e2461e1e6dc74db6b777cd315f2862d2373 Mon Sep 17 00:00:00 2001 From: Lorenzo Ronzani Date: Wed, 27 May 2026 06:42:45 +0000 Subject: [PATCH 10/11] Integrated comments --- .../instrumentation/grpc/__init__.py | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index d238353246..94f65727cd 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -327,6 +327,7 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs): self._original_func = grpc.server tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") def server(*args, **kwargs): if "interceptors" in kwargs and kwargs["interceptors"]: @@ -335,13 +336,17 @@ def server(*args, **kwargs): kwargs["interceptors"].insert( 0, server_interceptor( - tracer_provider=tracer_provider, filter_=self._filter + tracer_provider=tracer_provider, + filter_=self._filter, + meter_provider=meter_provider, ), ) else: kwargs["interceptors"] = [ server_interceptor( - tracer_provider=tracer_provider, filter_=self._filter + tracer_provider=tracer_provider, + filter_=self._filter, + meter_provider=meter_provider, ) ] @@ -381,6 +386,7 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs): self._original_func = grpc.aio.server tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") def server(*args, **kwargs): if "interceptors" in kwargs and kwargs["interceptors"]: @@ -389,13 +395,17 @@ def server(*args, **kwargs): kwargs["interceptors"].insert( 0, aio_server_interceptor( - tracer_provider=tracer_provider, filter_=self._filter + tracer_provider=tracer_provider, + filter_=self._filter, + meter_provider=meter_provider, ), ) else: kwargs["interceptors"] = [ aio_server_interceptor( - tracer_provider=tracer_provider, filter_=self._filter + tracer_provider=tracer_provider, + filter_=self._filter, + meter_provider=meter_provider, ) ] return self._original_func(*args, **kwargs) @@ -473,7 +483,7 @@ def _uninstrument(self, **kwargs): def wrapper_fn(self, original_func, instance, args, kwargs): channel = original_func(*args, **kwargs) - target = args[0] if args else None + target = args[0] if args else kwargs.get("target") return intercept_channel( channel, client_interceptor( @@ -541,14 +551,14 @@ def _instrument(self, **kwargs): meter_provider = kwargs.get("meter_provider") def insecure(*args, **kwargs): - target = args[0] if args else None + target = args[0] if args else kwargs.get("target") kwargs = self._add_interceptors( tracer_provider, meter_provider, target, kwargs ) return self._original_insecure(*args, **kwargs) def secure(*args, **kwargs): - target = args[0] if args else None + target = args[0] if args else kwargs.get("target") kwargs = self._add_interceptors( tracer_provider, meter_provider, target, kwargs ) From 5219282fd0d8de1f579e693b93e58458887c7cfa Mon Sep 17 00:00:00 2001 From: Lorenzo Ronzani Date: Wed, 27 May 2026 07:07:20 +0000 Subject: [PATCH 11/11] Integrated comments --- .../instrumentation/grpc/_aio_server.py | 10 ++++++---- .../instrumentation/grpc/_client.py | 19 ++++++++++++------- .../instrumentation/grpc/_server.py | 10 ++++++---- .../tests/test_server_interceptor_metrics.py | 4 ++-- 4 files changed, 26 insertions(+), 17 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py index 0209cc612b..212f29e28e 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py @@ -107,6 +107,7 @@ async def _unary_interceptor(request_or_iterator, context): context = _OpenTelemetryAioServicerContext(context, span) # And now we run the actual RPC. + metric_status = None try: return await behavior(request_or_iterator, context) @@ -118,14 +119,14 @@ async def _unary_interceptor(request_or_iterator, context): if type(error) != Exception: # noqa: E721 span.record_exception(error) if context._self_code == grpc.StatusCode.OK: - context._self_code = grpc.StatusCode.UNKNOWN + metric_status = grpc.StatusCode.UNKNOWN raise error finally: self._record_duration( handler_call_details, start_time, - context._self_code, + metric_status or context._self_code, ) return _unary_interceptor @@ -142,6 +143,7 @@ async def _stream_interceptor(request_or_iterator, context): ) as span: context = _OpenTelemetryAioServicerContext(context, span) + metric_status = None try: async for response in behavior( request_or_iterator, context @@ -153,14 +155,14 @@ async def _stream_interceptor(request_or_iterator, context): if type(error) != Exception: # noqa: E721 span.record_exception(error) if context._self_code == grpc.StatusCode.OK: - context._self_code = grpc.StatusCode.UNKNOWN + metric_status = grpc.StatusCode.UNKNOWN raise error finally: self._record_duration( handler_call_details, start_time, - context._self_code, + metric_status or context._self_code, ) return _stream_interceptor diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 6c302b0ad9..877d5b1692 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -12,6 +12,7 @@ import time from collections import OrderedDict from typing import Callable, MutableMapping +from urllib.parse import urlparse import grpc @@ -64,13 +65,17 @@ def _create_duration_histogram(meter): def _parse_target(target): if not target: return None, None - host, _, port = target.rpartition(":") - if host and port: - try: - return host, int(port) - except ValueError: - pass - return target, None + if ":///" in target: + target = target.split("///", 1)[1] + try: + parsed = urlparse(f"//{target}") + host = parsed.hostname + port = parsed.port + except ValueError: + return None, None + if not host: + return None, None + return host, port class _CarrierSetter(Setter): diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 94cf2897cd..6ee84fa0a0 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -346,6 +346,7 @@ def telemetry_interceptor(request_or_iterator, context): context = _OpenTelemetryServicerContext(context, span) # And now we run the actual RPC. + metric_status = None try: return behavior(request_or_iterator, context) @@ -357,14 +358,14 @@ def telemetry_interceptor(request_or_iterator, context): if type(error) != Exception: # noqa: E721 span.record_exception(error) if context._code == grpc.StatusCode.OK: - context._code = grpc.StatusCode.UNKNOWN + metric_status = grpc.StatusCode.UNKNOWN raise error finally: self._record_duration( handler_call_details, start_time, - context._code, + metric_status or context._code, ) return telemetry_interceptor @@ -387,6 +388,7 @@ def _intercept_server_stream( ) as span: context = _OpenTelemetryServicerContext(context, span) + metric_status = None try: yield from behavior(request_or_iterator, context) @@ -395,12 +397,12 @@ def _intercept_server_stream( if type(error) != Exception: # noqa: E721 span.record_exception(error) if context._code == grpc.StatusCode.OK: - context._code = grpc.StatusCode.UNKNOWN + metric_status = grpc.StatusCode.UNKNOWN raise error finally: self._record_duration( handler_call_details, start_time, - context._code, + metric_status or context._code, ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py index b1a6f35276..bf8dc96af2 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py @@ -125,7 +125,7 @@ def SimpleMethod(self, request, context): msg = request.SerializeToString() try: server.start() - with self.assertRaises(Exception): + with self.assertRaises(grpc.RpcError): channel.unary_unary(rpc_call)(msg) finally: server.stop(None) @@ -175,7 +175,7 @@ def SimpleMethod(self, request, context): msg = request.SerializeToString() try: server.start() - with self.assertRaises(Exception): + with self.assertRaises(grpc.RpcError): channel.unary_unary(rpc_call)(msg) finally: server.stop(None)