Skip to content

Commit

Permalink
moved openetlemetry service context into utilities
Browse files Browse the repository at this point in the history
  • Loading branch information
Corvin Lasogga authored and CoLa5 committed Aug 14, 2022
1 parent 8a05970 commit 0065180
Show file tree
Hide file tree
Showing 2 changed files with 224 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import copy
import logging
from contextlib import contextmanager
from typing import Callable, Dict, Iterable, Iterator, Generator, NoReturn, Optional
from typing import Callable, Iterator, Generator, Optional

import grpc

from opentelemetry import metrics, trace
from opentelemetry.context import attach, detach
from opentelemetry.instrumentation.grpc._types import Metadata, ProtoMessage, ProtoMessageOrIterator
from opentelemetry.instrumentation.grpc._utilities import _EventMetricRecorder, _MetricKind
from opentelemetry.instrumentation.grpc._types import ProtoMessage, ProtoMessageOrIterator
from opentelemetry.instrumentation.grpc._utilities import _EventMetricRecorder, _MetricKind, _OpenTelemetryServicerContext
from opentelemetry.propagate import extract
from opentelemetry.semconv.trace import MessageTypeValues, RpcSystemValues, SpanAttributes
from opentelemetry.trace.status import Status, StatusCode
Expand Down Expand Up @@ -79,150 +79,6 @@ def _wrap_rpc_behavior(
)


# pylint:disable=abstract-method
class _OpenTelemetryServicerContext(grpc.ServicerContext):

def __init__(
self,
servicer_context: grpc.ServicerContext,
active_span: trace.Span
) -> None:
self._servicer_context = servicer_context
self._active_span = active_span
self._code = grpc.StatusCode.OK
self._details = None
super().__init__()

def __getattr__(self, attr):
return getattr(self._servicer_context, attr)

# Interface of grpc.RpcContext

# pylint: disable=invalid-name
def add_callback(self, fn: Callable[[], None]) -> None:
return self._servicer_context.add_callback(fn)

def cancel(self) -> None:
self._code = grpc.StatusCode.CANCELLED
self._details = grpc.StatusCode.CANCELLED.value[1]
self._active_span.set_attribute(
SpanAttributes.RPC_GRPC_STATUS_CODE, self._code.value[0]
)
self._active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{self._code}: {self._details}",
)
)
return self._servicer_context.cancel()

def is_active(self) -> bool:
return self._servicer_context.is_active()

def time_remaining(self) -> Optional[float]:
return self._servicer_context.time_remaining()

# Interface of grpc.ServicerContext

def abort(self, code: grpc.StatusCode, details: str) -> NoReturn:
if not hasattr(self._servicer_context, "abort"):
raise RuntimeError(
"abort() is not supported with the installed version of grpcio"
)
self._code = code
self._details = details
self._active_span.set_attribute(
SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0]
)
self._active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{code}: {details}",
)
)
return self._servicer_context.abort(code, details)

def abort_with_status(self, status: grpc.Status) -> NoReturn:
if not hasattr(self._servicer_context, "abort_with_status"):
raise RuntimeError(
"abort_with_status() is not supported with the installed "
"version of grpcio"
)
return self._servicer_context.abort_with_status(status)

def auth_context(self) -> Dict[str, Iterable[bytes]]:
return self._servicer_context.auth_context()

def code(self) -> grpc.StatusCode:
if not hasattr(self._servicer_context, "code"):
raise RuntimeError(
"code() is not supported with the installed version of grpcio"
)
return self._servicer_context.code()

def details(self) -> str:
if not hasattr(self._servicer_context, "details"):
raise RuntimeError(
"details() is not supported with the installed version of "
"grpcio"
)
return self._servicer_context.details()

def disable_next_message_compression(self) -> None:
return self._service_context.disable_next_message_compression()

def invocation_metadata(self) -> Metadata:
return self._servicer_context.invocation_metadata()

def peer(self) -> str:
return self._servicer_context.peer()

def peer_identities(self) -> Optional[Iterable[bytes]]:
return self._servicer_context.peer_identities()

def peer_identity_key(self) -> Optional[str]:
return self._servicer_context.peer_identity_key()

def send_initial_metadata(self, initial_metadata: Metadata) -> None:
return self._servicer_context.send_initial_metadata(initial_metadata)

