diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index 03e063aa0d..b0669032ca 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -38,26 +38,6 @@ ) -class UnaryUnaryMethodHandler(grpc.RpcMethodHandler): - def __init__(self, handler): - self.request_streaming = False - self.response_streaming = False - self.request_deserializer = None - self.response_serializer = None - self.unary_unary = handler - self.unary_stream = None - self.stream_unary = None - self.stream_stream = None - - -class UnaryUnaryRpcHandler(grpc.GenericRpcHandler): - def __init__(self, handler): - self._unary_unary_handler = handler - - def service(self, handler_call_details): - return UnaryUnaryMethodHandler(self._unary_unary_handler) - - class Servicer(GRPCTestServerServicer): """Our test servicer""" @@ -78,9 +58,15 @@ def ServerStreamingMethod(self, request, context): class TestOpenTelemetryServerInterceptor(TestBase): + # pylint:disable=C0103 + + def assertEvent(self, event, name, attributes): + self.assertEqual(event.name, name) + for key, val in attributes.items(): + self.assertIn(key, event.attributes, msg=str(event.attributes)) + self.assertEqual(val, event.attributes[key], msg=str(event.attributes)) + def test_instrumentor(self): - def handler(request, context): - return b"" grpc_server_instrumentor = GrpcInstrumentorServer() grpc_server_instrumentor.instrument() @@ -89,16 +75,17 @@ def handler(request, context): executor, options=(("grpc.so_reuseport", 0),), ) - - server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - + add_GRPCTestServerServicer_to_server(Servicer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") - rpc_call = "TestServicer/handler" + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: server.start() - channel.unary_unary(rpc_call)(b"test") + response = channel.unary_unary(rpc_call)(msg) finally: server.stop(None) @@ -113,14 +100,15 @@ def handler(request, context): span, opentelemetry.instrumentation.grpc ) - # Check attributes + # check attributes self.assertSpanHasAttributes( span, { SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", SpanAttributes.NET_PEER_NAME: "localhost", - SpanAttributes.RPC_METHOD: "handler", - SpanAttributes.RPC_SERVICE: "TestServicer", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ 0 @@ -128,11 +116,30 @@ def handler(request, context): }, ) + # check events + self.assertEqual(len(span.events), 2) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + self.assertEvent( + span.events[1], + "message", + { + SpanAttributes.MESSAGE_TYPE: "SENT", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(response) + } + ) + grpc_server_instrumentor.uninstrument() def test_uninstrument(self): - def handler(request, context): - return b"" grpc_server_instrumentor = GrpcInstrumentorServer() grpc_server_instrumentor.instrument() @@ -142,16 +149,16 @@ def handler(request, context): executor, options=(("grpc.so_reuseport", 0),), ) - - server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - + add_GRPCTestServerServicer_to_server(Servicer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") - rpc_call = "TestServicer/test" + rpc_call = "/GRPCTestServer/SimpleMethod" + msg = Request().SerializeToString() + try: server.start() - channel.unary_unary(rpc_call)(b"test") + channel.unary_unary(rpc_call)(msg) finally: server.stop(None) @@ -177,9 +184,10 @@ def test_create_span(self): rpc_call = "/GRPCTestServer/SimpleMethod" request = Request(client_id=1, request_data="test") msg = request.SerializeToString() + try: server.start() - channel.unary_unary(rpc_call)(msg) + response = channel.unary_unary(rpc_call)(msg) finally: server.stop(None) @@ -200,6 +208,7 @@ def test_create_span(self): span, { SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", SpanAttributes.NET_PEER_NAME: "localhost", SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", @@ -210,6 +219,27 @@ def test_create_span(self): }, ) + # check events + self.assertEqual(len(span.events), 2) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + self.assertEvent( + span.events[1], + "message", + { + SpanAttributes.MESSAGE_TYPE: "SENT", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(response) + } + ) + def test_create_two_spans(self): """Verify that the interceptor captures sub spans within the given trace""" @@ -246,9 +276,10 @@ def SimpleMethod(self, request, context): rpc_call = "/GRPCTestServer/SimpleMethod" request = Request(client_id=1, request_data="test") msg = request.SerializeToString() + try: server.start() - channel.unary_unary(rpc_call)(msg) + response = channel.unary_unary(rpc_call)(msg) finally: server.stop(None) @@ -260,16 +291,17 @@ def SimpleMethod(self, request, context): self.assertEqual(parent_span.name, rpc_call) self.assertIs(parent_span.kind, trace.SpanKind.SERVER) - # Check version and name in span's instrumentation info + # check version and name in span's instrumentation info self.assertEqualSpanInstrumentationInfo( parent_span, opentelemetry.instrumentation.grpc ) - # Check attributes + # check parent attributes self.assertSpanHasAttributes( parent_span, { SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", SpanAttributes.NET_PEER_NAME: "localhost", SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", @@ -280,12 +312,41 @@ def SimpleMethod(self, request, context): }, ) - # Check the child span + # check parent events + self.assertEqual(len(parent_span.events), 2) + self.assertEvent( + parent_span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + self.assertEvent( + parent_span.events[1], + "message", + { + SpanAttributes.MESSAGE_TYPE: "SENT", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(response) + } + ) + + # check the child span self.assertEqual(child_span.name, "child") self.assertEqual( parent_span.context.trace_id, child_span.context.trace_id ) + # check child event + self.assertEqual(len(child_span.events), 1) + self.assertEvent( + child_span.events[0], + "child event", + {} + ) + def test_create_span_streaming(self): """Check that the interceptor wraps calls with spans server-side, on a streaming call.""" @@ -307,9 +368,10 @@ def test_create_span_streaming(self): rpc_call = "/GRPCTestServer/ServerStreamingMethod" request = Request(client_id=1, request_data="test") msg = request.SerializeToString() + try: server.start() - list(channel.unary_stream(rpc_call)(msg)) + responses = list(channel.unary_stream(rpc_call)(msg)) finally: server.stop(None) @@ -320,16 +382,17 @@ def test_create_span_streaming(self): self.assertEqual(span.name, rpc_call) self.assertIs(span.kind, trace.SpanKind.SERVER) - # Check version and name in span's instrumentation info + # check version and name in span's instrumentation info self.assertEqualSpanInstrumentationInfo( span, opentelemetry.instrumentation.grpc ) - # Check attributes + # check attributes self.assertSpanHasAttributes( span, { SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", SpanAttributes.NET_PEER_NAME: "localhost", SpanAttributes.RPC_METHOD: "ServerStreamingMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", @@ -340,6 +403,30 @@ def test_create_span_streaming(self): }, ) + # check events + self.assertEqual(len(span.events), len(responses) + 1) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + for res_id, (event, response) in enumerate( + zip(span.events[1:], responses), start=1 + ): + self.assertEvent( + event, + "message", + { + SpanAttributes.MESSAGE_TYPE: "SENT", + SpanAttributes.MESSAGE_ID: res_id, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(response) + } + ) + def test_create_two_spans_streaming(self): """Verify that the interceptor captures sub spans in a streaming call, within the given trace""" @@ -376,6 +463,7 @@ def ServerStreamingMethod(self, request, context): rpc_call = "/GRPCTestServer/ServerStreamingMethod" request = Request(client_id=1, request_data="test") msg = request.SerializeToString() + try: server.start() list(channel.unary_stream(rpc_call)(msg)) @@ -400,6 +488,7 @@ def ServerStreamingMethod(self, request, context): parent_span, { SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", SpanAttributes.NET_PEER_NAME: "localhost", SpanAttributes.RPC_METHOD: "ServerStreamingMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", @@ -427,7 +516,10 @@ def test_span_lifetime(self): def handler(request, context): nonlocal active_span_in_handler active_span_in_handler = trace.get_current_span() - return b"" + return Response() + + servicer = Servicer() + servicer.SimpleMethod = handler with futures.ThreadPoolExecutor(max_workers=1) as executor: server = grpc.server( @@ -435,15 +527,64 @@ def handler(request, context): options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + add_GRPCTestServerServicer_to_server(servicer, server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + rpc_call = "/GRPCTestServer/SimpleMethod" + msg = Request().SerializeToString() + active_span_before_call = trace.get_current_span() + try: + server.start() + channel.unary_unary(rpc_call)(msg) + finally: + server.stop(None) + active_span_after_call = trace.get_current_span() + + self.assertEqual(active_span_before_call, trace.INVALID_SPAN) + self.assertEqual(active_span_after_call, trace.INVALID_SPAN) + self.assertIsInstance(active_span_in_handler, trace_sdk.Span) + self.assertIsNone(active_span_in_handler.parent) + + def test_span_lifetime_streaming(self): + """Check that the span is active for the duration of the call.""" + + interceptor = server_interceptor() + + # To capture the current span at the time the handler is called + active_span_in_handler = None + + def handler(request, context): + nonlocal active_span_in_handler + active_span_in_handler = trace.get_current_span() + for data in ("one", "two", "three"): + yield Response( + server_id=request.client_id, + response_data=data, + ) + + servicer = Servicer() + servicer.ServerStreamingMethod = handler + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(servicer, server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + active_span_before_call = trace.get_current_span() try: server.start() - channel.unary_unary("TestServicer/handler")(b"") + list(channel.unary_stream(rpc_call)(msg)) finally: server.stop(None) active_span_after_call = trace.get_current_span() @@ -463,7 +604,10 @@ def test_sequential_server_spans(self): def handler(request, context): active_spans_in_handler.append(trace.get_current_span()) - return b"" + return Response() + + servicer = Servicer() + servicer.SimpleMethod = handler with futures.ThreadPoolExecutor(max_workers=1) as executor: server = grpc.server( @@ -471,15 +615,17 @@ def handler(request, context): options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) - + add_GRPCTestServerServicer_to_server(servicer, server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") + rpc_call = "/GRPCTestServer/SimpleMethod" + msg = Request().SerializeToString() + try: server.start() - channel.unary_unary("TestServicer/handler")(b"") - channel.unary_unary("TestServicer/handler")(b"") + response_1 = channel.unary_unary(rpc_call)(msg) + response_2 = channel.unary_unary(rpc_call)(msg) finally: server.stop(None) @@ -490,7 +636,7 @@ def handler(request, context): self.assertNotEqual(span1.context.span_id, span2.context.span_id) self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) - for span in (span1, span2): + for span, response in zip([span1, span2], [response_1, response_2]): # each should be a root span self.assertIsNone(span2.parent) @@ -499,9 +645,10 @@ def handler(request, context): span, { SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", SpanAttributes.NET_PEER_NAME: "localhost", - SpanAttributes.RPC_METHOD: "handler", - SpanAttributes.RPC_SERVICE: "TestServicer", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ 0 @@ -509,6 +656,26 @@ def handler(request, context): }, ) + self.assertEqual(len(span.events), 2) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + self.assertEvent( + span.events[1], + "message", + { + SpanAttributes.MESSAGE_TYPE: "SENT", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(response) + } + ) + def test_concurrent_server_spans(self): """Check that concurrent RPC calls don't interfere with each other. @@ -527,7 +694,10 @@ def test_concurrent_server_spans(self): def handler(request, context): latch() active_spans_in_handler.append(trace.get_current_span()) - return b"" + return Response() + + servicer = Servicer() + servicer.SimpleMethod = handler with futures.ThreadPoolExecutor(max_workers=2) as executor: server = grpc.server( @@ -535,21 +705,25 @@ def handler(request, context): options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + add_GRPCTestServerServicer_to_server(servicer, server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + try: server.start() # Interleave calls so spans are active on each thread at the same # time with futures.ThreadPoolExecutor(max_workers=2) as tpe: f1 = tpe.submit( - channel.unary_unary("TestServicer/handler"), b"" + channel.unary_unary(rpc_call), msg ) f2 = tpe.submit( - channel.unary_unary("TestServicer/handler"), b"" + channel.unary_unary(rpc_call), msg ) futures.wait((f1, f2)) finally: @@ -562,18 +736,19 @@ def handler(request, context): self.assertNotEqual(span1.context.span_id, span2.context.span_id) self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) - for span in (span1, span2): + for span, response in zip([span1, span2], [f1.result(), f2.result()]): # each should be a root span self.assertIsNone(span2.parent) - # check attributes + # Check attributes self.assertSpanHasAttributes( span, { SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", SpanAttributes.NET_PEER_NAME: "localhost", - SpanAttributes.RPC_METHOD: "handler", - SpanAttributes.RPC_SERVICE: "TestServicer", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ 0 @@ -581,6 +756,27 @@ def handler(request, context): }, ) + # check events + self.assertEqual(len(span.events), 2) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + self.assertEvent( + span.events[1], + "message", + { + SpanAttributes.MESSAGE_TYPE: "SENT", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(response) + } + ) + def test_abort(self): """Check that we can catch an abort properly""" @@ -594,24 +790,116 @@ def test_abort(self): def handler(request, context): context.abort(grpc.StatusCode.FAILED_PRECONDITION, failure_message) + servicer = Servicer() + servicer.SimpleMethod = handler + with futures.ThreadPoolExecutor(max_workers=1) as executor: server = grpc.server( executor, options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) + add_GRPCTestServerServicer_to_server(servicer, server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + + server.start() + # unfortunately, these are just bare exceptions in grpc... + with self.assertRaises(Exception): + channel.unary_unary(rpc_call)(msg) + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # make sure this span errored, with the right status and detail + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertEqual( + span.status.description, + f"{grpc.StatusCode.FAILED_PRECONDITION}: {failure_message}", + ) + + # check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[ + 0 + ], + }, + ) + + # check events + self.assertEqual(len(span.events), 1) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + + def test_abort_streaming(self): + """Check that we can catch an abort of a streaming call properly""" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # our detailed failure message + failure_message = "This is a test failure" + + # aborting RPC handler + def handler(request, context): + yield Response( + server_id=request.client_id, + response_data="one", + ) + context.abort(grpc.StatusCode.FAILED_PRECONDITION, failure_message) - server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + servicer = Servicer() + servicer.ServerStreamingMethod = handler + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(servicer, server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") - rpc_call = "TestServicer/handler" + # setup the RPC + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() server.start() - # unfortunately, these are just bare exceptions in grpc... + responses = [] with self.assertRaises(Exception): - channel.unary_unary(rpc_call)(b"") + for res in channel.unary_stream(rpc_call)(msg): + responses.append(res) server.stop(None) spans_list = self.memory_exporter.get_finished_spans() @@ -621,7 +909,7 @@ def handler(request, context): self.assertEqual(span.name, rpc_call) self.assertIs(span.kind, trace.SpanKind.SERVER) - # Check version and name in span's instrumentation info + # check version and name in span's instrumentation info self.assertEqualSpanInstrumentationInfo( span, opentelemetry.instrumentation.grpc ) @@ -630,17 +918,18 @@ def handler(request, context): self.assertEqual(span.status.status_code, StatusCode.ERROR) self.assertEqual( span.status.description, - f"{grpc.StatusCode.FAILED_PRECONDITION}:{failure_message}", + f"{grpc.StatusCode.FAILED_PRECONDITION}: {failure_message}", ) - # Check attributes + # check attributes self.assertSpanHasAttributes( span, { SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", SpanAttributes.NET_PEER_NAME: "localhost", - SpanAttributes.RPC_METHOD: "handler", - SpanAttributes.RPC_SERVICE: "TestServicer", + SpanAttributes.RPC_METHOD: "ServerStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[ 0 @@ -648,6 +937,621 @@ def handler(request, context): }, ) + # check events + self.assertEqual(len(span.events), 2) + self.assertEqual(len(responses), 1) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + self.assertEvent( + span.events[1], + "message", + { + SpanAttributes.MESSAGE_TYPE: "SENT", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(responses[0]) + } + ) + + def test_cancel(self): + """Check that we can catch a cancellation properly""" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # aborting RPC handler + def handler(request, context): + context.cancel() + return Response() + + servicer = Servicer() + servicer.SimpleMethod = handler + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(servicer, server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + + server.start() + # unfortunately, these are just bare exceptions in grpc... + with self.assertRaises(Exception): # as cm: + channel.unary_unary(rpc_call)(msg) + # exc = cm.exception + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # make sure this span errored, with the right status and detail + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertEqual( + span.status.description, + f"{grpc.StatusCode.CANCELLED}: {grpc.StatusCode.CANCELLED.value[1]}", + ) + # self.assertEqual( + # span.status.description, + # f"{exc.code()}: {exc.details()}", + # ) + + # check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ + 0 + ], + }, + ) + + # check events + self.assertEqual(len(span.events), 2) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + self.assertEvent( + span.events[1], + "message", + { + SpanAttributes.MESSAGE_TYPE: "SENT", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: 0 + } + ) + + def test_cancel_streaming(self): + """Check that we can catch a cancellation of a streaming call properly""" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # aborting RPC handler + def handler(request, context): + yield Response( + server_id=request.client_id, + response_data="one", + ) + context.cancel() + return + + servicer = Servicer() + servicer.ServerStreamingMethod = handler + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(servicer, server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + # setup the RPC + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + + server.start() + responses = [] + with self.assertRaises(Exception): + for res in channel.unary_stream(rpc_call)(msg): + responses.append(res) + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # make sure this span errored, with the right status and detail + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertEqual( + span.status.description, + f"{grpc.StatusCode.CANCELLED}: {grpc.StatusCode.CANCELLED.value[1]}", + ) + # self.assertEqual( + # span.status.description, + # f"{exc.code()}: {exc.details()}", + # ) + + # check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "ServerStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ + 0 + ], + }, + ) + + # check events + self.assertEqual(len(span.events), 2) + self.assertEqual(len(responses), 1) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + self.assertEvent( + span.events[1], + "message", + { + SpanAttributes.MESSAGE_TYPE: "SENT", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(responses[0]) + } + ) + + def test_error(self): + """Check that we can catch an error properly""" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # our detailed failure message + failure_message = "This is a test failure" + + # error RPC handler + def handler(request, context): + context.set_code(grpc.StatusCode.FAILED_PRECONDITION) + context.set_details(failure_message) + return Response() + + servicer = Servicer() + servicer.SimpleMethod = handler + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(servicer, server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + + server.start() + # unfortunately, these are just bare exceptions in grpc... + with self.assertRaises(Exception): + channel.unary_unary(rpc_call)(msg) + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # make sure this span errored, with the right status and detail + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertEqual( + span.status.description, + f"{grpc.StatusCode.FAILED_PRECONDITION}: {failure_message}", + ) + + # Check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[ + 0 + ], + }, + ) + + # Check events + self.assertEqual(len(span.events), 2) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + self.assertEvent( + span.events[1], + "message", + { + SpanAttributes.MESSAGE_TYPE: "SENT", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: 0 + } + ) + + def test_error_streaming(self): + """Check that we can catch an error in a streaming call properly""" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # our detailed failure message + failure_message = "This is a test failure" + + # error RPC handler + def handler(request, context): + yield Response( + server_id=request.client_id, + response_data="one", + ) + context.set_code(grpc.StatusCode.FAILED_PRECONDITION) + context.set_details(failure_message) + return + + servicer = Servicer() + servicer.ServerStreamingMethod = handler + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(servicer, server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + # setup the RPC + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + + server.start() + responses = [] + with self.assertRaises(Exception): + for res in channel.unary_stream(rpc_call)(msg): + responses.append(res) + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # make sure this span errored, with the right status and detail + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertEqual( + span.status.description, + f"{grpc.StatusCode.FAILED_PRECONDITION}: {failure_message}", + ) + + # Check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "ServerStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[ + 0 + ], + }, + ) + + # Check events + self.assertEqual(len(span.events), 2) + self.assertEqual(len(responses), 1) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + self.assertEvent( + span.events[1], + "message", + { + SpanAttributes.MESSAGE_TYPE: "SENT", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(responses[0]) + } + ) + + def test_raise_exception(self): + """Check that we can catch a raised exception properly""" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # our detailed error message + err_message = "This is a value error" + + # error RPC handler + def handler(request, context): + raise ValueError(err_message) + + servicer = Servicer() + servicer.SimpleMethod = handler + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(servicer, server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + + server.start() + # unfortunately, these are just bare exceptions in grpc... + with self.assertRaises(Exception): + channel.unary_unary(rpc_call)(msg) + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # make sure this span errored, with the right status and detail + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertEqual( + span.status.description, + f"{ValueError.__name__}: {err_message}", + ) + + # Check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.UNKNOWN.value[ + 0 + ], + }, + ) + + # Check events + self.assertEqual(len(span.events), 2) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + self.assertEvent( + span.events[1], + "exception", + { + "exception.type": ValueError.__name__, + "exception.message": err_message, + # "exception.stacktrace": "...", + "exception.escaped": str(False), + } + ) + + def test_raise_exception_streaming(self): + """Check that we can catch a raised exception in a streaming call properly""" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + # our detailed error message + err_message = "This is a value error" + + # error RPC handler + def handler(request, context): + yield Response( + server_id=request.client_id, + response_data="one", + ) + raise ValueError(err_message) + + servicer = Servicer() + servicer.ServerStreamingMethod = handler + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(servicer, server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + # setup the RPC + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + msg = request.SerializeToString() + + server.start() + responses = [] + with self.assertRaises(Exception): + for res in channel.unary_stream(rpc_call)(msg): + responses.append(res) + server.stop(None) + + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + self.assertEqual(span.name, rpc_call) + self.assertIs(span.kind, trace.SpanKind.SERVER) + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.grpc + ) + + # make sure this span errored, with the right status and detail + self.assertEqual(span.status.status_code, StatusCode.ERROR) + self.assertEqual( + span.status.description, + f"{ValueError.__name__}: {err_message}", + ) + + # Check attributes + self.assertSpanHasAttributes( + span, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "ServerStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.UNKNOWN.value[ + 0 + ], + }, + ) + + # Check events + self.assertEqual(len(span.events), 3) + self.assertEqual(len(responses), 1) + self.assertEvent( + span.events[0], + "message", + { + SpanAttributes.MESSAGE_TYPE: "RECEIVED", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + } + ) + self.assertEvent( + span.events[1], + "message", + { + SpanAttributes.MESSAGE_TYPE: "SENT", + SpanAttributes.MESSAGE_ID: 1, + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(responses[0]) + } + ) + self.assertEvent( + span.events[2], + "exception", + { + "exception.type": ValueError.__name__, + "exception.message": err_message, + # "exception.stacktrace": "...", + "exception.escaped": str(False), + } + ) + def get_latch(num): """Get a countdown latch function for use in n threads."""