From eb64f574ff2c3680eed4d9034e260359e0cd4ff6 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Tue, 15 Dec 2020 17:46:44 -0500 Subject: [PATCH 1/7] Bugfix for #257 - properly stream responses Fixes #257 --- .../instrumentation/grpc/__init__.py | 27 +++++++++++++---- .../instrumentation/grpc/_server.py | 30 +++++++++++++++++++ 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index f963a2102f..ffb86b42f9 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -143,6 +143,9 @@ def serve(): # pylint:disable=import-self # pylint:disable=unused-argument +import logging +log = logging.getLogger(__name__) + class GrpcInstrumentorServer(BaseInstrumentor): """ @@ -183,19 +186,32 @@ class GrpcInstrumentorClient(BaseInstrumentor): grpc_client_instrumentor = GrpcInstrumentorClient() grpc.client_instrumentor.instrument() + Instrumetor arguments: + wrap_secure (bool): False to disable wrapping secure channels + wrap_insecure (bool): False to disable wrapping insecure channels + exporter: OpenTelemetry metrics exporter + interval (int): metrics export interval + """ def _instrument(self, **kwargs): exporter = kwargs.get("exporter", None) interval = kwargs.get("interval", 30) - if kwargs.get("channel_type") == "secure": + + # preserve the old argument + if "wrap_secure" not in kwargs and kwargs.get("channel_type", "") == "secure": + kwargs["wrap_secure"] = True + kwargs["wrap_insecure"] = False + + if kwargs.get("wrap_secure", True): + log.info("wrapping secure channels") _wrap( "grpc", "secure_channel", partial(self.wrapper_fn, exporter, interval), ) - - else: + if kwargs.get("wrap_insecure", True): + log.info("wrapping insecure channels") _wrap( "grpc", "insecure_channel", @@ -203,10 +219,11 @@ def _instrument(self, **kwargs): ) def _uninstrument(self, **kwargs): - if kwargs.get("channel_type") == "secure": + if kwargs.get("wrap_secure", True): unwrap(grpc, "secure_channel") - else: + #else: + if kwargs.get("wrap_insecure", True): unwrap(grpc, "insecure_channel") def wrapper_fn( diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py index 3fe859f574..9895b8853f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/_server.py @@ -239,6 +239,15 @@ def intercept_service(self, continuation, handler_call_details): def telemetry_wrapper(behavior, request_streaming, response_streaming): def telemetry_interceptor(request_or_iterator, context): + # handle streaming responses specially + if response_streaming: + return self._intercept_server_stream( + behavior, + handler_call_details, + request_or_iterator, + context, + ) + with self._set_remote_context(context): with self._start_span( handler_call_details, context @@ -249,6 +258,7 @@ def telemetry_interceptor(request_or_iterator, context): # And now we run the actual RPC. try: return behavior(request_or_iterator, context) + except Exception as error: # Bare exceptions are likely to be gRPC aborts, which # we handle in our context wrapper. @@ -263,3 +273,23 @@ def telemetry_interceptor(request_or_iterator, context): return _wrap_rpc_behavior( continuation(handler_call_details), telemetry_wrapper ) + + # Handle streaming responses separately - we have to do this + # to return a *new* generator or various upstream things + # get confused, or we'll lose the consistent trace + def _intercept_server_stream( + self, behavior, handler_call_details, request_or_iterator, context + ): + + with self._set_remote_context(context): + with self._start_span(handler_call_details, context) as span: + context = _OpenTelemetryServicerContext(context, span) + + try: + yield from behavior(request_or_iterator, context) + + except Exception as error: + # pylint:disable=unidiomatic-typecheck + if type(error) != Exception: + span.record_exception(error) + raise error From cd5679506f4baef50d0ae927bec32b2935f86074 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Tue, 15 Dec 2020 23:46:33 -0500 Subject: [PATCH 2/7] Add tests for streaming, multiple spans Add tests for unary_stream calls. Add tests for verifing child spans get picked up and are partof the same trace properly. --- .../tests/test_server_interceptor.py | 226 +++++++++++++++++- 1 file changed, 218 insertions(+), 8 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index cb61043c15..9b7768903d 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -31,6 +31,13 @@ from opentelemetry.trace.status import StatusCode +from .protobuf.test_server_pb2_grpc import ( + GRPCTestServerServicer, + add_GRPCTestServerServicer_to_server, +) +from .protobuf.test_server_pb2 import Request, Response + + class UnaryUnaryMethodHandler(grpc.RpcMethodHandler): def __init__(self, handler): self.request_streaming = False @@ -51,6 +58,21 @@ def service(self, handler_call_details): return UnaryUnaryMethodHandler(self._unary_unary_handler) +class Servicer(GRPCTestServerServicer): + """Our test servicer""" + + def SimpleMethod(self, request, context): + return Response( + server_id=request.client_id, response_data=request.request_data, + ) + + def ServerStreamingMethod(self, request, context): + for data in ("one", "two", "three"): + yield Response( + server_id=request.client_id, response_data=data, + ) + + class TestOpenTelemetryServerInterceptor(TestBase): def test_instrumentor(self): def handler(request, context): @@ -134,25 +156,143 @@ def test_create_span(self): # Intercept gRPC calls... interceptor = server_interceptor() - # No-op RPC handler - def handler(request, context): - return b"" + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(Servicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + span, opentelemetry.instrumentation.grpc + ) + + # Check attributes + self.assert_span_has_attributes( + span, + { + "net.peer.ip": "[::1]", + "net.peer.name": "localhost", + "rpc.method": "SimpleMethod", + "rpc.service": "GRPCTestServer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + + def test_create_two_spans(self): + """Verify that the interceptor captures sub spans within the given + trace""" + + class TwoSpanServicer(GRPCTestServerServicer): + def SimpleMethod(self, request, context): + + # create another span + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span("child") as child: + child.add_event("child event") + + return Response( + server_id=request.client_id, + response_data=request.request_data, + ) + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # setup the server server = grpc.server( futures.ThreadPoolExecutor(max_workers=1), options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) + add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) - server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + # setup the RPC + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 2) + child_span = spans_list[0] + parent_span = spans_list[1] + + self.assertEqual(parent_span.name, rpc_call) + self.assertIs(parent_span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + parent_span, opentelemetry.instrumentation.grpc + ) + # Check attributes + self.assert_span_has_attributes( + parent_span, + { + "net.peer.ip": "[::1]", + "net.peer.name": "localhost", + "rpc.method": "SimpleMethod", + "rpc.service": "GRPCTestServer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + + # Check the child span + self.assertEqual(child_span.name, "child") + self.assertEqual(parent_span.context.trace_id, child_span.context.trace_id) + + def test_create_span_streaming(self): + """Check that the interceptor wraps calls with spans server-side, on a + streaming call.""" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # setup the server + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(Servicer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel("localhost:{:d}".format(port)) - rpc_call = "TestServicer/handler" + # setup the RPC + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() try: server.start() - channel.unary_unary(rpc_call)(b"") + list(channel.unary_stream(rpc_call)(msg)) finally: server.stop(None) @@ -174,13 +314,83 @@ def handler(request, context): { "net.peer.ip": "[::1]", "net.peer.name": "localhost", - "rpc.method": "handler", - "rpc.service": "TestServicer", + "rpc.method": "ServerStreamingMethod", + "rpc.service": "GRPCTestServer", "rpc.system": "grpc", "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], }, ) + def test_create_two_spans_streaming(self): + """Verify that the interceptor captures sub spans in a + streaming call, within the given trace""" + + class TwoSpanServicer(GRPCTestServerServicer): + def ServerStreamingMethod(self, request, context): + + # create another span + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span("child") as child: + child.add_event("child event") + + for data in ("one", "two", "three"): + yield Response( + server_id=request.client_id, response_data=data, + ) + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # setup the server + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(TwoSpanServicer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + # setup the RPC + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: + server.start() + list(channel.unary_stream(rpc_call)(msg)) + finally: + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 2) + child_span = spans_list[0] + parent_span = spans_list[1] + + self.assertEqual(parent_span.name, rpc_call) + self.assertIs(parent_span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.check_span_instrumentation_info( + parent_span, opentelemetry.instrumentation.grpc + ) + + # Check attributes + self.assert_span_has_attributes( + parent_span, + { + "net.peer.ip": "[::1]", + "net.peer.name": "localhost", + "rpc.method": "ServerStreamingMethod", + "rpc.service": "GRPCTestServer", + "rpc.system": "grpc", + "rpc.grpc.status_code": grpc.StatusCode.OK.value[0], + }, + ) + + # Check the child span + self.assertEqual(child_span.name, "child") + self.assertEqual(parent_span.context.trace_id, child_span.context.trace_id) + def test_span_lifetime(self): """Check that the span is active for the duration of the call.""" From acab44caddaa43b0a71ecb083dc778e700f333e3 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Wed, 16 Dec 2020 00:00:03 -0500 Subject: [PATCH 3/7] remove client changes, those are for another PR --- .../instrumentation/grpc/__init__.py | 27 ++++--------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py index ffb86b42f9..f963a2102f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/src/opentelemetry/instrumentation/grpc/__init__.py @@ -143,9 +143,6 @@ def serve(): # pylint:disable=import-self # pylint:disable=unused-argument -import logging -log = logging.getLogger(__name__) - class GrpcInstrumentorServer(BaseInstrumentor): """ @@ -186,32 +183,19 @@ class GrpcInstrumentorClient(BaseInstrumentor): grpc_client_instrumentor = GrpcInstrumentorClient() grpc.client_instrumentor.instrument() - Instrumetor arguments: - wrap_secure (bool): False to disable wrapping secure channels - wrap_insecure (bool): False to disable wrapping insecure channels - exporter: OpenTelemetry metrics exporter - interval (int): metrics export interval - """ def _instrument(self, **kwargs): exporter = kwargs.get("exporter", None) interval = kwargs.get("interval", 30) - - # preserve the old argument - if "wrap_secure" not in kwargs and kwargs.get("channel_type", "") == "secure": - kwargs["wrap_secure"] = True - kwargs["wrap_insecure"] = False - - if kwargs.get("wrap_secure", True): - log.info("wrapping secure channels") + if kwargs.get("channel_type") == "secure": _wrap( "grpc", "secure_channel", partial(self.wrapper_fn, exporter, interval), ) - if kwargs.get("wrap_insecure", True): - log.info("wrapping insecure channels") + + else: _wrap( "grpc", "insecure_channel", @@ -219,11 +203,10 @@ def _instrument(self, **kwargs): ) def _uninstrument(self, **kwargs): - if kwargs.get("wrap_secure", True): + if kwargs.get("channel_type") == "secure": unwrap(grpc, "secure_channel") - #else: - if kwargs.get("wrap_insecure", True): + else: unwrap(grpc, "insecure_channel") def wrapper_fn( From 5dfe1eaa3488c50fc96fca1a03788a6f1a24968d Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Wed, 16 Dec 2020 00:01:27 -0500 Subject: [PATCH 4/7] black --- .../tests/test_server_interceptor.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index 9b7768903d..3256bd79f1 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -267,7 +267,9 @@ def SimpleMethod(self, request, context): # Check the child span self.assertEqual(child_span.name, "child") - self.assertEqual(parent_span.context.trace_id, child_span.context.trace_id) + self.assertEqual( + parent_span.context.trace_id, child_span.context.trace_id + ) def test_create_span_streaming(self): """Check that the interceptor wraps calls with spans server-side, on a @@ -389,7 +391,9 @@ def ServerStreamingMethod(self, request, context): # Check the child span self.assertEqual(child_span.name, "child") - self.assertEqual(parent_span.context.trace_id, child_span.context.trace_id) + self.assertEqual( + parent_span.context.trace_id, child_span.context.trace_id + ) def test_span_lifetime(self): """Check that the span is active for the duration of the call.""" From 16dab2689a0614f8d9106ad4cb7aeba279742750 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Wed, 16 Dec 2020 00:06:01 -0500 Subject: [PATCH 5/7] more linting --- .../tests/test_server_interceptor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index 3256bd79f1..9e91d763a3 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -30,12 +30,11 @@ from opentelemetry.test.test_base import TestBase from opentelemetry.trace.status import StatusCode - +from .protobuf.test_server_pb2 import Request, Response from .protobuf.test_server_pb2_grpc import ( GRPCTestServerServicer, add_GRPCTestServerServicer_to_server, ) -from .protobuf.test_server_pb2 import Request, Response class UnaryUnaryMethodHandler(grpc.RpcMethodHandler): From 4332e7f82fc759a42c86fc265cfcc3d00369a5f4 Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Wed, 16 Dec 2020 00:22:27 -0500 Subject: [PATCH 6/7] changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 38e65c98d6..14b0a49ca7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,6 +74,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246)) - Update TraceState to adhere to specs ([#276](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/276)) +- `opentelemetry-instrumentation-grpc` Fix issue tracking child spans in streaming responses + ([#260](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/260)) ### Removed - Remove Configuration From c3d647e7e38a55f9a02a19bba98f2ad27387e19a Mon Sep 17 00:00:00 2001 From: Michael Stella Date: Thu, 4 Feb 2021 13:54:48 -0500 Subject: [PATCH 7/7] Appease the linter, yet again. --- .../tests/test_server_interceptor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index 9e91d763a3..86c7136c89 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -60,11 +60,13 @@ def service(self, handler_call_details): class Servicer(GRPCTestServerServicer): """Our test servicer""" + # pylint:disable=C0103 def SimpleMethod(self, request, context): return Response( server_id=request.client_id, response_data=request.request_data, ) + # pylint:disable=C0103 def ServerStreamingMethod(self, request, context): for data in ("one", "two", "three"): yield Response( @@ -203,6 +205,7 @@ def test_create_two_spans(self): trace""" class TwoSpanServicer(GRPCTestServerServicer): + # pylint:disable=C0103 def SimpleMethod(self, request, context): # create another span @@ -327,6 +330,7 @@ def test_create_two_spans_streaming(self): streaming call, within the given trace""" class TwoSpanServicer(GRPCTestServerServicer): + # pylint:disable=C0103 def ServerStreamingMethod(self, request, context): # create another span