From 00651800e0dea16254e8d2f1f5b693a17b42e878 Mon Sep 17 00:00:00 2001 From: Corvin Lasogga Date: Sun, 14 Aug 2022 12:44:01 +0200 Subject: [PATCH] moved openetlemetry service context into utilities --- .../instrumentation/grpc/_server.py | 178 ++---------- .../instrumentation/grpc/_utilities.py | 255 ++++++++++++++---- 2 files changed, 224 insertions(+), 209 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index c070b99232..47297c7aa8 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -24,14 +24,14 @@ import copy import logging from contextlib import contextmanager -from typing import Callable, Dict, Iterable, Iterator, Generator, NoReturn, Optional +from typing import Callable, Iterator, Generator, Optional import grpc from opentelemetry import metrics, trace from opentelemetry.context import attach, detach -from opentelemetry.instrumentation.grpc._types import Metadata, ProtoMessage, ProtoMessageOrIterator -from opentelemetry.instrumentation.grpc._utilities import _EventMetricRecorder, _MetricKind +from opentelemetry.instrumentation.grpc._types import ProtoMessage, ProtoMessageOrIterator +from opentelemetry.instrumentation.grpc._utilities import _EventMetricRecorder, _MetricKind, _OpenTelemetryServicerContext from opentelemetry.propagate import extract from opentelemetry.semconv.trace import MessageTypeValues, RpcSystemValues, SpanAttributes from opentelemetry.trace.status import Status, StatusCode @@ -79,150 +79,6 @@ def _wrap_rpc_behavior( ) -# pylint:disable=abstract-method -class _OpenTelemetryServicerContext(grpc.ServicerContext): - - def __init__( - self, - servicer_context: grpc.ServicerContext, - active_span: trace.Span - ) -> None: - self._servicer_context = servicer_context - self._active_span = active_span - self._code = grpc.StatusCode.OK - self._details = None - super().__init__() - - def __getattr__(self, attr): - return getattr(self._servicer_context, attr) - - # Interface of grpc.RpcContext - - # pylint: disable=invalid-name - def add_callback(self, fn: Callable[[], None]) -> None: - return self._servicer_context.add_callback(fn) - - def cancel(self) -> None: - self._code = grpc.StatusCode.CANCELLED - self._details = grpc.StatusCode.CANCELLED.value[1] - self._active_span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, self._code.value[0] - ) - self._active_span.set_status( - Status( - status_code=StatusCode.ERROR, - description=f"{self._code}: {self._details}", - ) - ) - return self._servicer_context.cancel() - - def is_active(self) -> bool: - return self._servicer_context.is_active() - - def time_remaining(self) -> Optional[float]: - return self._servicer_context.time_remaining() - - # Interface of grpc.ServicerContext - - def abort(self, code: grpc.StatusCode, details: str) -> NoReturn: - if not hasattr(self._servicer_context, "abort"): - raise RuntimeError( - "abort() is not supported with the installed version of grpcio" - ) - self._code = code - self._details = details - self._active_span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0] - ) - self._active_span.set_status( - Status( - status_code=StatusCode.ERROR, - description=f"{code}: {details}", - ) - ) - return self._servicer_context.abort(code, details) - - def abort_with_status(self, status: grpc.Status) -> NoReturn: - if not hasattr(self._servicer_context, "abort_with_status"): - raise RuntimeError( - "abort_with_status() is not supported with the installed " - "version of grpcio" - ) - return self._servicer_context.abort_with_status(status) - - def auth_context(self) -> Dict[str, Iterable[bytes]]: - return self._servicer_context.auth_context() - - def code(self) -> grpc.StatusCode: - if not hasattr(self._servicer_context, "code"): - raise RuntimeError( - "code() is not supported with the installed version of grpcio" - ) - return self._servicer_context.code() - - def details(self) -> str: - if not hasattr(self._servicer_context, "details"): - raise RuntimeError( - "details() is not supported with the installed version of " - "grpcio" - ) - return self._servicer_context.details() - - def disable_next_message_compression(self) -> None: - return self._service_context.disable_next_message_compression() - - def invocation_metadata(self) -> Metadata: - return self._servicer_context.invocation_metadata() - - def peer(self) -> str: - return self._servicer_context.peer() - - def peer_identities(self) -> Optional[Iterable[bytes]]: - return self._servicer_context.peer_identities() - - def peer_identity_key(self) -> Optional[str]: - return self._servicer_context.peer_identity_key() - - def send_initial_metadata(self, initial_metadata: Metadata) -> None: - return self._servicer_context.send_initial_metadata(initial_metadata) - - def set_code(self, code: grpc.StatusCode) -> None: - self._code = code - # use details if we already have it, otherwise the status description - details = self._details or code.value[1] - self._active_span.set_attribute( - SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0] - ) - if code != grpc.StatusCode.OK: - self._active_span.set_status( - Status( - status_code=StatusCode.ERROR, - description=f"{code}: {details}", - ) - ) - return self._servicer_context.set_code(code) - - def set_compression(self, compression: grpc.Compression) -> None: - return self._servicer_context.set_compression(compression) - - def set_details(self, details: str) -> None: - self._details = details - if self._code != grpc.StatusCode.OK: - self._active_span.set_status( - Status( - status_code=StatusCode.ERROR, - description=f"{self._code}: {details}", - ) - ) - return self._servicer_context.set_details(details) - - def set_trailing_metadata(self, trailing_metadata: Metadata) -> None: - return self._servicer_context.set_trailing_metadata(trailing_metadata) - - def trailing_metadata(self) -> Metadata: - return self._servicer_context.trailing_metadata() - - # pylint:disable=abstract-method # pylint:disable=no-self-use # pylint:disable=unused-argument @@ -378,6 +234,7 @@ def intercept_unary_unary( context: grpc.ServicerContext, full_method: str ) -> ProtoMessage: + with self._set_remote_context(context): metric_attributes = self._create_attributes(context, full_method) span_attributes = copy.deepcopy(metric_attributes) @@ -393,12 +250,11 @@ def intercept_unary_unary( record_exception=False, set_status_on_exception=False ) as span: - with self._record_duration_manager(metric_attributes, context): + # wrap the context + context = _OpenTelemetryServicerContext(context, span) + with self._record_duration_manager(metric_attributes, context): try: - # wrap the context - context = _OpenTelemetryServicerContext(context, span) - # record the request self._record_unary_request( span, @@ -448,6 +304,7 @@ def intercept_unary_stream( context: grpc.ServicerContext, full_method: str ) -> Iterator[ProtoMessage]: + with self._set_remote_context(context): metric_attributes = self._create_attributes(context, full_method) span_attributes = copy.deepcopy(metric_attributes) @@ -463,12 +320,11 @@ def intercept_unary_stream( record_exception=False, set_status_on_exception=False ) as span: + # wrap the context + context = _OpenTelemetryServicerContext(context, span) with self._record_duration_manager(metric_attributes, context): try: - # wrap the context - context = _OpenTelemetryServicerContext(context, span) - # record the request self._record_unary_request( span, @@ -516,6 +372,7 @@ def intercept_stream_unary( context: grpc.ServicerContext, full_method: str ) -> ProtoMessage: + with self._set_remote_context(context): metric_attributes = self._create_attributes(context, full_method) span_attributes = copy.deepcopy(metric_attributes) @@ -531,12 +388,11 @@ def intercept_stream_unary( record_exception=False, set_status_on_exception=False ) as span: - with self._record_duration_manager(metric_attributes, context): + # wrap the context + context = _OpenTelemetryServicerContext(context, span) + with self._record_duration_manager(metric_attributes, context): try: - # wrap the context - context = _OpenTelemetryServicerContext(context, span) - # wrap the request iterator with a recorder request_iterator = self._record_streaming_request( span, @@ -587,6 +443,7 @@ def intercept_stream_stream( context: grpc.ServicerContext, full_method: str ) -> Iterator[ProtoMessage]: + with self._set_remote_context(context): metric_attributes = self._create_attributes(context, full_method) span_attributes = copy.deepcopy(metric_attributes) @@ -602,12 +459,11 @@ def intercept_stream_stream( record_exception=False, set_status_on_exception=False ) as span: + # wrap the context + context = _OpenTelemetryServicerContext(context, span) with self._record_duration_manager(metric_attributes, context): try: - # wrap the context - context = _OpenTelemetryServicerContext(context, span) - # wrap the request iterator with a recorder request_iterator = self._record_streaming_request( span, diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py index 543254c396..3c63f3f2b8 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_utilities.py @@ -18,14 +18,14 @@ from contextlib import contextmanager from enum import Enum from timeit import default_timer -from typing import Generator, Iterator +from typing import Callable, Dict, Generator, Iterable, Iterator, NoReturn, Optional import grpc -from opentelemetry.instrumentation.grpc._types import ProtoMessage -from opentelemetry.metrics import Meter -from opentelemetry.trace import Span +from opentelemetry import metrics, trace +from opentelemetry.instrumentation.grpc._types import ProtoMessage from opentelemetry.semconv.trace import MessageTypeValues, SpanAttributes -from opentelemetry.util.types import Attributes +from opentelemetry.trace.status import Status, StatusCode +from opentelemetry.util.types import Attributes, Metadata _MESSAGE: str = "message" @@ -33,7 +33,7 @@ def _add_message_event( - active_span: Span, + active_span: trace.Span, message_type: str, message_size_by: int, message_id: int = 1 @@ -41,11 +41,11 @@ def _add_message_event( """Adds a message event of an RPC to an active span. Args: - active_span (Span): The active span in which to record the message - as event. + active_span (trace.Span): The active span in which to record the + message as event. message_type (str): The message type value as str, either "SENT" or "RECEIVED". - message_size_by (int): The (uncompressed) message size in bytes as int. + message_size_by (int): The (uncompressed) message size in bytes. message_id (int, optional): The message ID. Defaults to 1. """ @@ -59,6 +59,33 @@ def _add_message_event( ) +def _get_status_code(context: grpc.RpcContext) -> grpc.StatusCode: + """Extracts the status code from a context, even though the context is a + ServicerContext of a grpc version which does not support code(). + + Args: + context (grpc.RpcContext): The context to extract the status code from. + + Returns: + grpc.StatusCode: The extracted status code. + """ + + try: + code = context.code() + except RuntimeError: + if isinstance(code, _OpenTelemetryServicerContext): + code = context._code + elif isinstance(code, grpc.ServicerContext): + code = context._state._code + else: + raise + + if code is not None: + return code + + return grpc.StatusCode.OK + + class _ClientCallDetails( namedtuple( "_ClientCallDetails", @@ -86,7 +113,7 @@ class _EventMetricRecorder: and for recording the duration of a RPC. """ - def __init__(self, meter: Meter, kind: _MetricKind) -> None: + def __init__(self, meter: metrics.Meter, kind: _MetricKind) -> None: """Initializes the _EventMetricRecorder. Args: @@ -129,7 +156,7 @@ def __init__(self, meter: Meter, kind: _MetricKind) -> None: def _record_unary_request( self, - active_span: Span, + active_span: trace.Span, request: ProtoMessage, message_type: MessageTypeValues, metric_attributes: Attributes @@ -141,8 +168,8 @@ def _record_unary_request( histogram. Args: - active_span (Span): The active span in which to record the request - as event. + active_span (trace.Span): The active span in which to record the + request message as event. request (ProtoMessage): The request message. message_type (MessageTypeValues): The message type value. metric_attributes (Attributes): The attributes to record in the @@ -154,22 +181,22 @@ def _record_unary_request( self._request_size_histogram.record(message_size_by, metric_attributes) self._requests_per_rpc_histogram.record(1, metric_attributes) - def _record_response( + def _record_unary_or_streaming_response( self, - active_span: Span, + active_span: trace.Span, response: ProtoMessage, message_type: MessageTypeValues, metric_attributes: Attributes, response_id: int = 1 ) -> None: - """Records a unary OR streaming response. + """Records a unary OR a single, streaming response. The response is recorded as event and its size in the response-size- histogram. Args: - active_span (Span): The active span in which to record the response - as event. + active_span (trace.Span): The active span in which to record the + response message as event. response (ProtoMessage): The response message. message_type (MessageTypeValues): The message type value. metric_attributes (Attributes): The attributes to record in the @@ -188,7 +215,7 @@ def _record_response( message_size_by, metric_attributes ) - def _record_responses_per_rpc( + def _record_num_of_responses_per_rpc( self, responses_per_rpc: int, metric_attributes: Attributes @@ -196,7 +223,6 @@ def _record_responses_per_rpc( """Records the number of responses in the responses-per-RPC-histogram for a streaming response. - Args: responses_per_rpc (int): The number of responses. metric_attributes (Attributes): The attributes to record in the @@ -209,7 +235,7 @@ def _record_responses_per_rpc( def _record_unary_response( self, - active_span: Span, + active_span: trace.Span, response: ProtoMessage, message_type: MessageTypeValues, metric_attributes: Attributes @@ -221,22 +247,22 @@ def _record_unary_response( histogram. Args: - active_span (Span): The active span in which to record the response - as event. + active_span (trace.Span): The active span in which to record the + response message as event. response (ProtoMessage): The response message. message_type (MessageTypeValues): The message type value. metric_attributes (Attributes): The attributes to record in the metrics. """ - self._record_response( + self._record_unary_or_streaming_response( active_span, response, message_type, metric_attributes ) - self._record_responses_per_rpc(1, metric_attributes) + self._record_num_of_responses_per_rpc(1, metric_attributes) def _record_streaming_request( self, - active_span: Span, + active_span: trace.Span, request_iterator: Iterator[ProtoMessage], message_type: MessageTypeValues, metric_attributes: Attributes @@ -248,8 +274,8 @@ def _record_streaming_request( histogram. Args: - active_span (Span): The active span in which to record the request - as event. + active_span (trace.Span): The active span in which to record the + request messages as event. request_iterator (Iterator[ProtoMessage]): The iterator over the request messages. message_type (MessageTypeValues): The message type value. @@ -280,7 +306,7 @@ def _record_streaming_request( def _record_streaming_response( self, - active_span: Span, + active_span: trace.Span, response_iterator: Iterator[ProtoMessage], message_type: MessageTypeValues, metric_attributes: Attributes @@ -292,8 +318,8 @@ def _record_streaming_response( histogram. Args: - active_span (Span): The active span in which to record the response - as event. + active_span (trace.Span): The active span in which to record the + response messages as event. response_iterator (Iterator[ProtoMessage]): The iterator over the response messages. message_type (MessageTypeValues): The message type value. @@ -308,7 +334,7 @@ def _record_streaming_response( try: res_id = 0 for res_id, response in enumerate(response_iterator, start=1): - self._record_response( + self._record_unary_or_streaming_response( active_span, response, message_type, @@ -317,7 +343,7 @@ def _record_streaming_response( ) yield response finally: - self._record_responses_per_rpc(res_id, metric_attributes) + self._record_num_of_responses_per_rpc(res_id, metric_attributes) def _start_duration_measurement(self) -> float: """Starts a duration measurement and returns the start time. @@ -347,14 +373,8 @@ def _record_duration( """ duration = max(round((default_timer() - start_time) * 1000), 0) - if context.code() in (None, grpc.StatusCode.OK): - metric_attributes[SpanAttributes.RPC_GRPC_STATUS_CODE] = ( - grpc.StatusCode.OK.value[0] - ) - else: - metric_attributes[SpanAttributes.RPC_GRPC_STATUS_CODE] = ( - context.code().value[0] - ) + code = _get_status_code(context) + metric_attributes[SpanAttributes.RPC_GRPC_STATUS_CODE] = code.value[0] self._duration_histogram.record(duration, metric_attributes) @contextmanager @@ -381,12 +401,151 @@ def _record_duration_manager( yield finally: duration = max(round((default_timer() - start_time) * 1000), 0) - if context.code() in (None, grpc.StatusCode.OK): - metric_attributes[SpanAttributes.RPC_GRPC_STATUS_CODE] = ( - grpc.StatusCode.OK.value[0] + code = _get_status_code(context) + metric_attributes[SpanAttributes.RPC_GRPC_STATUS_CODE] = ( + code.value[0] + ) + self._duration_histogram.record(duration, metric_attributes) + + +# pylint:disable=abstract-method +class _OpenTelemetryServicerContext(grpc.ServicerContext): + + def __init__( + self, + servicer_context: grpc.ServicerContext, + active_span: trace.Span + ) -> None: + self._servicer_context = servicer_context + self._active_span = active_span + self._code = grpc.StatusCode.OK + self._details = None + super().__init__() + + def __getattr__(self, attr): + return getattr(self._servicer_context, attr) + + # Interface of grpc.RpcContext + + def add_callback(self, callback: Callable[[], None]) -> None: + return self._servicer_context.add_callback(callback) + + def cancel(self) -> bool: + self._code = grpc.StatusCode.CANCELLED + self._details = grpc.StatusCode.CANCELLED.value[1] + self._active_span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, self._code.value[0] + ) + self._active_span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{self._code}: {self._details}", + ) + ) + return self._servicer_context.cancel() + + def is_active(self) -> bool: + return self._servicer_context.is_active() + + def time_remaining(self) -> Optional[float]: + return self._servicer_context.time_remaining() + + # Interface of grpc.ServicerContext + + def abort(self, code: grpc.StatusCode, details: str) -> NoReturn: + if not hasattr(self._servicer_context, "abort"): + raise RuntimeError( + "abort() is not supported with the installed version of grpcio" + ) + self._code = code + self._details = details + self._active_span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0] + ) + self._active_span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{code}: {details}", + ) + ) + return self._servicer_context.abort(code, details) + + def abort_with_status(self, status: grpc.Status) -> NoReturn: + if not hasattr(self._servicer_context, "abort_with_status"): + raise RuntimeError( + "abort_with_status() is not supported with the installed " + "version of grpcio" + ) + return self._servicer_context.abort_with_status(status) + + def auth_context(self) -> Dict[str, Iterable[bytes]]: + return self._servicer_context.auth_context() + + def code(self) -> grpc.StatusCode: + if not hasattr(self._servicer_context, "code"): + raise RuntimeError( + "code() is not supported with the installed version of grpcio" + ) + return self._servicer_context.code() + + def details(self) -> str: + if not hasattr(self._servicer_context, "details"): + raise RuntimeError( + "details() is not supported with the installed version of " + "grpcio" + ) + return self._servicer_context.details() + + def disable_next_message_compression(self) -> None: + return self._service_context.disable_next_message_compression() + + def invocation_metadata(self) -> Metadata: + return self._servicer_context.invocation_metadata() + + def peer(self) -> str: + return self._servicer_context.peer() + + def peer_identities(self) -> Optional[Iterable[bytes]]: + return self._servicer_context.peer_identities() + + def peer_identity_key(self) -> Optional[str]: + return self._servicer_context.peer_identity_key() + + def send_initial_metadata(self, initial_metadata: Metadata) -> None: + return self._servicer_context.send_initial_metadata(initial_metadata) + + def set_code(self, code: grpc.StatusCode) -> None: + self._code = code + # use details if we already have it, otherwise the status description + details = self._details or code.value[1] + self._active_span.set_attribute( + SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0] + ) + if code != grpc.StatusCode.OK: + self._active_span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{code}: {details}", ) - else: - metric_attributes[SpanAttributes.RPC_GRPC_STATUS_CODE] = ( - context.code().value[0] + ) + return self._servicer_context.set_code(code) + + def set_compression(self, compression: grpc.Compression) -> None: + return self._servicer_context.set_compression(compression) + + def set_details(self, details: str) -> None: + self._details = details + if self._code != grpc.StatusCode.OK: + self._active_span.set_status( + Status( + status_code=StatusCode.ERROR, + description=f"{self._code}: {details}", ) - self._duration_histogram.record(duration, metric_attributes) + ) + return self._servicer_context.set_details(details) + + def set_trailing_metadata(self, trailing_metadata: Metadata) -> None: + return self._servicer_context.set_trailing_metadata(trailing_metadata) + + def trailing_metadata(self) -> Metadata: + return self._servicer_context.trailing_metadata()