diff --git a/.changelog/4621.added b/.changelog/4621.added new file mode 100644 index 0000000000..d17f9f9b99 --- /dev/null +++ b/.changelog/4621.added @@ -0,0 +1 @@ +`opentelemetry-instrumentation-grpc`: add rpc.client.call.duration and rpc.server.call.duration metrics for both sync and async components \ No newline at end of file diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index 3b0ef08703..94f65727cd 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -285,6 +285,7 @@ async def serve(): from opentelemetry.instrumentation.grpc.version import __version__ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.metrics import get_meter # pylint:disable=import-outside-toplevel # pylint:disable=import-self @@ -326,6 +327,7 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs): self._original_func = grpc.server tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") def server(*args, **kwargs): if "interceptors" in kwargs and kwargs["interceptors"]: @@ -334,13 +336,17 @@ def server(*args, **kwargs): kwargs["interceptors"].insert( 0, server_interceptor( - tracer_provider=tracer_provider, filter_=self._filter + tracer_provider=tracer_provider, + filter_=self._filter, + meter_provider=meter_provider, ), ) else: kwargs["interceptors"] = [ server_interceptor( - tracer_provider=tracer_provider, filter_=self._filter + tracer_provider=tracer_provider, + filter_=self._filter, + meter_provider=meter_provider, ) ] @@ -380,6 +386,7 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs): self._original_func = grpc.aio.server tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") def server(*args, **kwargs): if "interceptors" in kwargs and kwargs["interceptors"]: @@ -388,13 +395,17 @@ def server(*args, **kwargs): kwargs["interceptors"].insert( 0, aio_server_interceptor( - tracer_provider=tracer_provider, filter_=self._filter + tracer_provider=tracer_provider, + filter_=self._filter, + meter_provider=meter_provider, ), ) else: kwargs["interceptors"] = [ aio_server_interceptor( - tracer_provider=tracer_provider, filter_=self._filter + tracer_provider=tracer_provider, + filter_=self._filter, + meter_provider=meter_provider, ) ] return self._original_func(*args, **kwargs) @@ -457,6 +468,8 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs): self._request_hook = kwargs.get("request_hook") self._response_hook = kwargs.get("response_hook") + self._tracer_provider = kwargs.get("tracer_provider") + self._meter_provider = kwargs.get("meter_provider") for ctype in self._which_channel(kwargs): _wrap( "grpc", @@ -470,16 +483,16 @@ def _uninstrument(self, **kwargs): def wrapper_fn(self, original_func, instance, args, kwargs): channel = original_func(*args, **kwargs) - tracer_provider = kwargs.get("tracer_provider") - request_hook = self._request_hook - response_hook = self._response_hook + target = args[0] if args else kwargs.get("target") return intercept_channel( channel, client_interceptor( - tracer_provider=tracer_provider, + tracer_provider=self._tracer_provider, filter_=self._filter, - request_hook=request_hook, - response_hook=response_hook, + request_hook=self._request_hook, + response_hook=self._response_hook, + meter_provider=self._meter_provider, + target=target, ), ) @@ -511,25 +524,21 @@ def __init__(self, filter_=None): def instrumentation_dependencies(self) -> Collection[str]: return _instruments - def _add_interceptors(self, tracer_provider, kwargs): + def _add_interceptors(self, tracer_provider, meter_provider, target, kwargs): + interceptors = aio_client_interceptors( + tracer_provider=tracer_provider, + filter_=self._filter, + request_hook=self._request_hook, + response_hook=self._response_hook, + meter_provider=meter_provider, + target=target, + ) if "interceptors" in kwargs and kwargs["interceptors"]: - kwargs["interceptors"] = list(kwargs["interceptors"]) - kwargs["interceptors"] = ( - aio_client_interceptors( - tracer_provider=tracer_provider, - filter_=self._filter, - request_hook=self._request_hook, - response_hook=self._response_hook, - ) - + kwargs["interceptors"] + kwargs["interceptors"] = interceptors + list( + kwargs["interceptors"] ) else: - kwargs["interceptors"] = aio_client_interceptors( - tracer_provider=tracer_provider, - filter_=self._filter, - request_hook=self._request_hook, - response_hook=self._response_hook, - ) + kwargs["interceptors"] = interceptors return kwargs @@ -539,15 +548,20 @@ def _instrument(self, **kwargs): self._request_hook = kwargs.get("request_hook") self._response_hook = kwargs.get("response_hook") tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") def insecure(*args, **kwargs): - kwargs = self._add_interceptors(tracer_provider, kwargs) - + target = args[0] if args else kwargs.get("target") + kwargs = self._add_interceptors( + tracer_provider, meter_provider, target, kwargs + ) return self._original_insecure(*args, **kwargs) def secure(*args, **kwargs): - kwargs = self._add_interceptors(tracer_provider, kwargs) - + target = args[0] if args else kwargs.get("target") + kwargs = self._add_interceptors( + tracer_provider, meter_provider, target, kwargs + ) return self._original_secure(*args, **kwargs) grpc.aio.insecure_channel = insecure @@ -559,7 +573,12 @@ def _uninstrument(self, **kwargs): def client_interceptor( - tracer_provider=None, filter_=None, request_hook=None, response_hook=None + tracer_provider=None, + filter_=None, + request_hook=None, + response_hook=None, + meter_provider=None, + target=None, ): """Create a gRPC client channel interceptor. @@ -570,6 +589,10 @@ def client_interceptor( matches the condition. Default is None and intercept all requests. + meter_provider: The meter provider to use for metrics. + + target: The target address of the channel (e.g. "host:port"). + Returns: An invocation-side interceptor object. """ @@ -582,15 +605,23 @@ def client_interceptor( schema_url="https://opentelemetry.io/schemas/1.11.0", ) + meter = get_meter( + __name__, + __version__, + meter_provider, + ) + return _client.OpenTelemetryClientInterceptor( tracer, filter_=filter_, request_hook=request_hook, response_hook=response_hook, + meter=meter, + target=target, ) -def server_interceptor(tracer_provider=None, filter_=None): +def server_interceptor(tracer_provider=None, filter_=None, meter_provider=None): """Create a gRPC server interceptor. Args: @@ -600,6 +631,8 @@ def server_interceptor(tracer_provider=None, filter_=None): matches the condition. Default is None and intercept all requests. + meter_provider: The meter provider to use for metrics. + Returns: A service-side interceptor object. """ @@ -612,17 +645,34 @@ def server_interceptor(tracer_provider=None, filter_=None): schema_url="https://opentelemetry.io/schemas/1.11.0", ) - return _server.OpenTelemetryServerInterceptor(tracer, filter_=filter_) + meter = get_meter( + __name__, + __version__, + meter_provider, + ) + + return _server.OpenTelemetryServerInterceptor( + tracer, filter_=filter_, meter=meter + ) def aio_client_interceptors( - tracer_provider=None, filter_=None, request_hook=None, response_hook=None + tracer_provider=None, + filter_=None, + request_hook=None, + response_hook=None, + meter_provider=None, + target=None, ): """Create a gRPC client channel interceptor. Args: tracer: The tracer to use to create client-side spans. + meter_provider: The meter provider to use for metrics. + + target: The target address of the channel (e.g. "host:port"). + Returns: An invocation-side interceptor object. """ @@ -635,40 +685,38 @@ def aio_client_interceptors( schema_url="https://opentelemetry.io/schemas/1.11.0", ) + meter = get_meter( + __name__, + __version__, + meter_provider, + ) + + common_kwargs = { + "filter_": filter_, + "request_hook": request_hook, + "response_hook": response_hook, + "meter": meter, + "target": target, + } + return [ - _aio_client.UnaryUnaryAioClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, - ), - _aio_client.UnaryStreamAioClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, - ), - _aio_client.StreamUnaryAioClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, - ), + _aio_client.UnaryUnaryAioClientInterceptor(tracer, **common_kwargs), + _aio_client.UnaryStreamAioClientInterceptor(tracer, **common_kwargs), + _aio_client.StreamUnaryAioClientInterceptor(tracer, **common_kwargs), _aio_client.StreamStreamAioClientInterceptor( - tracer, - filter_=filter_, - request_hook=request_hook, - response_hook=response_hook, + tracer, **common_kwargs ), ] -def aio_server_interceptor(tracer_provider=None, filter_=None): +def aio_server_interceptor(tracer_provider=None, filter_=None, meter_provider=None): """Create a gRPC aio server interceptor. Args: tracer: The tracer to use to create server-side spans. + meter_provider: The meter provider to use for metrics. + Returns: A service-side interceptor object. """ @@ -681,8 +729,14 @@ def aio_server_interceptor(tracer_provider=None, filter_=None): schema_url="https://opentelemetry.io/schemas/1.11.0", ) + meter = get_meter( + __name__, + __version__, + meter_provider, + ) + return _aio_server.OpenTelemetryAioServerInterceptor( - tracer, filter_=filter_ + tracer, filter_=filter_, meter=meter ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py index cd08d8300a..6bcc93bc80 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_client.py @@ -3,6 +3,7 @@ import functools import logging +import time import grpc from grpc.aio import ClientCallDetails, Metadata @@ -92,7 +93,10 @@ def _start_interceptor_span(self, method): set_status_on_exception=False, ) - async def _wrap_unary_response(self, continuation, span): + async def _wrap_unary_response( + self, continuation, span, method, start_time + ): + status_code = grpc.StatusCode.OK try: call = await continuation() @@ -103,6 +107,8 @@ async def _wrap_unary_response(self, continuation, span): code = await call.code() details = await call.details() + status_code = code + call.add_done_callback( _unary_done_callback( span, code, details, self._call_response_hook @@ -111,20 +117,27 @@ async def _wrap_unary_response(self, continuation, span): return call except grpc.aio.AioRpcError as exc: + status_code = exc.code() self.add_error_details_to_span(span, exc) raise exc + finally: + self._record_duration(method, start_time, status_code) - async def _wrap_stream_response(self, span, call): + async def _wrap_stream_response(self, span, call, method, start_time): + status_code = grpc.StatusCode.OK try: async for response in call: if self._response_hook: self._call_response_hook(span, response) yield response except Exception as exc: + if isinstance(exc, grpc.aio.AioRpcError): + status_code = exc.code() self.add_error_details_to_span(span, exc) raise exc finally: span.end() + self._record_duration(method, start_time, status_code) def tracing_skipped(self, client_call_details): return ( @@ -146,9 +159,12 @@ async def intercept_unary_unary( if self.tracing_skipped(client_call_details): return await continuation(client_call_details, request) - with self._start_interceptor_span( - client_call_details.method, - ) as span: + start_time = time.perf_counter() + method = client_call_details.method + if isinstance(method, bytes): + method = method.decode() + + with self._start_interceptor_span(method) as span: new_details = self.propagate_trace_in_details(client_call_details) if self._request_hook: @@ -158,7 +174,8 @@ async def intercept_unary_unary( continuation, new_details, request ) return await self._wrap_unary_response( - continuation_with_args, span + continuation_with_args, span, + method=method, start_time=start_time, ) @@ -172,15 +189,21 @@ async def intercept_unary_stream( if self.tracing_skipped(client_call_details): return await continuation(client_call_details, request) - with self._start_interceptor_span( - client_call_details.method, - ) as span: + start_time = time.perf_counter() + method = client_call_details.method + if isinstance(method, bytes): + method = method.decode() + + with self._start_interceptor_span(method) as span: new_details = self.propagate_trace_in_details(client_call_details) resp = await continuation(new_details, request) if self._request_hook: self._call_request_hook(span, request) - return self._wrap_stream_response(span, resp) + return self._wrap_stream_response( + span, resp, + method=method, start_time=start_time, + ) class StreamUnaryAioClientInterceptor( @@ -193,16 +216,20 @@ async def intercept_stream_unary( if self.tracing_skipped(client_call_details): return await continuation(client_call_details, request_iterator) - with self._start_interceptor_span( - client_call_details.method, - ) as span: + start_time = time.perf_counter() + method = client_call_details.method + if isinstance(method, bytes): + method = method.decode() + + with self._start_interceptor_span(method) as span: new_details = self.propagate_trace_in_details(client_call_details) continuation_with_args = functools.partial( continuation, new_details, request_iterator ) return await self._wrap_unary_response( - continuation_with_args, span + continuation_with_args, span, + method=method, start_time=start_time, ) @@ -216,11 +243,17 @@ async def intercept_stream_stream( if self.tracing_skipped(client_call_details): return await continuation(client_call_details, request_iterator) - with self._start_interceptor_span( - client_call_details.method, - ) as span: + start_time = time.perf_counter() + method = client_call_details.method + if isinstance(method, bytes): + method = method.decode() + + with self._start_interceptor_span(method) as span: new_details = self.propagate_trace_in_details(client_call_details) resp = await continuation(new_details, request_iterator) - return self._wrap_stream_response(span, resp) + return self._wrap_stream_response( + span, resp, + method=method, start_time=start_time, + ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py index c7e073a4bc..212f29e28e 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_aio_server.py @@ -1,6 +1,8 @@ # Copyright The OpenTelemetry Authors # SPDX-License-Identifier: Apache-2.0 +import time + import grpc import grpc.aio @@ -93,6 +95,8 @@ def telemetry_wrapper(behavior, request_streaming, response_streaming): def _intercept_aio_server_unary(self, behavior, handler_call_details): async def _unary_interceptor(request_or_iterator, context): + start_time = time.perf_counter() + with self._set_remote_context(context): with self._start_span( handler_call_details, @@ -103,6 +107,7 @@ async def _unary_interceptor(request_or_iterator, context): context = _OpenTelemetryAioServicerContext(context, span) # And now we run the actual RPC. + metric_status = None try: return await behavior(request_or_iterator, context) @@ -113,12 +118,23 @@ async def _unary_interceptor(request_or_iterator, context): # pylint:disable=unidiomatic-typecheck if type(error) != Exception: # noqa: E721 span.record_exception(error) + if context._self_code == grpc.StatusCode.OK: + metric_status = grpc.StatusCode.UNKNOWN raise error + finally: + self._record_duration( + handler_call_details, + start_time, + metric_status or context._self_code, + ) + return _unary_interceptor def _intercept_aio_server_stream(self, behavior, handler_call_details): async def _stream_interceptor(request_or_iterator, context): + start_time = time.perf_counter() + with self._set_remote_context(context): with self._start_span( handler_call_details, @@ -127,6 +143,7 @@ async def _stream_interceptor(request_or_iterator, context): ) as span: context = _OpenTelemetryAioServicerContext(context, span) + metric_status = None try: async for response in behavior( request_or_iterator, context @@ -137,6 +154,15 @@ async def _stream_interceptor(request_or_iterator, context): # pylint:disable=unidiomatic-typecheck if type(error) != Exception: # noqa: E721 span.record_exception(error) + if context._self_code == grpc.StatusCode.OK: + metric_status = grpc.StatusCode.UNKNOWN raise error + finally: + self._record_duration( + handler_call_details, + start_time, + metric_status or context._self_code, + ) + return _stream_interceptor diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py index 832a5d5835..877d5b1692 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_client.py @@ -9,8 +9,10 @@ """Implementation of the invocation-side open-telemetry interceptor.""" import logging +import time from collections import OrderedDict from typing import Callable, MutableMapping +from urllib.parse import urlparse import grpc @@ -20,16 +22,61 @@ from opentelemetry.instrumentation.utils import is_instrumentation_enabled from opentelemetry.propagate import inject from opentelemetry.propagators.textmap import Setter +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( RPC_GRPC_STATUS_CODE, RPC_METHOD, + RPC_RESPONSE_STATUS_CODE, RPC_SERVICE, RPC_SYSTEM, + RPC_SYSTEM_NAME, + RpcSystemNameValues, +) +from opentelemetry.semconv._incubating.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) +from opentelemetry.semconv._incubating.metrics.rpc_metrics import ( + RPC_CLIENT_CALL_DURATION, ) from opentelemetry.trace.status import Status, StatusCode logger = logging.getLogger(__name__) +_DEFAULT_RPC_METHOD = "_OTHER" + +_RPC_DURATION_BUCKET_BOUNDARIES = ( + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, + 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, +) + + +def _create_duration_histogram(meter): + return meter.create_histogram( + name=RPC_CLIENT_CALL_DURATION, + description="Measures the duration of an outgoing Remote Procedure Call (RPC).", + unit="s", + explicit_bucket_boundaries_advisory=_RPC_DURATION_BUCKET_BOUNDARIES, + ) + + +def _parse_target(target): + if not target: + return None, None + if ":///" in target: + target = target.split("///", 1)[1] + try: + parsed = urlparse(f"//{target}") + host = parsed.hostname + port = parsed.port + except ValueError: + return None, None + if not host: + return None, None + return host, port + class _CarrierSetter(Setter): """We use a custom setter in order to be able to lower case @@ -77,12 +124,17 @@ class OpenTelemetryClientInterceptor( grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor ): def __init__( - self, tracer, filter_=None, request_hook=None, response_hook=None + self, tracer, filter_=None, request_hook=None, response_hook=None, + meter=None, target=None, ): self._tracer = tracer self._filter = filter_ self._request_hook = request_hook self._response_hook = response_hook + self._duration_histogram = ( + _create_duration_histogram(meter) if meter else None + ) + self._server_address, self._server_port = _parse_target(target) def _start_span(self, method, **kwargs): service, meth = method.lstrip("/").split("/", 1) @@ -101,13 +153,16 @@ def _start_span(self, method, **kwargs): ) # pylint:disable=no-self-use - def _trace_result(self, span, rpc_info, result): + def _trace_result(self, span, rpc_info, result, method, start_time): # If the RPC is called asynchronously, add a callback to end the span # when the future is done, else end the span immediately if isinstance(result, grpc.Future): result.add_done_callback( _make_future_done_callback(span, rpc_info) ) + result.add_done_callback( + self._make_future_duration_callback(method, start_time) + ) return result response = result # Handle the case when the RPC is initiated via the with_call @@ -120,8 +175,39 @@ def _trace_result(self, span, rpc_info, result): if self._response_hook: self._call_response_hook(span, response) span.end() + self._record_duration(method, start_time, grpc.StatusCode.OK) return result + def _build_metric_attributes(self, method, status_code): + full_method = method.lstrip("/") if method else _DEFAULT_RPC_METHOD + attrs = { + RPC_SYSTEM_NAME: RpcSystemNameValues.GRPC.value, + RPC_METHOD: full_method, + RPC_RESPONSE_STATUS_CODE: status_code.name, + } + if self._server_address: + attrs[SERVER_ADDRESS] = self._server_address + if self._server_port is not None: + attrs[SERVER_PORT] = self._server_port + if status_code != grpc.StatusCode.OK: + attrs[ERROR_TYPE] = status_code.name + return attrs + + def _record_duration(self, method, start_time, status_code): + if self._duration_histogram is None: + return + elapsed = time.perf_counter() - start_time + attrs = self._build_metric_attributes(method, status_code) + self._duration_histogram.record(elapsed, attributes=attrs) + + def _make_future_duration_callback(self, method, start_time): + def callback(response_future): + code = response_future.code() + status_code = code if code is not None else grpc.StatusCode.OK + self._record_duration(method, start_time, status_code) + + return callback + def _intercept(self, request, metadata, client_info, invoker): if not is_instrumentation_enabled(): return invoker(request, metadata) @@ -130,6 +216,9 @@ def _intercept(self, request, metadata, client_info, invoker): mutable_metadata = OrderedDict() else: mutable_metadata = OrderedDict(metadata) + + start_time = time.perf_counter() + with self._start_span( client_info.full_method, end_on_exit=False, @@ -152,10 +241,13 @@ def _intercept(self, request, metadata, client_info, invoker): result = invoker(request, metadata) except Exception as exc: if isinstance(exc, grpc.RpcError): + status_code = exc.code() span.set_attribute( RPC_GRPC_STATUS_CODE, - exc.code().value[0], + status_code.value[0], ) + else: + status_code = grpc.StatusCode.UNKNOWN span.set_status( Status( status_code=StatusCode.ERROR, @@ -163,11 +255,17 @@ def _intercept(self, request, metadata, client_info, invoker): ) ) span.record_exception(exc) + self._record_duration( + client_info.full_method, start_time, status_code + ) raise exc finally: if result is None: span.end() - return self._trace_result(span, rpc_info, result) + return self._trace_result( + span, rpc_info, result, + client_info.full_method, start_time, + ) def _call_request_hook(self, span, request): if not callable(self._request_hook): @@ -195,6 +293,9 @@ def _intercept_server_stream( else: mutable_metadata = OrderedDict(metadata) + start_time = time.perf_counter() + status_code = grpc.StatusCode.OK + with self._start_span(client_info.full_method) as span: inject(mutable_metadata, setter=_carrier_setter) metadata = tuple(mutable_metadata.items()) @@ -210,9 +311,14 @@ def _intercept_server_stream( try: yield from invoker(request_or_iterator, metadata) except grpc.RpcError as err: + status_code = err.code() span.set_status(Status(StatusCode.ERROR)) - span.set_attribute(RPC_GRPC_STATUS_CODE, err.code().value[0]) + span.set_attribute(RPC_GRPC_STATUS_CODE, status_code.value[0]) raise err + finally: + self._record_duration( + client_info.full_method, start_time, status_code + ) def intercept_stream( self, request_or_iterator, metadata, client_info, invoker diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index e55e6c0994..6ee84fa0a0 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -11,6 +11,7 @@ """ import logging +import time from contextlib import contextmanager from urllib.parse import unquote @@ -19,6 +20,9 @@ from opentelemetry import trace from opentelemetry.context import attach, detach from opentelemetry.propagate import extract +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) from opentelemetry.semconv._incubating.attributes.net_attributes import ( NET_PEER_IP, NET_PEER_NAME, @@ -27,14 +31,36 @@ from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( RPC_GRPC_STATUS_CODE, RPC_METHOD, + RPC_RESPONSE_STATUS_CODE, RPC_SERVICE, RPC_SYSTEM, + RPC_SYSTEM_NAME, + RpcSystemNameValues, +) +from opentelemetry.semconv._incubating.metrics.rpc_metrics import ( + RPC_SERVER_CALL_DURATION, ) from ._utilities import _server_status logger = logging.getLogger(__name__) +_DEFAULT_RPC_METHOD = "_OTHER" + +_RPC_DURATION_BUCKET_BOUNDARIES = ( + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, + 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10, +) + + +def _create_duration_histogram(meter): + return meter.create_histogram( + name=RPC_SERVER_CALL_DURATION, + description="Measures the duration of an incoming Remote Procedure Call (RPC).", + unit="s", + explicit_bucket_boundaries_advisory=_RPC_DURATION_BUCKET_BOUNDARIES, + ) + # wrap an RPC call # see https://github.com/grpc/grpc/issues/18191 @@ -184,9 +210,12 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor): """ - def __init__(self, tracer, filter_=None): + def __init__(self, tracer, filter_=None, meter=None): self._tracer = tracer self._filter = filter_ + self._duration_histogram = ( + _create_duration_histogram(meter) if meter else None + ) @contextmanager def _set_remote_context(self, servicer_context): @@ -267,6 +296,29 @@ def _start_span( set_status_on_exception=set_status_on_exception, ) + def _build_metric_attributes(self, handler_call_details, status_code): + method = handler_call_details.method + full_method = method.lstrip("/") if method else _DEFAULT_RPC_METHOD + attrs = { + RPC_SYSTEM_NAME: RpcSystemNameValues.GRPC.value, + RPC_METHOD: full_method, + RPC_RESPONSE_STATUS_CODE: status_code.name, + } + if status_code != grpc.StatusCode.OK: + attrs[ERROR_TYPE] = status_code.name + return attrs + + def _record_duration( + self, handler_call_details, start_time, status_code + ): + if self._duration_histogram is None: + return + elapsed = time.perf_counter() - start_time + attrs = self._build_metric_attributes( + handler_call_details, status_code + ) + self._duration_histogram.record(elapsed, attributes=attrs) + def intercept_service(self, continuation, handler_call_details): if self._filter is not None and not self._filter(handler_call_details): return continuation(handler_call_details) @@ -282,6 +334,8 @@ def telemetry_interceptor(request_or_iterator, context): context, ) + start_time = time.perf_counter() + with self._set_remote_context(context): with self._start_span( handler_call_details, @@ -292,6 +346,7 @@ def telemetry_interceptor(request_or_iterator, context): context = _OpenTelemetryServicerContext(context, span) # And now we run the actual RPC. + metric_status = None try: return behavior(request_or_iterator, context) @@ -302,8 +357,17 @@ def telemetry_interceptor(request_or_iterator, context): # pylint:disable=unidiomatic-typecheck if type(error) != Exception: # noqa: E721 span.record_exception(error) + if context._code == grpc.StatusCode.OK: + metric_status = grpc.StatusCode.UNKNOWN raise error + finally: + self._record_duration( + handler_call_details, + start_time, + metric_status or context._code, + ) + return telemetry_interceptor return _wrap_rpc_behavior( @@ -316,12 +380,15 @@ def telemetry_interceptor(request_or_iterator, context): def _intercept_server_stream( self, behavior, handler_call_details, request_or_iterator, context ): + start_time = time.perf_counter() + with self._set_remote_context(context): with self._start_span( handler_call_details, context, set_status_on_exception=False ) as span: context = _OpenTelemetryServicerContext(context, span) + metric_status = None try: yield from behavior(request_or_iterator, context) @@ -329,4 +396,13 @@ def _intercept_server_stream( # pylint:disable=unidiomatic-typecheck if type(error) != Exception: # noqa: E721 span.record_exception(error) + if context._code == grpc.StatusCode.OK: + metric_status = grpc.StatusCode.UNKNOWN raise error + + finally: + self._record_duration( + handler_call_details, + start_time, + metric_status or context._code, + ) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_metrics.py new file mode 100644 index 0000000000..83d21958b9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_client_interceptor_metrics.py @@ -0,0 +1,155 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +from unittest import IsolatedAsyncioTestCase + +import grpc + +from opentelemetry.instrumentation.grpc import GrpcAioInstrumentorClient +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) +from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( + RPC_METHOD, + RPC_RESPONSE_STATUS_CODE, + RPC_SYSTEM_NAME, + RpcSystemNameValues, +) +from opentelemetry.semconv._incubating.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) +from opentelemetry.semconv._incubating.metrics.rpc_metrics import ( + RPC_CLIENT_CALL_DURATION, +) +from opentelemetry.test.test_base import TestBase + +from ._aio_client import server_streaming_method, simple_method +from ._server import create_test_server +from .protobuf import test_server_pb2_grpc + + +class TestAioClientInterceptorMetrics(TestBase, IsolatedAsyncioTestCase): + def setUp(self): + super().setUp() + GrpcAioInstrumentorClient().instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + self.server = create_test_server(25565) + self.server.start() + + def tearDown(self): + super().tearDown() + GrpcAioInstrumentorClient().uninstrument() + self.server.stop(None) + + async def test_unary_call_records_duration_metric(self): + """Aio client RPC produces an rpc.client.call.duration histogram.""" + channel = grpc.aio.insecure_channel("localhost:25565") + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + + try: + await simple_method(stub) + finally: + await channel.close() + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_CLIENT_CALL_DURATION}' not found. " + f"Got: {[m.name for m in metrics]}", + ) + self.assertEqual(duration_metric.unit, "s") + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") + self.assertEqual(attrs[SERVER_ADDRESS], "localhost") + self.assertEqual(attrs[SERVER_PORT], 25565) + self.assertNotIn(ERROR_TYPE, attrs) + + async def test_error_call_records_status_in_metric(self): + """Aio client metric records the gRPC error status code on failure.""" + channel = grpc.aio.insecure_channel("localhost:25565") + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + + try: + with self.assertRaises(grpc.aio.AioRpcError): + await simple_method(stub, error=True) + finally: + await channel.close() + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "INVALID_ARGUMENT") + self.assertEqual(attrs[ERROR_TYPE], "INVALID_ARGUMENT") + + async def test_server_streaming_records_duration_metric(self): + """Aio client metric is recorded for a server-streaming RPC.""" + channel = grpc.aio.insecure_channel("localhost:25565") + stub = test_server_pb2_grpc.GRPCTestServerStub(channel) + + try: + async for _ in server_streaming_method(stub): + pass + finally: + await channel.close() + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_CLIENT_CALL_DURATION}' not found " + f"for streaming RPC. Got: {[m.name for m in metrics]}", + ) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual( + attrs[RPC_METHOD], "GRPCTestServer/ServerStreamingMethod" + ) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_metrics.py new file mode 100644 index 0000000000..3e37e5d012 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_aio_server_interceptor_metrics.py @@ -0,0 +1,234 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +from unittest import IsolatedAsyncioTestCase + +import grpc +import grpc.aio + +from opentelemetry.instrumentation.grpc import aio_server_interceptor +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) +from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( + RPC_METHOD, + RPC_RESPONSE_STATUS_CODE, + RPC_SYSTEM_NAME, + RpcSystemNameValues, +) +from opentelemetry.semconv._incubating.metrics.rpc_metrics import ( + RPC_SERVER_CALL_DURATION, +) +from opentelemetry.test.test_base import TestBase + +from .protobuf.test_server_pb2 import Request, Response +from .protobuf.test_server_pb2_grpc import ( + GRPCTestServerServicer, + add_GRPCTestServerServicer_to_server, +) + + +class Servicer(GRPCTestServerServicer): + async def SimpleMethod(self, request, context): + return Response( + server_id=request.client_id, + response_data=request.request_data, + ) + + async def ServerStreamingMethod(self, request, context): + for data in ("one", "two", "three"): + yield Response( + server_id=request.client_id, + response_data=data, + ) + + +class TestAioServerInterceptorMetrics(TestBase, IsolatedAsyncioTestCase): + async def test_unary_call_records_duration_metric(self): + """Aio server interceptor records rpc.server.call.duration on unary RPC.""" + interceptor = aio_server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + server = grpc.aio.server(interceptors=[interceptor]) + add_GRPCTestServerServicer_to_server(Servicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.aio.insecure_channel(f"localhost:{port:d}") + + await server.start() + try: + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + await channel.unary_unary(rpc_call)(msg) + finally: + await channel.close() + await server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_SERVER_CALL_DURATION}' not found. " + f"Got: {[m.name for m in metrics]}", + ) + self.assertEqual(duration_metric.unit, "s") + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") + self.assertNotIn(ERROR_TYPE, attrs) + + async def test_error_call_records_status_in_metric(self): + """Aio server metric records gRPC error status code on abort.""" + + class ErrorServicer(GRPCTestServerServicer): + async def SimpleMethod(self, request, context): + await context.abort(grpc.StatusCode.INTERNAL, "test failure") + + interceptor = aio_server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + server = grpc.aio.server(interceptors=[interceptor]) + add_GRPCTestServerServicer_to_server(ErrorServicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.aio.insecure_channel(f"localhost:{port:d}") + + await server.start() + try: + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + with self.assertRaises(grpc.aio.AioRpcError): + await channel.unary_unary(rpc_call)(msg) + finally: + await channel.close() + await server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "INTERNAL") + self.assertEqual(attrs[ERROR_TYPE], "INTERNAL") + + async def test_uncaught_exception_records_unknown_status(self): + """Uncaught handler exception records UNKNOWN status in metric.""" + + class CrashingServicer(GRPCTestServerServicer): + async def SimpleMethod(self, request, context): + raise RuntimeError("unexpected crash") + + interceptor = aio_server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + server = grpc.aio.server(interceptors=[interceptor]) + add_GRPCTestServerServicer_to_server(CrashingServicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.aio.insecure_channel(f"localhost:{port:d}") + + await server.start() + try: + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + with self.assertRaises(grpc.aio.AioRpcError): + await channel.unary_unary(rpc_call)(msg) + finally: + await channel.close() + await server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + attrs = dict(point.attributes) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "UNKNOWN") + self.assertEqual(attrs[ERROR_TYPE], "UNKNOWN") + + async def test_streaming_call_records_duration_metric(self): + """Aio server interceptor records metric on a streaming RPC.""" + interceptor = aio_server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + server = grpc.aio.server(interceptors=[interceptor]) + add_GRPCTestServerServicer_to_server(Servicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.aio.insecure_channel(f"localhost:{port:d}") + + await server.start() + try: + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + async for _ in channel.unary_stream(rpc_call)(msg): + pass + finally: + await channel.close() + await server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual( + attrs[RPC_METHOD], "GRPCTestServer/ServerStreamingMethod" + ) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py new file mode 100644 index 0000000000..e41de2ba7e --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_client_interceptor_metrics.py @@ -0,0 +1,172 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +import grpc + +from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) +from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( + RPC_METHOD, + RPC_RESPONSE_STATUS_CODE, + RPC_SYSTEM_NAME, + RpcSystemNameValues, +) +from opentelemetry.semconv._incubating.attributes.server_attributes import ( + SERVER_ADDRESS, + SERVER_PORT, +) +from opentelemetry.semconv._incubating.metrics.rpc_metrics import ( + RPC_CLIENT_CALL_DURATION, +) +from opentelemetry.test.test_base import TestBase + +from ._client import ( + server_streaming_method, + simple_method, + simple_method_future, +) +from ._server import create_test_server +from .protobuf import test_server_pb2_grpc + + +class TestClientInterceptorMetrics(TestBase): + def setUp(self): + super().setUp() + GrpcInstrumentorClient().instrument( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + self.server = create_test_server(25565) + self.server.start() + self.channel = grpc.insecure_channel("localhost:25565") + self._stub = test_server_pb2_grpc.GRPCTestServerStub(self.channel) + + def tearDown(self): + super().tearDown() + GrpcInstrumentorClient().uninstrument() + self.server.stop(None) + self.channel.close() + + def test_unary_call_records_duration_metric(self): + """A unary client RPC produces an rpc.client.call.duration histogram.""" + simple_method(self._stub) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_CLIENT_CALL_DURATION}' not found. " + f"Got: {[m.name for m in metrics]}", + ) + self.assertEqual(duration_metric.unit, "s") + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + self.assertEqual( + point.explicit_bounds, + (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10), + ) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") + self.assertEqual(attrs[SERVER_ADDRESS], "localhost") + self.assertEqual(attrs[SERVER_PORT], 25565) + self.assertNotIn(ERROR_TYPE, attrs) + + def test_error_call_records_status_in_metric(self): + """Client metric records the gRPC error status code on failure.""" + with self.assertRaises(grpc.RpcError): + simple_method(self._stub, error=True) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "INVALID_ARGUMENT") + self.assertEqual(attrs[ERROR_TYPE], "INVALID_ARGUMENT") + + def test_server_streaming_records_duration_metric(self): + """Client metric is recorded for a server-streaming RPC.""" + server_streaming_method(self._stub) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_CLIENT_CALL_DURATION}' not found " + f"for streaming RPC. Got: {[m.name for m in metrics]}", + ) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual( + attrs[RPC_METHOD], "GRPCTestServer/ServerStreamingMethod" + ) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") + + def test_future_call_records_correct_status(self): + """Future-based call defers metric recording until completion.""" + future = simple_method_future(self._stub, error=True) + with self.assertRaises(grpc.RpcError): + future.result() + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_CLIENT_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + + attrs = dict(point.attributes) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "INVALID_ARGUMENT") + self.assertEqual(attrs[ERROR_TYPE], "INVALID_ARGUMENT") diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py new file mode 100644 index 0000000000..bf8dc96af2 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor_metrics.py @@ -0,0 +1,257 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +import contextlib +from concurrent import futures + +import grpc + +from opentelemetry.instrumentation.grpc import server_interceptor +from opentelemetry.semconv._incubating.attributes.error_attributes import ( + ERROR_TYPE, +) +from opentelemetry.semconv._incubating.attributes.rpc_attributes import ( + RPC_METHOD, + RPC_RESPONSE_STATUS_CODE, + RPC_SYSTEM_NAME, + RpcSystemNameValues, +) +from opentelemetry.semconv._incubating.metrics.rpc_metrics import ( + RPC_SERVER_CALL_DURATION, +) +from opentelemetry.test.test_base import TestBase + +from .protobuf.test_server_pb2 import Request, Response +from .protobuf.test_server_pb2_grpc import ( + GRPCTestServerServicer, + add_GRPCTestServerServicer_to_server, +) + + +class Servicer(GRPCTestServerServicer): + def SimpleMethod(self, request, context): + return Response( + server_id=request.client_id, + response_data=request.request_data, + ) + + +class TestServerInterceptorMetrics(TestBase): + @contextlib.contextmanager + def server(self, max_workers=1, interceptors=None): + with futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=interceptors or [], + ) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + yield server, channel + + def test_unary_call_records_duration_metric(self): + """A unary server RPC produces an rpc.server.call.duration histogram.""" + interceptor = server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + with self.server( + max_workers=1, interceptors=[interceptor] + ) as (server, channel): + add_GRPCTestServerServicer_to_server(Servicer(), server) + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_SERVER_CALL_DURATION}' not found. " + f"Got: {[m.name for m in metrics]}", + ) + self.assertEqual(duration_metric.unit, "s") + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + self.assertEqual( + point.explicit_bounds, + (0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10), + ) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK") + self.assertNotIn(ERROR_TYPE, attrs) + + def test_error_call_records_status_in_metric(self): + """Server metric records the gRPC error status code on abort.""" + + class ErrorServicer(GRPCTestServerServicer): + def SimpleMethod(self, request, context): + context.abort(grpc.StatusCode.INTERNAL, "test failure") + + interceptor = server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + with self.server( + max_workers=1, interceptors=[interceptor] + ) as (server, channel): + add_GRPCTestServerServicer_to_server(ErrorServicer(), server) + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + with self.assertRaises(grpc.RpcError): + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual(attrs[RPC_METHOD], "GRPCTestServer/SimpleMethod") + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "INTERNAL") + self.assertEqual(attrs[ERROR_TYPE], "INTERNAL") + + def test_uncaught_exception_records_unknown_status(self): + """Uncaught handler exception records UNKNOWN status in metric.""" + + class CrashingServicer(GRPCTestServerServicer): + def SimpleMethod(self, request, context): + raise RuntimeError("unexpected crash") + + interceptor = server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + with self.server( + max_workers=1, interceptors=[interceptor] + ) as (server, channel): + add_GRPCTestServerServicer_to_server(CrashingServicer(), server) + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + with self.assertRaises(grpc.RpcError): + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone(duration_metric) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + attrs = dict(point.attributes) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "UNKNOWN") + self.assertEqual(attrs[ERROR_TYPE], "UNKNOWN") + + def test_streaming_call_records_duration_metric(self): + """Server interceptor records metric on a streaming RPC.""" + + class StreamingServicer(GRPCTestServerServicer): + def ServerStreamingMethod(self, request, context): + for data in ("one", "two", "three"): + yield Response( + server_id=request.client_id, + response_data=data, + ) + + interceptor = server_interceptor( + tracer_provider=self.tracer_provider, + meter_provider=self.meter_provider, + ) + + with self.server( + max_workers=1, interceptors=[interceptor] + ) as (server, channel): + add_GRPCTestServerServicer_to_server( + StreamingServicer(), server + ) + + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + list(channel.unary_stream(rpc_call)(msg)) + finally: + server.stop(None) + + metrics = self.get_sorted_metrics() + duration_metric = next( + (m for m in metrics if m.name == RPC_SERVER_CALL_DURATION), + None, + ) + + self.assertIsNotNone( + duration_metric, + f"Expected metric '{RPC_SERVER_CALL_DURATION}' not found " + f"for streaming RPC. Got: {[m.name for m in metrics]}", + ) + + data_points = list(duration_metric.data.data_points) + self.assertEqual(len(data_points), 1) + + point = data_points[0] + self.assertEqual(point.count, 1) + self.assertGreater(point.sum, 0) + + attrs = dict(point.attributes) + self.assertEqual( + attrs[RPC_SYSTEM_NAME], RpcSystemNameValues.GRPC.value + ) + self.assertEqual( + attrs[RPC_METHOD], "GRPCTestServer/ServerStreamingMethod" + ) + self.assertEqual(attrs[RPC_RESPONSE_STATUS_CODE], "OK")