diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index f6de2a2d76..f67f938d53 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -67,8 +67,13 @@ def run(): import grpc - from opentelemetry import trace + from opentelemetry import metrics, trace from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer + from opentelemetry.sdk.metrics import MeterProvider + from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + PeriodicExportingMetricReader, + ) from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( ConsoleSpanExporter, @@ -80,6 +85,12 @@ def run(): except ImportError: from gen import helloworld_pb2, helloworld_pb2_grpc + exporter = ConsoleMetricExporter() + reader = PeriodicExportingMetricReader(exporter) + metrics.set_meter_provider( + MeterProvider(metric_readers=[reader]) + ) + trace.set_tracer_provider(TracerProvider()) trace.get_tracer_provider().add_span_processor( SimpleSpanProcessor(ConsoleSpanExporter()) @@ -123,7 +134,7 @@ def serve(): import grpc # pylint:disable=import-self from wrapt import wrap_function_wrapper as _wrap -from opentelemetry import trace +from opentelemetry import metrics, trace from opentelemetry.instrumentation.grpc.grpcext import intercept_channel from opentelemetry.instrumentation.grpc.package import _instruments from opentelemetry.instrumentation.grpc.version import __version__ @@ -153,17 +164,24 @@ def instrumentation_dependencies(self) -> Collection[str]: def _instrument(self, **kwargs): self._original_func = grpc.server + meter_provider = kwargs.get("meter_provider") tracer_provider = kwargs.get("tracer_provider") def server(*args, **kwargs): if "interceptors" in kwargs: # add our interceptor as the first kwargs["interceptors"].insert( - 0, server_interceptor(tracer_provider=tracer_provider) + 0, server_interceptor( + meter_provider=meter_provider, + tracer_provider=tracer_provider + ) ) else: kwargs["interceptors"] = [ - server_interceptor(tracer_provider=tracer_provider) + server_interceptor( + meter_provider=meter_provider, + tracer_provider=tracer_provider + ) ] return self._original_func(*args, **kwargs) @@ -240,17 +258,19 @@ def client_interceptor(tracer_provider=None): return _client.OpenTelemetryClientInterceptor(tracer) -def server_interceptor(tracer_provider=None): +def server_interceptor(meter_provider=None, tracer_provider=None): """Create a gRPC server interceptor. Args: - tracer: The tracer to use to create server-side spans. + meter_provider: The meter provider which allows acess to the meter. + tracer_provider: The tracer provider which allows acess to the tracer. Returns: A service-side interceptor object. """ from . import _server + meter = metrics.get_meter(__name__, __version__, meter_provider) tracer = trace.get_tracer(__name__, __version__, tracer_provider) - return _server.OpenTelemetryServerInterceptor(tracer) + return _server.OpenTelemetryServerInterceptor(meter, tracer) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 9329eb7077..1bcaeb776d 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -29,12 +29,21 @@ from opentelemetry import trace from opentelemetry.context import attach, detach from opentelemetry.propagate import extract -from opentelemetry.semconv.trace import RpcSystemValues, SpanAttributes +from opentelemetry.semconv.trace import MessageTypeValues, RpcSystemValues, SpanAttributes from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.util._time import _time_ns + logger = logging.getLogger(__name__) +_MESSAGE = "message" +"""event name of a message.""" + +_RPC_USER_AGENT = "rpc.user_agent" +"""span attribute for RPC user agent.""" + + # wrap an RPC call # see https://github.com/grpc/grpc/issues/18191 def _wrap_rpc_behavior(handler, continuation): @@ -63,8 +72,25 @@ def _wrap_rpc_behavior(handler, continuation): ) +def _add_message_event( + span, + message_type, + message_size_by, + message_id=1 +): + span.add_event( + _MESSAGE, + { + SpanAttributes.MESSAGE_TYPE: message_type, + SpanAttributes.MESSAGE_ID: message_id, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: message_size_by, + } + ) + + # pylint:disable=abstract-method class _OpenTelemetryServicerContext(grpc.ServicerContext): + def __init__(self, servicer_context, active_span): self._servicer_context = servicer_context self._active_span = active_span @@ -83,7 +109,7 @@ def time_remaining(self, *args, **kwargs): def cancel(self, *args, **kwargs): self._code = grpc.StatusCode.CANCELLED - self._details = self._code.value[1] + self._details = grpc.StatusCode.CANCELLED.value[1] self._active_span.set_attribute( SpanAttributes.RPC_GRPC_STATUS_CODE, self._code.value[0] ) @@ -101,8 +127,8 @@ def add_callback(self, *args, **kwargs): def disable_next_message_compression(self): return self._service_context.disable_next_message_compression() - def invocation_metadata(self, *args, **kwargs): - return self._servicer_context.invocation_metadata(*args, **kwargs) + def invocation_metadata(self): + return self._servicer_context.invocation_metadata() def peer(self): return self._servicer_context.peer() @@ -205,24 +231,53 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor): Usage:: + meter = some OpenTelemetry meter tracer = some OpenTelemetry tracer interceptors = [ - OpenTelemetryServerInterceptor(tracer), + OpenTelemetryServerInterceptor(meter, tracer), ] server = grpc.server( futures.ThreadPoolExecutor(max_workers=concurrency), - interceptors = interceptors) + interceptors = interceptors + ) """ - def __init__(self, tracer): + def __init__(self, meter, tracer): + self._meter = meter self._tracer = tracer + self._duration_histogram = self._meter.create_histogram( + name="rpc.server.duration", + unit="ms", + description="measures the duration of the inbound rpc", + ) + self._request_size_histogram = self._meter.create_histogram( + name="rpc.server.request.size", + unit="By", + description="measures size of RPC request messages (uncompressed)", + ) + self._response_size_histogram = self._meter.create_histogram( + name="rpc.server.response.size", + unit="By", + description="measures size of RPC response messages (uncompressed)", + ) + self._requests_per_rpc_histogram = self._meter.create_histogram( + name="rpc.server.requests_per_rpc", + unit="requests", + description="measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs", + ) + self._responses_per_rpc_histogram = self._meter.create_histogram( + name="rpc.server.responses_per_rpc", + unit="responses", + description="measures the number of messages sent per RPC. Should be 1 for all non-streaming RPCs", + ) + @contextmanager - def _set_remote_context(self, servicer_context): - metadata = servicer_context.invocation_metadata() + def _set_remote_context(self, context): + metadata = context.invocation_metadata() if metadata: md_dict = {md.key: md.value for md in metadata} ctx = extract(md_dict) @@ -234,32 +289,26 @@ def _set_remote_context(self, servicer_context): else: yield - def _start_span( - self, handler_call_details, context, set_status_on_exception=False - ): - + def _create_attributes(self, context, full_method): # standard attributes attributes = { SpanAttributes.RPC_SYSTEM: RpcSystemValues.GRPC.value, SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[0], } - # if we have details about the call, split into service and method - if handler_call_details.method: - service, method = handler_call_details.method.lstrip("/").split( - "/", 1 - ) - attributes.update( - { - SpanAttributes.RPC_SERVICE: service, - SpanAttributes.RPC_METHOD: method, - } - ) + # add service and method attributes + service, method = full_method.lstrip("/").split("/", 1) + attributes.update( + { + SpanAttributes.RPC_SERVICE: service, + SpanAttributes.RPC_METHOD: method, + } + ) # add some attributes from the metadata metadata = dict(context.invocation_metadata()) if "user-agent" in metadata: - attributes["rpc.user_agent"] = metadata["user-agent"] + attributes[_RPC_USER_AGENT] = metadata["user-agent"] # Split up the peer to keep with how other telemetry sources # do it. This looks like: @@ -285,72 +334,159 @@ def _start_span( except IndexError: logger.warning("Failed to parse peer address '%s'", context.peer()) - return self._tracer.start_as_current_span( - name=handler_call_details.method, - kind=trace.SpanKind.SERVER, - attributes=attributes, - set_status_on_exception=set_status_on_exception, - ) + return attributes def intercept_service(self, continuation, handler_call_details): def telemetry_wrapper(behavior, request_streaming, response_streaming): def telemetry_interceptor(request_or_iterator, context): - - # handle streaming responses specially - if response_streaming: - return self._intercept_server_stream( - behavior, - handler_call_details, - request_or_iterator, - context, - ) - with self._set_remote_context(context): - with self._start_span( - handler_call_details, - context, - set_status_on_exception=False, + attributes = self._create_attributes(context, handler_call_details.method) + + with self._tracer.start_as_current_span( + name=handler_call_details.method, + kind=trace.SpanKind.SERVER, + attributes=attributes, + end_on_exit=False, + record_exception=False, + set_status_on_exception=False ) as span: - # wrap the context - context = _OpenTelemetryServicerContext(context, span) - # And now we run the actual RPC. try: - return behavior(request_or_iterator, context) - - except Exception as error: + # wrap the context + context = _OpenTelemetryServicerContext(context, span) + + # wrap / log the request (iterator) + if request_streaming: + request_or_iterator = self._log_stream_requests( + request_or_iterator, span, attributes + ) + else: + self._log_unary_request( + request_or_iterator, span, attributes + ) + + # call the actual RPC and track the duration + with self._record_duration(attributes, context): + response_or_iterator = behavior(request_or_iterator, context) + + # wrap / log the response (iterator) + if response_streaming: + response_or_iterator = self._log_stream_responses( + response_or_iterator, span, attributes, context + ) + else: + self._log_unary_response( + response_or_iterator, span, attributes, context + ) + + return response_or_iterator + + except Exception as exc: # Bare exceptions are likely to be gRPC aborts, which # we handle in our context wrapper. # Here, we're interested in uncaught exceptions. # pylint:disable=unidiomatic-typecheck - if type(error) != Exception: - span.record_exception(error) - raise error + if type(exc) != Exception: + span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, + grpc.StatusCode.UNKNOWN.value[0] + ) + span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{type(exc).__name__}: {exc}", + ) + ) + span.record_exception(exc) + raise exc + + finally: + if not response_streaming: + span.end() return telemetry_interceptor return _wrap_rpc_behavior( - continuation(handler_call_details), telemetry_wrapper + continuation(handler_call_details), + telemetry_wrapper + ) + + def _log_unary_request(self, request, active_span, attributes): + message_size_by = request.ByteSize() + _add_message_event( + active_span, MessageTypeValues.RECEIVED.value, message_size_by + ) + self._request_size_histogram.record(message_size_by, attributes) + self._requests_per_rpc_histogram.record(1, attributes) + + def _log_unary_response(self, response, active_span, attributes, context): + message_size_by = response.ByteSize() + _add_message_event( + active_span, MessageTypeValues.SENT.value, message_size_by ) + if context._code != grpc.StatusCode.OK: + attributes[SpanAttributes.RPC_GRPC_STATUS_CODE] = context._code.value[0] + self._response_size_histogram.record(message_size_by, attributes) + self._responses_per_rpc_histogram.record(1, attributes) + + def _log_stream_requests(self, request_iterator, active_span, attributes): + req_id = 1 + for req_id, msg in enumerate(request_iterator, start=1): + message_size_by = msg.ByteSize() + _add_message_event( + active_span, MessageTypeValues.RECEIVED.value, message_size_by, message_id=req_id + ) + self._request_size_histogram.record(message_size_by, attributes) + yield msg + + self._requests_per_rpc_histogram.record(req_id, attributes) + + def _log_stream_responses(self, response_iterator, active_span, attributes, context): + with trace.use_span( + active_span, + end_on_exit=True, + record_exception=False, + set_status_on_exception=False + ): + try: + res_id = 1 + for res_id, msg in enumerate(response_iterator, start=1): + message_size_by = msg.ByteSize() + _add_message_event( + active_span, MessageTypeValues.SENT.value, message_size_by, message_id=res_id + ) + self._response_size_histogram.record(message_size_by, attributes) + yield msg + except Exception as exc: + # Bare exceptions are likely to be gRPC aborts, which + # we handle in our context wrapper. + # Here, we're interested in uncaught exceptions. + # pylint:disable=unidiomatic-typecheck + if type(exc) != Exception: + active_span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, + grpc.StatusCode.UNKNOWN.value[0] + ) + active_span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{type(exc).__name__}: {exc}", + ) + ) + active_span.record_exception(exc) + raise exc + finally: + if context._code != grpc.StatusCode.OK: + attributes[SpanAttributes.RPC_GRPC_STATUS_CODE] = context._code.value[0] + self._responses_per_rpc_histogram.record(res_id, attributes) - # Handle streaming responses separately - we have to do this - # to return a *new* generator or various upstream things - # get confused, or we'll lose the consistent trace - def _intercept_server_stream( - self, behavior, handler_call_details, request_or_iterator, context - ): - - with self._set_remote_context(context): - with self._start_span( - handler_call_details, context, set_status_on_exception=False - ) as span: - context = _OpenTelemetryServicerContext(context, span) - - try: - yield from behavior(request_or_iterator, context) - - except Exception as error: - # pylint:disable=unidiomatic-typecheck - if type(error) != Exception: - span.record_exception(error) - raise error + @contextmanager + def _record_duration(self, attributes, context): + start = _time_ns() + try: + yield + finally: + duration = max(round((_time_ns() - start) * 1000), 0) + if context._code != grpc.StatusCode.OK: + attributes[SpanAttributes.RPC_GRPC_STATUS_CODE] = context._code.value[0] + self._duration_histogram.record(duration, attributes)