diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py index 5375253c01..f46cce722f 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/_server.py @@ -13,6 +13,7 @@ # limitations under the License. from concurrent import futures +import time import grpc @@ -26,51 +27,98 @@ class TestServer(test_server_pb2_grpc.GRPCTestServerServicer): # pylint: disable=no-self-use def SimpleMethod(self, request, context): - if request.request_data == "error": + if request.request_data == "abort": + context.abort( + grpc.StatusCode.FAILED_PRECONDITION, request.request_data + ) + elif request.request_data == "cancel": + context.cancel() + return test_server_pb2.Response() + elif request.request_data == "error": context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + context.set_details(request.request_data) return test_server_pb2.Response() - response = test_server_pb2.Response( + elif request.request_data == "exception": + raise ValueError(request.request_data) + elif request.request_data == "sleep": + time.sleep(0.5) + + return test_server_pb2.Response( server_id=SERVER_ID, response_data="data" ) - return response def ClientStreamingMethod(self, request_iterator, context): data = list(request_iterator) - if data[0].request_data == "error": + + if data[0].request_data == "abort": + context.abort( + grpc.StatusCode.FAILED_PRECONDITION, data[0].request_data + ) + elif data[0].request_data == "cancel": + context.cancel() + return test_server_pb2.Response() + elif data[0].request_data == "error": context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + context.set_details(data[0].request_data) return test_server_pb2.Response() - response = test_server_pb2.Response( + elif data[0].request_data == "exception": + raise ValueError(data[0].request_data) + elif data[0].request_data == "sleep": + time.sleep(0.5) + + return test_server_pb2.Response( server_id=SERVER_ID, response_data="data" ) - return response def ServerStreamingMethod(self, request, context): - if request.request_data == "error": + yield test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + + if request.request_data == "abort": context.abort( - code=grpc.StatusCode.INVALID_ARGUMENT, - details="server stream error", + grpc.StatusCode.FAILED_PRECONDITION, request.request_data ) + elif request.request_data == "cancel": + context.cancel() return test_server_pb2.Response() + elif request.request_data == "error": + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + context.set_details(request.request_data) + return test_server_pb2.Response() + elif request.request_data == "exception": + raise ValueError(request.request_data) + elif request.request_data == "sleep": + time.sleep(0.5) - # create a generator - def response_messages(): - for _ in range(5): - response = test_server_pb2.Response( - server_id=SERVER_ID, response_data="data" - ) - yield response - - return response_messages() + for _ in range(5): + yield test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) def BidirectionalStreamingMethod(self, request_iterator, context): data = list(request_iterator) - if data[0].request_data == "error": + + yield test_server_pb2.Response( + server_id=SERVER_ID, response_data="data" + ) + + if data[0].request_data == "abort": context.abort( - code=grpc.StatusCode.INVALID_ARGUMENT, - details="bidirectional error", + grpc.StatusCode.FAILED_PRECONDITION, data[0].request_data ) + elif data[0].request_data == "cancel": + context.cancel() + return + elif data[0].request_data == "error": + context.set_code(grpc.StatusCode.INVALID_ARGUMENT) + context.set_details(data[0].request_data) return + elif data[0].request_data == "exception": + raise ValueError(data[0].request_data) + elif data[0].request_data == "sleep": + time.sleep(0.5) for _ in range(5): yield test_server_pb2.Response( diff --git a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py index b0669032ca..2d06cb92fd 100644 --- a/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py +++ b/instrumentation/opentelemetry-instrumentation-grpc/tests/test_server_interceptor.py @@ -15,18 +15,17 @@ # pylint:disable=unused-argument # pylint:disable=no-self-use +from ftplib import error_perm +import logging import threading from concurrent import futures import grpc +from opentelemetry import trace, metrics import opentelemetry.instrumentation.grpc -from opentelemetry import trace -from opentelemetry.instrumentation.grpc import ( - GrpcInstrumentorServer, - server_interceptor, -) -from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer, server_interceptor +from opentelemetry.sdk.metrics.export import Histogram, HistogramDataPoint from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.test_base import TestBase from opentelemetry.trace import StatusCode @@ -36,25 +35,38 @@ GRPCTestServerServicer, add_GRPCTestServerServicer_to_server, ) - - -class Servicer(GRPCTestServerServicer): - """Our test servicer""" - - # pylint:disable=C0103 - def SimpleMethod(self, request, context): - return Response( - server_id=request.client_id, - response_data=request.request_data, - ) - - # pylint:disable=C0103 - def ServerStreamingMethod(self, request, context): - for data in ("one", "two", "three"): - yield Response( - server_id=request.client_id, - response_data=data, - ) +from ._server import TestServer + + +_expected_metric_names = { + "rpc.server.duration": ( + Histogram, + "ms", + "Measures duration of RPC" + ), + "rpc.server.request.size": ( + Histogram, + "By", + "Measures size of RPC request messages (uncompressed)", + ), + "rpc.server.response.size": ( + Histogram, + "By", + "Measures size of RPC response messages (uncompressed)", + ), + "rpc.server.requests_per_rpc": ( + Histogram, + "1", + "Measures the number of messages received per RPC. " + "Should be 1 for all non-streaming RPCs" + ), + "rpc.server.responses_per_rpc": ( + Histogram, + "1", + "Measures the number of messages sent per RPC. " + "Should be 1 for all non-streaming RPCs", + ), +} class TestOpenTelemetryServerInterceptor(TestBase): @@ -64,7 +76,18 @@ def assertEvent(self, event, name, attributes): self.assertEqual(event.name, name) for key, val in attributes.items(): self.assertIn(key, event.attributes, msg=str(event.attributes)) - self.assertEqual(val, event.attributes[key], msg=str(event.attributes)) + self.assertEqual( + val, event.attributes[key], msg=str(event.attributes) + ) + + def assertEqualMetricInstrumentationScope(self, scope_metrics, module): + self.assertEqual(scope_metrics.scope.name, module.__name__) + self.assertEqual(scope_metrics.scope.version, module.__version__) + + def assertMetricDataPointHasAttributes(self, data_point, attributes): + for key, val in attributes.items(): + self.assertIn(key, data_point.attributes) + self.assertEqual(val, data_point.attributes[key]) def test_instrumentor(self): @@ -75,17 +98,17 @@ def test_instrumentor(self): executor, options=(("grpc.so_reuseport", 0),), ) - add_GRPCTestServerServicer_to_server(Servicer(), server) + add_GRPCTestServerServicer_to_server(TestServer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") rpc_call = "/GRPCTestServer/SimpleMethod" request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request_ser = request.SerializeToString() try: server.start() - response = channel.unary_unary(rpc_call)(msg) + response = channel.unary_unary(rpc_call)(request_ser) finally: server.stop(None) @@ -110,9 +133,8 @@ def test_instrumentor(self): SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.OK.value[0], }, ) @@ -124,7 +146,7 @@ def test_instrumentor(self): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) self.assertEvent( @@ -149,22 +171,25 @@ def test_uninstrument(self): executor, options=(("grpc.so_reuseport", 0),), ) - add_GRPCTestServerServicer_to_server(Servicer(), server) + add_GRPCTestServerServicer_to_server(TestServer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") rpc_call = "/GRPCTestServer/SimpleMethod" - msg = Request().SerializeToString() + request_ser = Request().SerializeToString() try: server.start() - channel.unary_unary(rpc_call)(msg) + channel.unary_unary(rpc_call)(request_ser) finally: server.stop(None) spans_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(spans_list), 0) + # metrics_data = self.memory_metrics_reader.get_metrics_data() + # self.assertEqual(len(metrics_data.resource_metrics), 0) + def test_create_span(self): """Check that the interceptor wraps calls with spans server-side.""" @@ -177,17 +202,17 @@ def test_create_span(self): options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - add_GRPCTestServerServicer_to_server(Servicer(), server) + add_GRPCTestServerServicer_to_server(TestServer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") rpc_call = "/GRPCTestServer/SimpleMethod" request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request_ser = request.SerializeToString() try: server.start() - response = channel.unary_unary(rpc_call)(msg) + response = channel.unary_unary(rpc_call)(request_ser) finally: server.stop(None) @@ -213,9 +238,8 @@ def test_create_span(self): SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.OK.value[0], }, ) @@ -227,7 +251,7 @@ def test_create_span(self): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) self.assertEvent( @@ -275,11 +299,11 @@ def SimpleMethod(self, request, context): # setup the RPC rpc_call = "/GRPCTestServer/SimpleMethod" request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request_ser = request.SerializeToString() try: server.start() - response = channel.unary_unary(rpc_call)(msg) + response = channel.unary_unary(rpc_call)(request_ser) finally: server.stop(None) @@ -306,9 +330,8 @@ def SimpleMethod(self, request, context): SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.OK.value[0], }, ) @@ -320,7 +343,7 @@ def SimpleMethod(self, request, context): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) self.assertEvent( @@ -347,6 +370,9 @@ def SimpleMethod(self, request, context): {} ) + logging.error("%r", self.memory_metrics_reader) + logging.error("%r", self.memory_metrics_reader.get_metrics_data()) + def test_create_span_streaming(self): """Check that the interceptor wraps calls with spans server-side, on a streaming call.""" @@ -360,18 +386,18 @@ def test_create_span_streaming(self): options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - add_GRPCTestServerServicer_to_server(Servicer(), server) + add_GRPCTestServerServicer_to_server(TestServer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") # setup the RPC rpc_call = "/GRPCTestServer/ServerStreamingMethod" request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request_ser = request.SerializeToString() try: server.start() - responses = list(channel.unary_stream(rpc_call)(msg)) + responses = list(channel.unary_stream(rpc_call)(request_ser)) finally: server.stop(None) @@ -397,9 +423,8 @@ def test_create_span_streaming(self): SpanAttributes.RPC_METHOD: "ServerStreamingMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.OK.value[0], }, ) @@ -411,7 +436,7 @@ def test_create_span_streaming(self): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) for res_id, (event, response) in enumerate( @@ -462,11 +487,11 @@ def ServerStreamingMethod(self, request, context): # setup the RPC rpc_call = "/GRPCTestServer/ServerStreamingMethod" request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request_ser = request.SerializeToString() try: server.start() - list(channel.unary_stream(rpc_call)(msg)) + list(channel.unary_stream(rpc_call)(request_ser)) finally: server.stop(None) @@ -493,9 +518,8 @@ def ServerStreamingMethod(self, request, context): SpanAttributes.RPC_METHOD: "ServerStreamingMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.OK.value[0], }, ) @@ -518,7 +542,7 @@ def handler(request, context): active_span_in_handler = trace.get_current_span() return Response() - servicer = Servicer() + servicer = TestServer() servicer.SimpleMethod = handler with futures.ThreadPoolExecutor(max_workers=1) as executor: @@ -532,19 +556,19 @@ def handler(request, context): channel = grpc.insecure_channel(f"localhost:{port:d}") rpc_call = "/GRPCTestServer/SimpleMethod" - msg = Request().SerializeToString() + request_ser = Request().SerializeToString() active_span_before_call = trace.get_current_span() try: server.start() - channel.unary_unary(rpc_call)(msg) + channel.unary_unary(rpc_call)(request_ser) finally: server.stop(None) active_span_after_call = trace.get_current_span() self.assertEqual(active_span_before_call, trace.INVALID_SPAN) self.assertEqual(active_span_after_call, trace.INVALID_SPAN) - self.assertIsInstance(active_span_in_handler, trace_sdk.Span) + self.assertIsInstance(active_span_in_handler, trace.Span) self.assertIsNone(active_span_in_handler.parent) def test_span_lifetime_streaming(self): @@ -564,7 +588,7 @@ def handler(request, context): response_data=data, ) - servicer = Servicer() + servicer = TestServer() servicer.ServerStreamingMethod = handler with futures.ThreadPoolExecutor(max_workers=1) as executor: @@ -579,19 +603,19 @@ def handler(request, context): rpc_call = "/GRPCTestServer/ServerStreamingMethod" request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request_ser = request.SerializeToString() active_span_before_call = trace.get_current_span() try: server.start() - list(channel.unary_stream(rpc_call)(msg)) + list(channel.unary_stream(rpc_call)(request_ser)) finally: server.stop(None) active_span_after_call = trace.get_current_span() self.assertEqual(active_span_before_call, trace.INVALID_SPAN) self.assertEqual(active_span_after_call, trace.INVALID_SPAN) - self.assertIsInstance(active_span_in_handler, trace_sdk.Span) + self.assertIsInstance(active_span_in_handler, trace.Span) self.assertIsNone(active_span_in_handler.parent) def test_sequential_server_spans(self): @@ -606,7 +630,7 @@ def handler(request, context): active_spans_in_handler.append(trace.get_current_span()) return Response() - servicer = Servicer() + servicer = TestServer() servicer.SimpleMethod = handler with futures.ThreadPoolExecutor(max_workers=1) as executor: @@ -620,12 +644,12 @@ def handler(request, context): channel = grpc.insecure_channel(f"localhost:{port:d}") rpc_call = "/GRPCTestServer/SimpleMethod" - msg = Request().SerializeToString() + request_ser = Request().SerializeToString() try: server.start() - response_1 = channel.unary_unary(rpc_call)(msg) - response_2 = channel.unary_unary(rpc_call)(msg) + response_1 = channel.unary_unary(rpc_call)(request_ser) + response_2 = channel.unary_unary(rpc_call)(request_ser) finally: server.stop(None) @@ -650,9 +674,8 @@ def handler(request, context): SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.OK.value[0], }, ) @@ -663,7 +686,7 @@ def handler(request, context): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) self.assertEvent( @@ -696,7 +719,7 @@ def handler(request, context): active_spans_in_handler.append(trace.get_current_span()) return Response() - servicer = Servicer() + servicer = TestServer() servicer.SimpleMethod = handler with futures.ThreadPoolExecutor(max_workers=2) as executor: @@ -712,18 +735,18 @@ def handler(request, context): rpc_call = "/GRPCTestServer/SimpleMethod" request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request_ser = request.SerializeToString() try: server.start() - # Interleave calls so spans are active on each thread at the same - # time + # Interleave calls so spans are active on each thread at the + # same time with futures.ThreadPoolExecutor(max_workers=2) as tpe: f1 = tpe.submit( - channel.unary_unary(rpc_call), msg + channel.unary_unary(rpc_call), request_ser ) f2 = tpe.submit( - channel.unary_unary(rpc_call), msg + channel.unary_unary(rpc_call), request_ser ) futures.wait((f1, f2)) finally: @@ -750,9 +773,8 @@ def handler(request, context): SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.OK.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.OK.value[0], }, ) @@ -764,7 +786,7 @@ def handler(request, context): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) self.assertEvent( @@ -780,37 +802,29 @@ def handler(request, context): def test_abort(self): """Check that we can catch an abort properly""" + abort_message = "abort" + # Intercept gRPC calls... interceptor = server_interceptor() - # our detailed failure message - failure_message = "This is a test failure" - - # aborting RPC handler - def handler(request, context): - context.abort(grpc.StatusCode.FAILED_PRECONDITION, failure_message) - - servicer = Servicer() - servicer.SimpleMethod = handler - with futures.ThreadPoolExecutor(max_workers=1) as executor: server = grpc.server( executor, options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - add_GRPCTestServerServicer_to_server(servicer, server) + add_GRPCTestServerServicer_to_server(TestServer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") rpc_call = "/GRPCTestServer/SimpleMethod" - request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request = Request(client_id=1, request_data=abort_message) + request_ser = request.SerializeToString() server.start() # unfortunately, these are just bare exceptions in grpc... with self.assertRaises(Exception): - channel.unary_unary(rpc_call)(msg) + channel.unary_unary(rpc_call)(request_ser) server.stop(None) spans_list = self.memory_exporter.get_finished_spans() @@ -829,7 +843,7 @@ def handler(request, context): self.assertEqual(span.status.status_code, StatusCode.ERROR) self.assertEqual( span.status.description, - f"{grpc.StatusCode.FAILED_PRECONDITION}: {failure_message}", + f"{grpc.StatusCode.FAILED_PRECONDITION}: {abort_message}", ) # check attributes @@ -842,9 +856,8 @@ def handler(request, context): SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.FAILED_PRECONDITION.value[0], }, ) @@ -856,49 +869,37 @@ def handler(request, context): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) def test_abort_streaming(self): """Check that we can catch an abort of a streaming call properly""" + abort_message = "abort" + # Intercept gRPC calls... interceptor = server_interceptor() - # our detailed failure message - failure_message = "This is a test failure" - - # aborting RPC handler - def handler(request, context): - yield Response( - server_id=request.client_id, - response_data="one", - ) - context.abort(grpc.StatusCode.FAILED_PRECONDITION, failure_message) - - servicer = Servicer() - servicer.ServerStreamingMethod = handler - with futures.ThreadPoolExecutor(max_workers=1) as executor: server = grpc.server( executor, options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - add_GRPCTestServerServicer_to_server(servicer, server) + add_GRPCTestServerServicer_to_server(TestServer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") # setup the RPC rpc_call = "/GRPCTestServer/ServerStreamingMethod" - request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request = Request(client_id=1, request_data=abort_message) + request_ser = request.SerializeToString() server.start() responses = [] with self.assertRaises(Exception): - for res in channel.unary_stream(rpc_call)(msg): + for res in channel.unary_stream(rpc_call)(request_ser): responses.append(res) server.stop(None) @@ -918,7 +919,7 @@ def handler(request, context): self.assertEqual(span.status.status_code, StatusCode.ERROR) self.assertEqual( span.status.description, - f"{grpc.StatusCode.FAILED_PRECONDITION}: {failure_message}", + f"{grpc.StatusCode.FAILED_PRECONDITION}: {abort_message}", ) # check attributes @@ -931,9 +932,8 @@ def handler(request, context): SpanAttributes.RPC_METHOD: "ServerStreamingMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.FAILED_PRECONDITION.value[0], }, ) @@ -946,7 +946,7 @@ def handler(request, context): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) self.assertEvent( @@ -962,35 +962,29 @@ def handler(request, context): def test_cancel(self): """Check that we can catch a cancellation properly""" + cancel_message = "cancel" + # Intercept gRPC calls... interceptor = server_interceptor() - # aborting RPC handler - def handler(request, context): - context.cancel() - return Response() - - servicer = Servicer() - servicer.SimpleMethod = handler - with futures.ThreadPoolExecutor(max_workers=1) as executor: server = grpc.server( executor, options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - add_GRPCTestServerServicer_to_server(servicer, server) + add_GRPCTestServerServicer_to_server(TestServer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") rpc_call = "/GRPCTestServer/SimpleMethod" - request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request = Request(client_id=1, request_data=cancel_message) + request_ser = request.SerializeToString() server.start() # unfortunately, these are just bare exceptions in grpc... with self.assertRaises(Exception): # as cm: - channel.unary_unary(rpc_call)(msg) + channel.unary_unary(rpc_call)(request_ser) # exc = cm.exception server.stop(None) @@ -1027,9 +1021,8 @@ def handler(request, context): SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.CANCELLED.value[0], }, ) @@ -1041,7 +1034,7 @@ def handler(request, context): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) self.assertEvent( @@ -1055,42 +1048,33 @@ def handler(request, context): ) def test_cancel_streaming(self): - """Check that we can catch a cancellation of a streaming call properly""" + """Check that we can catch a cancellation of a streaming call properly. + """ + + cancel_message = "cancel" # Intercept gRPC calls... interceptor = server_interceptor() - # aborting RPC handler - def handler(request, context): - yield Response( - server_id=request.client_id, - response_data="one", - ) - context.cancel() - return - - servicer = Servicer() - servicer.ServerStreamingMethod = handler - with futures.ThreadPoolExecutor(max_workers=1) as executor: server = grpc.server( executor, options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - add_GRPCTestServerServicer_to_server(servicer, server) + add_GRPCTestServerServicer_to_server(TestServer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") # setup the RPC rpc_call = "/GRPCTestServer/ServerStreamingMethod" - request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request = Request(client_id=1, request_data=cancel_message) + request_ser = request.SerializeToString() server.start() responses = [] with self.assertRaises(Exception): - for res in channel.unary_stream(rpc_call)(msg): + for res in channel.unary_stream(rpc_call)(request_ser): responses.append(res) server.stop(None) @@ -1127,9 +1111,8 @@ def handler(request, context): SpanAttributes.RPC_METHOD: "ServerStreamingMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.CANCELLED.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.CANCELLED.value[0], }, ) @@ -1142,7 +1125,7 @@ def handler(request, context): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) self.assertEvent( @@ -1158,39 +1141,29 @@ def handler(request, context): def test_error(self): """Check that we can catch an error properly""" + error_message = "error" + # Intercept gRPC calls... interceptor = server_interceptor() - # our detailed failure message - failure_message = "This is a test failure" - - # error RPC handler - def handler(request, context): - context.set_code(grpc.StatusCode.FAILED_PRECONDITION) - context.set_details(failure_message) - return Response() - - servicer = Servicer() - servicer.SimpleMethod = handler - with futures.ThreadPoolExecutor(max_workers=1) as executor: server = grpc.server( executor, options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - add_GRPCTestServerServicer_to_server(servicer, server) + add_GRPCTestServerServicer_to_server(TestServer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") rpc_call = "/GRPCTestServer/SimpleMethod" - request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request = Request(client_id=1, request_data=error_message) + request_ser = request.SerializeToString() server.start() # unfortunately, these are just bare exceptions in grpc... with self.assertRaises(Exception): - channel.unary_unary(rpc_call)(msg) + channel.unary_unary(rpc_call)(request_ser) server.stop(None) spans_list = self.memory_exporter.get_finished_spans() @@ -1209,7 +1182,7 @@ def handler(request, context): self.assertEqual(span.status.status_code, StatusCode.ERROR) self.assertEqual( span.status.description, - f"{grpc.StatusCode.FAILED_PRECONDITION}: {failure_message}", + f"{grpc.StatusCode.INVALID_ARGUMENT}: {error_message}", ) # Check attributes @@ -1222,9 +1195,8 @@ def handler(request, context): SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.INVALID_ARGUMENT.value[0], }, ) @@ -1236,7 +1208,7 @@ def handler(request, context): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) self.assertEvent( @@ -1252,44 +1224,30 @@ def handler(request, context): def test_error_streaming(self): """Check that we can catch an error in a streaming call properly""" + error_message = "error" + # Intercept gRPC calls... interceptor = server_interceptor() - # our detailed failure message - failure_message = "This is a test failure" - - # error RPC handler - def handler(request, context): - yield Response( - server_id=request.client_id, - response_data="one", - ) - context.set_code(grpc.StatusCode.FAILED_PRECONDITION) - context.set_details(failure_message) - return - - servicer = Servicer() - servicer.ServerStreamingMethod = handler - with futures.ThreadPoolExecutor(max_workers=1) as executor: server = grpc.server( executor, options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - add_GRPCTestServerServicer_to_server(servicer, server) + add_GRPCTestServerServicer_to_server(TestServer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") # setup the RPC rpc_call = "/GRPCTestServer/ServerStreamingMethod" - request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request = Request(client_id=1, request_data=error_message) + request_ser = request.SerializeToString() server.start() responses = [] with self.assertRaises(Exception): - for res in channel.unary_stream(rpc_call)(msg): + for res in channel.unary_stream(rpc_call)(request_ser): responses.append(res) server.stop(None) @@ -1309,7 +1267,7 @@ def handler(request, context): self.assertEqual(span.status.status_code, StatusCode.ERROR) self.assertEqual( span.status.description, - f"{grpc.StatusCode.FAILED_PRECONDITION}: {failure_message}", + f"{grpc.StatusCode.INVALID_ARGUMENT}: {error_message}", ) # Check attributes @@ -1322,9 +1280,8 @@ def handler(request, context): SpanAttributes.RPC_METHOD: "ServerStreamingMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.FAILED_PRECONDITION.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.INVALID_ARGUMENT.value[0], }, ) @@ -1337,7 +1294,7 @@ def handler(request, context): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) self.assertEvent( @@ -1353,37 +1310,29 @@ def handler(request, context): def test_raise_exception(self): """Check that we can catch a raised exception properly""" + exc_message = "exception" + # Intercept gRPC calls... interceptor = server_interceptor() - # our detailed error message - err_message = "This is a value error" - - # error RPC handler - def handler(request, context): - raise ValueError(err_message) - - servicer = Servicer() - servicer.SimpleMethod = handler - with futures.ThreadPoolExecutor(max_workers=1) as executor: server = grpc.server( executor, options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - add_GRPCTestServerServicer_to_server(servicer, server) + add_GRPCTestServerServicer_to_server(TestServer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") rpc_call = "/GRPCTestServer/SimpleMethod" - request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request = Request(client_id=1, request_data=exc_message) + request_ser = request.SerializeToString() server.start() # unfortunately, these are just bare exceptions in grpc... with self.assertRaises(Exception): - channel.unary_unary(rpc_call)(msg) + channel.unary_unary(rpc_call)(request_ser) server.stop(None) spans_list = self.memory_exporter.get_finished_spans() @@ -1402,7 +1351,7 @@ def handler(request, context): self.assertEqual(span.status.status_code, StatusCode.ERROR) self.assertEqual( span.status.description, - f"{ValueError.__name__}: {err_message}", + f"{ValueError.__name__}: {exc_message}", ) # Check attributes @@ -1415,9 +1364,8 @@ def handler(request, context): SpanAttributes.RPC_METHOD: "SimpleMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.UNKNOWN.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.UNKNOWN.value[0], }, ) @@ -1429,7 +1377,7 @@ def handler(request, context): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) self.assertEvent( @@ -1437,51 +1385,41 @@ def handler(request, context): "exception", { "exception.type": ValueError.__name__, - "exception.message": err_message, + "exception.message": exc_message, # "exception.stacktrace": "...", "exception.escaped": str(False), } ) def test_raise_exception_streaming(self): - """Check that we can catch a raised exception in a streaming call properly""" + """Check that we can catch a raised exception in a streaming call + properly. + """ + + exc_message = "exception" # Intercept gRPC calls... interceptor = server_interceptor() - # our detailed error message - err_message = "This is a value error" - - # error RPC handler - def handler(request, context): - yield Response( - server_id=request.client_id, - response_data="one", - ) - raise ValueError(err_message) - - servicer = Servicer() - servicer.ServerStreamingMethod = handler - with futures.ThreadPoolExecutor(max_workers=1) as executor: server = grpc.server( executor, options=(("grpc.so_reuseport", 0),), interceptors=[interceptor], ) - add_GRPCTestServerServicer_to_server(servicer, server) + add_GRPCTestServerServicer_to_server(TestServer(), server) port = server.add_insecure_port("[::]:0") channel = grpc.insecure_channel(f"localhost:{port:d}") # setup the RPC rpc_call = "/GRPCTestServer/ServerStreamingMethod" - request = Request(client_id=1, request_data="test") - msg = request.SerializeToString() + request = Request(client_id=1, request_data=exc_message) + request_ser = request.SerializeToString() server.start() responses = [] with self.assertRaises(Exception): - for res in channel.unary_stream(rpc_call)(msg): + for res in channel.unary_stream(rpc_call)(request_ser): responses.append(res) server.stop(None) @@ -1501,7 +1439,7 @@ def handler(request, context): self.assertEqual(span.status.status_code, StatusCode.ERROR) self.assertEqual( span.status.description, - f"{ValueError.__name__}: {err_message}", + f"{ValueError.__name__}: {exc_message}", ) # Check attributes @@ -1514,9 +1452,8 @@ def handler(request, context): SpanAttributes.RPC_METHOD: "ServerStreamingMethod", SpanAttributes.RPC_SERVICE: "GRPCTestServer", SpanAttributes.RPC_SYSTEM: "grpc", - SpanAttributes.RPC_GRPC_STATUS_CODE: grpc.StatusCode.UNKNOWN.value[ - 0 - ], + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.UNKNOWN.value[0], }, ) @@ -1529,7 +1466,7 @@ def handler(request, context): { SpanAttributes.MESSAGE_TYPE: "RECEIVED", SpanAttributes.MESSAGE_ID: 1, - SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(msg) + SpanAttributes.MESSAGE_UNCOMPRESSED_SIZE: len(request_ser) } ) self.assertEvent( @@ -1546,12 +1483,736 @@ def handler(request, context): "exception", { "exception.type": ValueError.__name__, - "exception.message": err_message, + "exception.message": exc_message, # "exception.stacktrace": "...", "exception.escaped": str(False), } ) + def test_metrics(self): + # Intercept gRPC calls... + interceptor = server_interceptor() + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(TestServer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + request_ser = request.SerializeToString() + + try: + server.start() + response = channel.unary_unary(rpc_call)(request_ser) + finally: + server.stop(None) + + metrics_list = self.memory_metrics_reader.get_metrics_data() + data_point_seen = True + + self.assertNotEqual(len(metrics_list.resource_metrics), 0) + for resource_metric in metrics_list.resource_metrics: + self.assertNotEqual(len(resource_metric.scope_metrics), 0) + for scope_metric in resource_metric.scope_metrics: + self.assertNotEqual(len(scope_metric.metrics), 0) + self.assertEqualMetricInstrumentationScope( + scope_metric, opentelemetry.instrumentation.grpc + ) + self.assertEqual( + len(scope_metric.metrics), len(_expected_metric_names) + ) + + for metric in scope_metric.metrics: + self.assertIn(metric.name, _expected_metric_names) + self.assertIsInstance( + metric.data, _expected_metric_names[metric.name][0] + ) + self.assertEqual( + metric.unit, _expected_metric_names[metric.name][1] + ) + self.assertEqual( + metric.description, + _expected_metric_names[metric.name][2] + ) + + data_points = list(metric.data.data_points) + self.assertEqual(len(data_points), 1) + + for point in data_points: + if isinstance(metric.data, Histogram): + self.assertIsInstance( + point, HistogramDataPoint + ) + self.assertEqual(point.count, 1) + if metric.name == "rpc.server.duration": + self.assertGreaterEqual(point.sum, 0) + elif metric.name == "rpc.server.request.size": + self.assertEqual(point.sum, len(request_ser)) + elif metric.name == "rpc.server.response.size": + self.assertEqual(point.sum, len(response)) + elif metric.name == "rpc.server.requests_per_rpc": + self.assertEqual(point.sum, 1) + elif metric.name == "rpc.server.responses_per_rpc": + self.assertEqual(point.sum, 1) + + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + }, + ) + if metric.name == "rpc.server.duration": + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.OK.value[0] + }, + ) + + data_point_seen &= True + + self.assertTrue(data_point_seen) + + def test_metrics_error(self): + + error_message = "error" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(TestServer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data=error_message) + request_ser = request.SerializeToString() + + try: + server.start() + with self.assertRaises(grpc.RpcError): + channel.unary_unary(rpc_call)(request_ser) + finally: + server.stop(None) + + metrics_list = self.memory_metrics_reader.get_metrics_data() + data_point_seen = True + + self.assertNotEqual(len(metrics_list.resource_metrics), 0) + for resource_metric in metrics_list.resource_metrics: + self.assertNotEqual(len(resource_metric.scope_metrics), 0) + for scope_metric in resource_metric.scope_metrics: + self.assertNotEqual(len(scope_metric.metrics), 0) + self.assertEqualMetricInstrumentationScope( + scope_metric, opentelemetry.instrumentation.grpc + ) + self.assertEqual( + len(scope_metric.metrics), len(_expected_metric_names) + ) + + for metric in scope_metric.metrics: + self.assertIn(metric.name, _expected_metric_names) + self.assertIsInstance( + metric.data, _expected_metric_names[metric.name][0] + ) + self.assertEqual( + metric.unit, _expected_metric_names[metric.name][1] + ) + self.assertEqual( + metric.description, + _expected_metric_names[metric.name][2] + ) + + data_points = list(metric.data.data_points) + self.assertEqual(len(data_points), 1) + + for point in data_points: + if isinstance(metric.data, Histogram): + self.assertIsInstance( + point, HistogramDataPoint + ) + self.assertEqual(point.count, 1) + if metric.name == "rpc.server.duration": + self.assertGreaterEqual(point.sum, 0) + elif metric.name == "rpc.server.request.size": + self.assertEqual(point.sum, len(request_ser)) + elif metric.name == "rpc.server.response.size": + self.assertEqual(point.sum, 0) + elif metric.name == "rpc.server.requests_per_rpc": + self.assertEqual(point.sum, 1) + elif metric.name == "rpc.server.responses_per_rpc": + self.assertEqual(point.sum, 1) + + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + }, + ) + if metric.name == "rpc.server.duration": + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.INVALID_ARGUMENT.value[ + 0 + ] + }, + ) + data_point_seen &= True + + self.assertTrue(data_point_seen) + + def test_metrics_three_calls(self): + no_calls = 3 + + # Intercept gRPC calls... + interceptor = server_interceptor() + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(TestServer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + rpc_call = "/GRPCTestServer/SimpleMethod" + request = Request(client_id=1, request_data="test") + request_ser = request.SerializeToString() + + try: + server.start() + for _ in range(no_calls): + response = channel.unary_unary(rpc_call)(request_ser) + finally: + server.stop(None) + + metrics_list = self.memory_metrics_reader.get_metrics_data() + data_point_seen = True + + self.assertNotEqual(len(metrics_list.resource_metrics), 0) + for resource_metric in metrics_list.resource_metrics: + self.assertNotEqual(len(resource_metric.scope_metrics), 0) + for scope_metric in resource_metric.scope_metrics: + self.assertNotEqual(len(scope_metric.metrics), 0) + self.assertEqualMetricInstrumentationScope( + scope_metric, opentelemetry.instrumentation.grpc + ) + self.assertEqual( + len(scope_metric.metrics), len(_expected_metric_names) + ) + + for metric in scope_metric.metrics: + self.assertIn(metric.name, _expected_metric_names) + self.assertIsInstance( + metric.data, _expected_metric_names[metric.name][0] + ) + self.assertEqual( + metric.unit, _expected_metric_names[metric.name][1] + ) + self.assertEqual( + metric.description, + _expected_metric_names[metric.name][2] + ) + + data_points = list(metric.data.data_points) + self.assertEqual(len(data_points), 1) + + for point in data_points: + if isinstance(metric.data, Histogram): + self.assertIsInstance( + point, HistogramDataPoint + ) + self.assertEqual(point.count, no_calls) + if metric.name == "rpc.server.duration": + self.assertGreaterEqual(point.sum, 0) + elif metric.name == "rpc.server.request.size": + self.assertEqual( + point.sum, no_calls * len(request_ser) + ) + elif metric.name == "rpc.server.response.size": + self.assertEqual( + point.sum, no_calls * len(response) + ) + elif metric.name == "rpc.server.requests_per_rpc": + self.assertEqual(point.sum, no_calls) + elif metric.name == "rpc.server.responses_per_rpc": + self.assertEqual(point.sum, no_calls) + + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: "SimpleMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + }, + ) + if metric.name == "rpc.server.duration": + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.OK.value[0] + }, + ) + data_point_seen &= True + + self.assertTrue(data_point_seen) + + def test_metrics_client_streaming(self): + + # Intercept gRPC calls... + interceptor = server_interceptor() + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(TestServer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + # setup the RPC + rpc_call = "/GRPCTestServer/ClientStreamingMethod" + request = Request(client_id=1, request_data="test") + requests = [request.SerializeToString() for _ in range(5)] + + try: + server.start() + response = channel.stream_unary(rpc_call)(iter(requests)) + finally: + server.stop(None) + + metrics_list = self.memory_metrics_reader.get_metrics_data() + data_point_seen = True + + self.assertNotEqual(len(metrics_list.resource_metrics), 0) + for resource_metric in metrics_list.resource_metrics: + self.assertNotEqual(len(resource_metric.scope_metrics), 0) + for scope_metric in resource_metric.scope_metrics: + self.assertNotEqual(len(scope_metric.metrics), 0) + self.assertEqualMetricInstrumentationScope( + scope_metric, opentelemetry.instrumentation.grpc + ) + self.assertEqual( + len(scope_metric.metrics), len(_expected_metric_names) + ) + + for metric in scope_metric.metrics: + self.assertIn(metric.name, _expected_metric_names) + self.assertIsInstance( + metric.data, _expected_metric_names[metric.name][0] + ) + self.assertEqual( + metric.unit, _expected_metric_names[metric.name][1] + ) + self.assertEqual( + metric.description, + _expected_metric_names[metric.name][2] + ) + + data_points = list(metric.data.data_points) + self.assertEqual(len(data_points), 1) + + for point in data_points: + if isinstance(metric.data, Histogram): + self.assertIsInstance( + point, HistogramDataPoint + ) + if metric.name == "rpc.server.duration": + self.assertEqual(point.count, 1) + self.assertGreaterEqual(point.sum, 0) + elif metric.name == "rpc.server.request.size": + self.assertEqual(point.count, len(requests)) + self.assertEqual( + point.sum, sum(map(len, requests)) + ) + elif metric.name == "rpc.server.response.size": + self.assertEqual(point.count, 1) + self.assertEqual(point.sum, len(response)) + elif metric.name == "rpc.server.requests_per_rpc": + self.assertEqual(point.count, 1) + self.assertEqual(point.sum, len(requests)) + elif metric.name == "rpc.server.responses_per_rpc": + self.assertEqual(point.count, 1) + self.assertEqual(point.sum, 1) + + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: + "ClientStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + }, + ) + if metric.name == "rpc.server.duration": + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.OK.value[0] + }, + ) + data_point_seen &= True + + self.assertTrue(data_point_seen) + + def test_metrics_client_streaming_abort(self): + + error_message = "abort" + + # Intercept gRPC calls... + interceptor = server_interceptor() + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(TestServer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + # setup the RPC + rpc_call = "/GRPCTestServer/ClientStreamingMethod" + request = Request(client_id=1, request_data=error_message) + requests = [request.SerializeToString() for _ in range(5)] + + try: + server.start() + with self.assertRaises(grpc.RpcError): + channel.stream_unary(rpc_call)(iter(requests)) + finally: + server.stop(None) + + metrics_list = self.memory_metrics_reader.get_metrics_data() + data_point_seen = True + + self.assertNotEqual(len(metrics_list.resource_metrics), 0) + for resource_metric in metrics_list.resource_metrics: + self.assertNotEqual(len(resource_metric.scope_metrics), 0) + for scope_metric in resource_metric.scope_metrics: + self.assertNotEqual(len(scope_metric.metrics), 0) + self.assertEqualMetricInstrumentationScope( + scope_metric, opentelemetry.instrumentation.grpc + ) + self.assertLess( + len(scope_metric.metrics), len(_expected_metric_names) + ) + + for metric in scope_metric.metrics: + self.assertIn(metric.name, _expected_metric_names) + self.assertIsInstance( + metric.data, _expected_metric_names[metric.name][0] + ) + self.assertEqual( + metric.unit, _expected_metric_names[metric.name][1] + ) + self.assertEqual( + metric.description, + _expected_metric_names[metric.name][2] + ) + + data_points = list(metric.data.data_points) + self.assertEqual(len(data_points), 1) + + for point in data_points: + if isinstance(metric.data, Histogram): + self.assertIsInstance( + point, HistogramDataPoint + ) + if metric.name == "rpc.server.duration": + self.assertEqual(point.count, 1) + self.assertGreaterEqual(point.sum, 0) + elif metric.name == "rpc.server.request.size": + self.assertEqual(point.count, len(requests)) + self.assertEqual( + point.sum, sum(map(len, requests)) + ) + elif metric.name == "rpc.server.response.size": + self.assertEqual(point.count, 0) + # self.assertEqual(point.sum, 0) + elif metric.name == "rpc.server.requests_per_rpc": + self.assertEqual(point.count, 1) + self.assertEqual(point.sum, len(requests)) + elif metric.name == "rpc.server.responses_per_rpc": + self.assertEqual(point.count, 0) + # self.assertEqual(point.sum, 1) + + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: + "ClientStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + }, + ) + if metric.name == "rpc.server.duration": + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.FAILED_PRECONDITION.value[ + 0 + ] + }, + ) + data_point_seen &= True + + self.assertTrue(data_point_seen) + + def test_metrics_server_streaming(self): + + # Intercept gRPC calls... + interceptor = server_interceptor() + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(TestServer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + # setup the RPC + rpc_call = "/GRPCTestServer/ServerStreamingMethod" + request = Request(client_id=1, request_data="test") + request_ser = request.SerializeToString() + + try: + server.start() + responses = list(channel.unary_stream(rpc_call)(request_ser)) + finally: + server.stop(None) + + metrics_list = self.memory_metrics_reader.get_metrics_data() + data_point_seen = True + + self.assertNotEqual(len(metrics_list.resource_metrics), 0) + for resource_metric in metrics_list.resource_metrics: + self.assertNotEqual(len(resource_metric.scope_metrics), 0) + for scope_metric in resource_metric.scope_metrics: + self.assertNotEqual(len(scope_metric.metrics), 0) + self.assertEqualMetricInstrumentationScope( + scope_metric, opentelemetry.instrumentation.grpc + ) + self.assertEqual( + len(scope_metric.metrics), len(_expected_metric_names) + ) + + for metric in scope_metric.metrics: + self.assertIn(metric.name, _expected_metric_names) + self.assertIsInstance( + metric.data, _expected_metric_names[metric.name][0] + ) + self.assertEqual( + metric.unit, _expected_metric_names[metric.name][1] + ) + self.assertEqual( + metric.description, + _expected_metric_names[metric.name][2] + ) + + data_points = list(metric.data.data_points) + self.assertEqual(len(data_points), 1) + + for point in data_points: + if isinstance(metric.data, Histogram): + self.assertIsInstance( + point, HistogramDataPoint + ) + if metric.name == "rpc.server.duration": + self.assertEqual(point.count, 1) + self.assertGreaterEqual(point.sum, 0) + elif metric.name == "rpc.server.request.size": + self.assertEqual(point.count, 1) + self.assertEqual(point.sum, len(request_ser)) + elif metric.name == "rpc.server.response.size": + self.assertEqual(point.count, len(responses)) + self.assertEqual( + point.sum, sum(map(len, responses)) + ) + elif metric.name == "rpc.server.requests_per_rpc": + self.assertEqual(point.count, 1) + self.assertEqual(point.sum, 1) + elif metric.name == "rpc.server.responses_per_rpc": + self.assertEqual(point.count, 1) + self.assertEqual(point.sum, len(responses)) + + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: + "ServerStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + }, + ) + if metric.name == "rpc.server.duration": + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.OK.value[0] + }, + ) + data_point_seen &= True + + self.assertTrue(data_point_seen) + + def test_metrics_bidirectional_streaming(self): + + # Intercept gRPC calls... + interceptor = server_interceptor() + + with futures.ThreadPoolExecutor(max_workers=1) as executor: + server = grpc.server( + executor, + options=(("grpc.so_reuseport", 0),), + interceptors=[interceptor], + ) + add_GRPCTestServerServicer_to_server(TestServer(), server) + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel(f"localhost:{port:d}") + + # setup the RPC + rpc_call = "/GRPCTestServer/BidirectionalStreamingMethod" + request = Request(client_id=1, request_data="test") + requests = [request.SerializeToString() for _ in range(5)] + + try: + server.start() + responses = list( + channel.stream_stream(rpc_call)(iter(requests)) + ) + finally: + server.stop(None) + + metrics_list = self.memory_metrics_reader.get_metrics_data() + data_point_seen = True + + self.assertNotEqual(len(metrics_list.resource_metrics), 0) + for resource_metric in metrics_list.resource_metrics: + self.assertNotEqual(len(resource_metric.scope_metrics), 0) + for scope_metric in resource_metric.scope_metrics: + self.assertNotEqual(len(scope_metric.metrics), 0) + self.assertEqualMetricInstrumentationScope( + scope_metric, opentelemetry.instrumentation.grpc + ) + self.assertEqual( + len(scope_metric.metrics), len(_expected_metric_names) + ) + + for metric in scope_metric.metrics: + self.assertIn(metric.name, _expected_metric_names) + self.assertIsInstance( + metric.data, _expected_metric_names[metric.name][0] + ) + self.assertEqual( + metric.unit, _expected_metric_names[metric.name][1] + ) + self.assertEqual( + metric.description, + _expected_metric_names[metric.name][2] + ) + + data_points = list(metric.data.data_points) + self.assertEqual(len(data_points), 1) + + for point in data_points: + if isinstance(metric.data, Histogram): + self.assertIsInstance( + point, HistogramDataPoint + ) + if metric.name == "rpc.server.duration": + self.assertEqual(point.count, 1) + self.assertGreaterEqual(point.sum, 0) + elif metric.name == "rpc.server.request.size": + self.assertEqual(point.count, len(requests)) + self.assertEqual( + point.sum, sum(map(len, requests)) + ) + elif metric.name == "rpc.server.response.size": + self.assertEqual(point.count, len(responses)) + self.assertEqual( + point.sum, sum(map(len, responses)) + ) + elif metric.name == "rpc.server.requests_per_rpc": + self.assertEqual(point.count, 1) + self.assertEqual(point.sum, len(requests)) + elif metric.name == "rpc.server.responses_per_rpc": + self.assertEqual(point.count, 1) + self.assertEqual(point.sum, len(responses)) + + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.NET_PEER_IP: "[::1]", + # SpanAttributes.NET_PEER_PORT: "0", + SpanAttributes.NET_PEER_NAME: "localhost", + SpanAttributes.RPC_METHOD: + "BidirectionalStreamingMethod", + SpanAttributes.RPC_SERVICE: "GRPCTestServer", + SpanAttributes.RPC_SYSTEM: "grpc", + }, + ) + if metric.name == "rpc.server.duration": + self.assertMetricDataPointHasAttributes( + point, + { + SpanAttributes.RPC_GRPC_STATUS_CODE: + grpc.StatusCode.OK.value[0] + }, + ) + data_point_seen &= True + + self.assertTrue(data_point_seen) + def get_latch(num): """Get a countdown latch function for use in n threads."""