def set_code(self, code: grpc.StatusCode) -> None:
self._code = code
# use details if we already have it, otherwise the status description
details = self._details or code.value[1]
self._active_span.set_attribute(
SpanAttributes.RPC_GRPC_STATUS_CODE, code.value[0]
)
if code != grpc.StatusCode.OK:
self._active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{code}: {details}",
)
)
return self._servicer_context.set_code(code)

def set_compression(self, compression: grpc.Compression) -> None:
return self._servicer_context.set_compression(compression)

def set_details(self, details: str) -> None:
self._details = details
if self._code != grpc.StatusCode.OK:
self._active_span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{self._code}: {details}",
)
)
return self._servicer_context.set_details(details)

def set_trailing_metadata(self, trailing_metadata: Metadata) -> None:
return self._servicer_context.set_trailing_metadata(trailing_metadata)

def trailing_metadata(self) -> Metadata:
return self._servicer_context.trailing_metadata()


# pylint:disable=abstract-method
# pylint:disable=no-self-use
# pylint:disable=unused-argument
Expand Down Expand Up @@ -378,6 +234,7 @@ def intercept_unary_unary(
context: grpc.ServicerContext,
full_method: str
) -> ProtoMessage:

with self._set_remote_context(context):
metric_attributes = self._create_attributes(context, full_method)
span_attributes = copy.deepcopy(metric_attributes)
Expand All @@ -393,12 +250,11 @@ def intercept_unary_unary(
record_exception=False,
set_status_on_exception=False
) as span:
with self._record_duration_manager(metric_attributes, context):
# wrap the context
context = _OpenTelemetryServicerContext(context, span)

with self._record_duration_manager(metric_attributes, context):
try:
# wrap the context
context = _OpenTelemetryServicerContext(context, span)

# record the request
self._record_unary_request(
span,
Expand Down Expand Up @@ -448,6 +304,7 @@ def intercept_unary_stream(
context: grpc.ServicerContext,
full_method: str
) -> Iterator[ProtoMessage]:

with self._set_remote_context(context):
metric_attributes = self._create_attributes(context, full_method)
span_attributes = copy.deepcopy(metric_attributes)
Expand All @@ -463,12 +320,11 @@ def intercept_unary_stream(
record_exception=False,
set_status_on_exception=False
) as span:
# wrap the context
context = _OpenTelemetryServicerContext(context, span)

with self._record_duration_manager(metric_attributes, context):
try:
# wrap the context
context = _OpenTelemetryServicerContext(context, span)

# record the request
self._record_unary_request(
span,
Expand Down Expand Up @@ -516,6 +372,7 @@ def intercept_stream_unary(
context: grpc.ServicerContext,
full_method: str
) -> ProtoMessage:

with self._set_remote_context(context):
metric_attributes = self._create_attributes(context, full_method)
span_attributes = copy.deepcopy(metric_attributes)
Expand All @@ -531,12 +388,11 @@ def intercept_stream_unary(
record_exception=False,
set_status_on_exception=False
) as span:
with self._record_duration_manager(metric_attributes, context):
# wrap the context
context = _OpenTelemetryServicerContext(context, span)

with self._record_duration_manager(metric_attributes, context):
try:
# wrap the context
context = _OpenTelemetryServicerContext(context, span)

# wrap the request iterator with a recorder
request_iterator = self._record_streaming_request(
span,
Expand Down Expand Up @@ -587,6 +443,7 @@ def intercept_stream_stream(
context: grpc.ServicerContext,
full_method: str
) -> Iterator[ProtoMessage]:

with self._set_remote_context(context):
metric_attributes = self._create_attributes(context, full_method)
span_attributes = copy.deepcopy(metric_attributes)
Expand All @@ -602,12 +459,11 @@ def intercept_stream_stream(
record_exception=False,
set_status_on_exception=False
) as span:
# wrap the context
context = _OpenTelemetryServicerContext(context, span)

with self._record_duration_manager(metric_attributes, context):
try:
# wrap the context
context = _OpenTelemetryServicerContext(context, span)

# wrap the request iterator with a recorder
request_iterator = self._record_streaming_request(
span,
Expand Down
Loading

0 comments on commit 0065180

Please sign in to comment.