diff --git a/.flake8 b/.flake8 index 6d1c2bc1752..0c2204782fc 100644 --- a/.flake8 +++ b/.flake8 @@ -17,3 +17,4 @@ exclude = __pycache__ ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/ ext/opentelemetry-ext-jaeger/build/* + docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/ diff --git a/.isort.cfg b/.isort.cfg index 014a7c64eac..c5723b06a20 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -13,6 +13,6 @@ line_length=79 ; docs: https://github.com/timothycrosley/isort#multi-line-output-modes multi_line_output=3 skip=target -skip_glob=ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen/*,.venv*/*,venv*/* -known_first_party=opentelemetry +skip_glob=**/gen/*,.venv*/*,venv*/* +known_first_party=opentelemetry,opentelemetry_example_app known_third_party=psutil,pytest diff --git a/docs/examples/opentelemetry-example-app/setup.py b/docs/examples/opentelemetry-example-app/setup.py index c90da05047a..1466fd94635 100644 --- a/docs/examples/opentelemetry-example-app/setup.py +++ b/docs/examples/opentelemetry-example-app/setup.py @@ -41,6 +41,7 @@ "opentelemetry-ext-flask", "flask", "requests", + "protobuf~=3.11", ], license="Apache-2.0", package_dir={"": "src"}, diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/__init__.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/__init__.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/__init__.py new file mode 100644 index 00000000000..bcedda2270e --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/__init__.py @@ -0,0 +1,19 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import importlib +import sys + +# gRPC-generated modules expect other generated modules to be on the path. +sys.path.extend(importlib.util.find_spec(__name__).submodule_search_locations) diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/codegen.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/codegen.py new file mode 100755 index 00000000000..72c24bad39e --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/codegen.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from grpc_tools import protoc + + +def main(): + return protoc.main( + [ + "-I.", + "--python_out=.", + "--grpc_python_out=.", + "helloworld.proto", + "route_guide.proto", + ] + ) + + +if __name__ == "__main__": + main() diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/helloworld.proto b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/helloworld.proto new file mode 100644 index 00000000000..446102166db --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/helloworld.proto @@ -0,0 +1,35 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// https://github.com/grpc/grpc/blob/master/examples/protos/helloworld.proto + +syntax = "proto3"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/helloworld_pb2.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/helloworld_pb2.py new file mode 100644 index 00000000000..0e37874ebc2 --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/helloworld_pb2.py @@ -0,0 +1,131 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: helloworld.proto + +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='helloworld.proto', + package='helloworld', + syntax='proto3', + serialized_options=None, + serialized_pb=b'\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x62\x06proto3' +) + + + + +_HELLOREQUEST = _descriptor.Descriptor( + name='HelloRequest', + full_name='helloworld.HelloRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='helloworld.HelloRequest.name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=32, + serialized_end=60, +) + + +_HELLOREPLY = _descriptor.Descriptor( + name='HelloReply', + full_name='helloworld.HelloReply', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='message', full_name='helloworld.HelloReply.message', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=62, + serialized_end=91, +) + +DESCRIPTOR.message_types_by_name['HelloRequest'] = _HELLOREQUEST +DESCRIPTOR.message_types_by_name['HelloReply'] = _HELLOREPLY +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +HelloRequest = _reflection.GeneratedProtocolMessageType('HelloRequest', (_message.Message,), { + 'DESCRIPTOR' : _HELLOREQUEST, + '__module__' : 'helloworld_pb2' + # @@protoc_insertion_point(class_scope:helloworld.HelloRequest) + }) +_sym_db.RegisterMessage(HelloRequest) + +HelloReply = _reflection.GeneratedProtocolMessageType('HelloReply', (_message.Message,), { + 'DESCRIPTOR' : _HELLOREPLY, + '__module__' : 'helloworld_pb2' + # @@protoc_insertion_point(class_scope:helloworld.HelloReply) + }) +_sym_db.RegisterMessage(HelloReply) + + + +_GREETER = _descriptor.ServiceDescriptor( + name='Greeter', + full_name='helloworld.Greeter', + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=93, + serialized_end=166, + methods=[ + _descriptor.MethodDescriptor( + name='SayHello', + full_name='helloworld.Greeter.SayHello', + index=0, + containing_service=None, + input_type=_HELLOREQUEST, + output_type=_HELLOREPLY, + serialized_options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_GREETER) + +DESCRIPTOR.services_by_name['Greeter'] = _GREETER + +# @@protoc_insertion_point(module_scope) diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/helloworld_pb2_grpc.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/helloworld_pb2_grpc.py new file mode 100644 index 00000000000..18e07d16797 --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/helloworld_pb2_grpc.py @@ -0,0 +1,46 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import helloworld_pb2 as helloworld__pb2 + + +class GreeterStub(object): + """The greeting service definition. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SayHello = channel.unary_unary( + '/helloworld.Greeter/SayHello', + request_serializer=helloworld__pb2.HelloRequest.SerializeToString, + response_deserializer=helloworld__pb2.HelloReply.FromString, + ) + + +class GreeterServicer(object): + """The greeting service definition. + """ + + def SayHello(self, request, context): + """Sends a greeting + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_GreeterServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SayHello': grpc.unary_unary_rpc_method_handler( + servicer.SayHello, + request_deserializer=helloworld__pb2.HelloRequest.FromString, + response_serializer=helloworld__pb2.HelloReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'helloworld.Greeter', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/route_guide.proto b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/route_guide.proto new file mode 100644 index 00000000000..7017e93498d --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/route_guide.proto @@ -0,0 +1,108 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// https://github.com/grpc/grpc/blob/master/examples/protos/route_guide.proto + +syntax = "proto3"; + +package routeguide; + +// Interface exported by the server. +service RouteGuide { + // A simple RPC. + // + // Obtains the feature at a given position. + // + // A feature with an empty name is returned if there's no feature at the given + // position. + rpc GetFeature(Point) returns (Feature) {} + + // A server-to-client streaming RPC. + // + // Obtains the Features available within the given Rectangle. Results are + // streamed rather than returned at once (e.g. in a response message with a + // repeated field), as the rectangle may cover a large area and contain a + // huge number of features. + rpc ListFeatures(Rectangle) returns (stream Feature) {} + + // A client-to-server streaming RPC. + // + // Accepts a stream of Points on a route being traversed, returning a + // RouteSummary when traversal is completed. + rpc RecordRoute(stream Point) returns (RouteSummary) {} + + // A Bidirectional streaming RPC. + // + // Accepts a stream of RouteNotes sent while a route is being traversed, + // while receiving other RouteNotes (e.g. from other users). + rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} +} + +// Points are represented as latitude-longitude pairs in the E7 representation +// (degrees multiplied by 10**7 and rounded to the nearest integer). +// Latitudes should be in the range +/- 90 degrees and longitude should be in +// the range +/- 180 degrees (inclusive). +message Point { + int32 latitude = 1; + int32 longitude = 2; +} + +// A latitude-longitude rectangle, represented as two diagonally opposite +// points "lo" and "hi". +message Rectangle { + // One corner of the rectangle. + Point lo = 1; + + // The other corner of the rectangle. + Point hi = 2; +} + +// A feature names something at a given point. +// +// If a feature could not be named, the name is empty. +message Feature { + // The name of the feature. + string name = 1; + + // The point where the feature is detected. + Point location = 2; +} + +// A RouteNote is a message sent while at a given point. +message RouteNote { + // The location from which the message is sent. + Point location = 1; + + // The message to be sent. + string message = 2; +} + +// A RouteSummary is received in response to a RecordRoute rpc. +// +// It contains the number of individual points received, the number of +// detected features, and the total distance covered as the cumulative sum of +// the distance between each point. +message RouteSummary { + // The number of points received. + int32 point_count = 1; + + // The number of known features passed while traversing the route. + int32 feature_count = 2; + + // The distance covered in metres. + int32 distance = 3; + + // The duration of the traversal in seconds. + int32 elapsed_time = 4; +} diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/route_guide_pb2.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/route_guide_pb2.py new file mode 100644 index 00000000000..4a4006a2c77 --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/route_guide_pb2.py @@ -0,0 +1,328 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: route_guide.proto + +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='route_guide.proto', + package='routeguide', + syntax='proto3', + serialized_options=None, + serialized_pb=b'\n\x11route_guide.proto\x12\nrouteguide\",\n\x05Point\x12\x10\n\x08latitude\x18\x01 \x01(\x05\x12\x11\n\tlongitude\x18\x02 \x01(\x05\"I\n\tRectangle\x12\x1d\n\x02lo\x18\x01 \x01(\x0b\x32\x11.routeguide.Point\x12\x1d\n\x02hi\x18\x02 \x01(\x0b\x32\x11.routeguide.Point\"<\n\x07\x46\x65\x61ture\x12\x0c\n\x04name\x18\x01 \x01(\t\x12#\n\x08location\x18\x02 \x01(\x0b\x32\x11.routeguide.Point\"A\n\tRouteNote\x12#\n\x08location\x18\x01 \x01(\x0b\x32\x11.routeguide.Point\x12\x0f\n\x07message\x18\x02 \x01(\t\"b\n\x0cRouteSummary\x12\x13\n\x0bpoint_count\x18\x01 \x01(\x05\x12\x15\n\rfeature_count\x18\x02 \x01(\x05\x12\x10\n\x08\x64istance\x18\x03 \x01(\x05\x12\x14\n\x0c\x65lapsed_time\x18\x04 \x01(\x05\x32\x85\x02\n\nRouteGuide\x12\x36\n\nGetFeature\x12\x11.routeguide.Point\x1a\x13.routeguide.Feature\"\x00\x12>\n\x0cListFeatures\x12\x15.routeguide.Rectangle\x1a\x13.routeguide.Feature\"\x00\x30\x01\x12>\n\x0bRecordRoute\x12\x11.routeguide.Point\x1a\x18.routeguide.RouteSummary\"\x00(\x01\x12?\n\tRouteChat\x12\x15.routeguide.RouteNote\x1a\x15.routeguide.RouteNote\"\x00(\x01\x30\x01\x62\x06proto3' +) + + + + +_POINT = _descriptor.Descriptor( + name='Point', + full_name='routeguide.Point', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='latitude', full_name='routeguide.Point.latitude', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='longitude', full_name='routeguide.Point.longitude', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=33, + serialized_end=77, +) + + +_RECTANGLE = _descriptor.Descriptor( + name='Rectangle', + full_name='routeguide.Rectangle', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='lo', full_name='routeguide.Rectangle.lo', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='hi', full_name='routeguide.Rectangle.hi', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=79, + serialized_end=152, +) + + +_FEATURE = _descriptor.Descriptor( + name='Feature', + full_name='routeguide.Feature', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='routeguide.Feature.name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='location', full_name='routeguide.Feature.location', index=1, + number=2, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=154, + serialized_end=214, +) + + +_ROUTENOTE = _descriptor.Descriptor( + name='RouteNote', + full_name='routeguide.RouteNote', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='location', full_name='routeguide.RouteNote.location', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='message', full_name='routeguide.RouteNote.message', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=216, + serialized_end=281, +) + + +_ROUTESUMMARY = _descriptor.Descriptor( + name='RouteSummary', + full_name='routeguide.RouteSummary', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='point_count', full_name='routeguide.RouteSummary.point_count', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='feature_count', full_name='routeguide.RouteSummary.feature_count', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='distance', full_name='routeguide.RouteSummary.distance', index=2, + number=3, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='elapsed_time', full_name='routeguide.RouteSummary.elapsed_time', index=3, + number=4, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=283, + serialized_end=381, +) + +_RECTANGLE.fields_by_name['lo'].message_type = _POINT +_RECTANGLE.fields_by_name['hi'].message_type = _POINT +_FEATURE.fields_by_name['location'].message_type = _POINT +_ROUTENOTE.fields_by_name['location'].message_type = _POINT +DESCRIPTOR.message_types_by_name['Point'] = _POINT +DESCRIPTOR.message_types_by_name['Rectangle'] = _RECTANGLE +DESCRIPTOR.message_types_by_name['Feature'] = _FEATURE +DESCRIPTOR.message_types_by_name['RouteNote'] = _ROUTENOTE +DESCRIPTOR.message_types_by_name['RouteSummary'] = _ROUTESUMMARY +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Point = _reflection.GeneratedProtocolMessageType('Point', (_message.Message,), { + 'DESCRIPTOR' : _POINT, + '__module__' : 'route_guide_pb2' + # @@protoc_insertion_point(class_scope:routeguide.Point) + }) +_sym_db.RegisterMessage(Point) + +Rectangle = _reflection.GeneratedProtocolMessageType('Rectangle', (_message.Message,), { + 'DESCRIPTOR' : _RECTANGLE, + '__module__' : 'route_guide_pb2' + # @@protoc_insertion_point(class_scope:routeguide.Rectangle) + }) +_sym_db.RegisterMessage(Rectangle) + +Feature = _reflection.GeneratedProtocolMessageType('Feature', (_message.Message,), { + 'DESCRIPTOR' : _FEATURE, + '__module__' : 'route_guide_pb2' + # @@protoc_insertion_point(class_scope:routeguide.Feature) + }) +_sym_db.RegisterMessage(Feature) + +RouteNote = _reflection.GeneratedProtocolMessageType('RouteNote', (_message.Message,), { + 'DESCRIPTOR' : _ROUTENOTE, + '__module__' : 'route_guide_pb2' + # @@protoc_insertion_point(class_scope:routeguide.RouteNote) + }) +_sym_db.RegisterMessage(RouteNote) + +RouteSummary = _reflection.GeneratedProtocolMessageType('RouteSummary', (_message.Message,), { + 'DESCRIPTOR' : _ROUTESUMMARY, + '__module__' : 'route_guide_pb2' + # @@protoc_insertion_point(class_scope:routeguide.RouteSummary) + }) +_sym_db.RegisterMessage(RouteSummary) + + + +_ROUTEGUIDE = _descriptor.ServiceDescriptor( + name='RouteGuide', + full_name='routeguide.RouteGuide', + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=384, + serialized_end=645, + methods=[ + _descriptor.MethodDescriptor( + name='GetFeature', + full_name='routeguide.RouteGuide.GetFeature', + index=0, + containing_service=None, + input_type=_POINT, + output_type=_FEATURE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='ListFeatures', + full_name='routeguide.RouteGuide.ListFeatures', + index=1, + containing_service=None, + input_type=_RECTANGLE, + output_type=_FEATURE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='RecordRoute', + full_name='routeguide.RouteGuide.RecordRoute', + index=2, + containing_service=None, + input_type=_POINT, + output_type=_ROUTESUMMARY, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='RouteChat', + full_name='routeguide.RouteGuide.RouteChat', + index=3, + containing_service=None, + input_type=_ROUTENOTE, + output_type=_ROUTENOTE, + serialized_options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_ROUTEGUIDE) + +DESCRIPTOR.services_by_name['RouteGuide'] = _ROUTEGUIDE + +# @@protoc_insertion_point(module_scope) diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/route_guide_pb2_grpc.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/route_guide_pb2_grpc.py new file mode 100644 index 00000000000..05c1b793128 --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen/route_guide_pb2_grpc.py @@ -0,0 +1,113 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import route_guide_pb2 as route__guide__pb2 + + +class RouteGuideStub(object): + """Interface exported by the server. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.GetFeature = channel.unary_unary( + '/routeguide.RouteGuide/GetFeature', + request_serializer=route__guide__pb2.Point.SerializeToString, + response_deserializer=route__guide__pb2.Feature.FromString, + ) + self.ListFeatures = channel.unary_stream( + '/routeguide.RouteGuide/ListFeatures', + request_serializer=route__guide__pb2.Rectangle.SerializeToString, + response_deserializer=route__guide__pb2.Feature.FromString, + ) + self.RecordRoute = channel.stream_unary( + '/routeguide.RouteGuide/RecordRoute', + request_serializer=route__guide__pb2.Point.SerializeToString, + response_deserializer=route__guide__pb2.RouteSummary.FromString, + ) + self.RouteChat = channel.stream_stream( + '/routeguide.RouteGuide/RouteChat', + request_serializer=route__guide__pb2.RouteNote.SerializeToString, + response_deserializer=route__guide__pb2.RouteNote.FromString, + ) + + +class RouteGuideServicer(object): + """Interface exported by the server. + """ + + def GetFeature(self, request, context): + """A simple RPC. + + Obtains the feature at a given position. + + A feature with an empty name is returned if there's no feature at the given + position. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def ListFeatures(self, request, context): + """A server-to-client streaming RPC. + + Obtains the Features available within the given Rectangle. Results are + streamed rather than returned at once (e.g. in a response message with a + repeated field), as the rectangle may cover a large area and contain a + huge number of features. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def RecordRoute(self, request_iterator, context): + """A client-to-server streaming RPC. + + Accepts a stream of Points on a route being traversed, returning a + RouteSummary when traversal is completed. + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def RouteChat(self, request_iterator, context): + """A Bidirectional streaming RPC. + + Accepts a stream of RouteNotes sent while a route is being traversed, + while receiving other RouteNotes (e.g. from other users). + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_RouteGuideServicer_to_server(servicer, server): + rpc_method_handlers = { + 'GetFeature': grpc.unary_unary_rpc_method_handler( + servicer.GetFeature, + request_deserializer=route__guide__pb2.Point.FromString, + response_serializer=route__guide__pb2.Feature.SerializeToString, + ), + 'ListFeatures': grpc.unary_stream_rpc_method_handler( + servicer.ListFeatures, + request_deserializer=route__guide__pb2.Rectangle.FromString, + response_serializer=route__guide__pb2.Feature.SerializeToString, + ), + 'RecordRoute': grpc.stream_unary_rpc_method_handler( + servicer.RecordRoute, + request_deserializer=route__guide__pb2.Point.FromString, + response_serializer=route__guide__pb2.RouteSummary.SerializeToString, + ), + 'RouteChat': grpc.stream_stream_rpc_method_handler( + servicer.RouteChat, + request_deserializer=route__guide__pb2.RouteNote.FromString, + response_serializer=route__guide__pb2.RouteNote.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'routeguide.RouteGuide', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/hello_world_client.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/hello_world_client.py new file mode 100755 index 00000000000..2f2351b9afc --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/hello_world_client.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=import-error + +"""The Python implementation of the GRPC helloworld.Greeter client. + +Note that you need ``opentelemetry-ext-grpc`` and ``protobuf`` to be installed +to run these examples. To run this script in the context of the example app, +install ``opentelemetry-example-app``:: + + pip install -e ext/opentelemetry-ext-grpc/ + pip install -e docs/examples/opentelemetry-example-app + +Then run the server in one shell:: + + python -m opentelemetry_example_app.grpc.hello_world_server + +and the client in another:: + + python -m opentelemetry_example_app.grpc.hello_world_client + +See also: +https://github.com/grpc/grpc/blob/master/examples/python/helloworld/greeter_client.py +https://github.com/grpc/grpc/blob/v1.16.x/examples/python/interceptors/default_value/greeter_client.py +""" + +import logging + +import grpc + +from opentelemetry import trace +from opentelemetry.ext.grpc import client_interceptor +from opentelemetry.ext.grpc.grpcext import intercept_channel +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleExportSpanProcessor, +) + +try: + # Relative imports should work in the context of the package, e.g.: + # `python -m opentelemetry_example_app.grpc.hello_world_client`. + from .gen import helloworld_pb2, helloworld_pb2_grpc +except ImportError: + # This will fail when running the file as a script, e.g.: + # `./hello_world_client.py` + # fall back to importing from the same directory in this case. + from gen import helloworld_pb2, helloworld_pb2_grpc + +trace.set_tracer_provider(TracerProvider()) +trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(ConsoleSpanExporter()) +) +tracer = trace.get_tracer(__name__) + + +def run(): + # NOTE(gRPC Python Team): .close() is possible on a channel and should be + # used in circumstances in which the with statement does not fit the needs + # of the code. + with grpc.insecure_channel("localhost:50051") as channel: + + channel = intercept_channel(channel, client_interceptor(tracer)) + + stub = helloworld_pb2_grpc.GreeterStub(channel) + + # stub.SayHello is a _InterceptorUnaryUnaryMultiCallable + response = stub.SayHello(helloworld_pb2.HelloRequest(name="YOU")) + + print("Greeter client received: " + response.message) + + +if __name__ == "__main__": + logging.basicConfig() + run() diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/hello_world_server.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/hello_world_server.py new file mode 100755 index 00000000000..86dcd66527c --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/hello_world_server.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=import-error + +"""The Python implementation of the GRPC helloworld.Greeter server. + +Note that you need ``opentelemetry-ext-grpc`` and ``protobuf`` to be installed +to run these examples. To run this script in the context of the example app, +install ``opentelemetry-example-app``:: + + pip install -e ext/opentelemetry-ext-grpc/ + pip install -e docs/examples/opentelemetry-example-app + +Then run the server in one shell:: + + python -m opentelemetry_example_app.grpc.hello_world_client + +and the client in another:: + + python -m opentelemetry_example_app.grpc.hello_world_server + +See also: +https://github.com/grpc/grpc/blob/master/examples/python/helloworld/greeter_server.py +""" + +import logging +from concurrent import futures + +import grpc + +from opentelemetry import trace +from opentelemetry.ext.grpc import server_interceptor +from opentelemetry.ext.grpc.grpcext import intercept_server +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleExportSpanProcessor, +) + +try: + # Relative imports should work in the context of the package, e.g.: + # `python -m opentelemetry_example_app.grpc.hello_world_server`. + from .gen import helloworld_pb2, helloworld_pb2_grpc +except ImportError: + # This will fail when running the file as a script, e.g.: + # `./hello_world_server.py` + # fall back to importing from the same directory in this case. + from gen import helloworld_pb2, helloworld_pb2_grpc + +trace.set_tracer_provider(TracerProvider()) +trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(ConsoleSpanExporter()) +) +tracer = trace.get_tracer(__name__) + + +class Greeter(helloworld_pb2_grpc.GreeterServicer): + def SayHello(self, request, context): + return helloworld_pb2.HelloReply(message="Hello, %s!" % request.name) + + +def serve(): + + server = grpc.server(futures.ThreadPoolExecutor()) + server = intercept_server(server, server_interceptor(tracer)) + + helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) + server.add_insecure_port("[::]:50051") + server.start() + server.wait_for_termination() + + +if __name__ == "__main__": + logging.basicConfig() + serve() diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_client.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_client.py new file mode 100755 index 00000000000..18391b4228c --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_client.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=import-error + +"""The Python implementation of the gRPC route guide client. + +Note that you need ``opentelemetry-ext-grpc`` and ``protobuf`` to be installed +to run these examples. To run this script in the context of the example app, +install ``opentelemetry-example-app``:: + + pip install -e ext/opentelemetry-ext-grpc/ + pip install -e docs/examples/opentelemetry-example-app + +Then run the server in one shell:: + + python -m opentelemetry_example_app.grpc.route_guide_server + +and the client in another:: + + python -m opentelemetry_example_app.grpc.route_guide_client + +See also: +https://github.com/grpc/grpc/tree/master/examples/python/route_guide +""" + + +import logging +import random + +import grpc + +from opentelemetry import trace +from opentelemetry.ext.grpc import client_interceptor +from opentelemetry.ext.grpc.grpcext import intercept_channel +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleExportSpanProcessor, +) + +try: + # Relative imports should work in the context of the package, e.g.: + # `python -m opentelemetry_example_app.grpc.route_guide_client`. + from .gen import route_guide_pb2, route_guide_pb2_grpc + from . import route_guide_resources +except ImportError: + # This will fail when running the file as a script, e.g.: + # `./route_guide_client.py` + # fall back to importing from the same directory in this case. + from gen import route_guide_pb2, route_guide_pb2_grpc + import route_guide_resources + +trace.set_tracer_provider(TracerProvider()) +trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(ConsoleSpanExporter()) +) +tracer = trace.get_tracer(__name__) + + +def make_route_note(message, latitude, longitude): + return route_guide_pb2.RouteNote( + message=message, + location=route_guide_pb2.Point(latitude=latitude, longitude=longitude), + ) + + +def guide_get_one_feature(stub, point): + feature = stub.GetFeature(point) + if not feature.location: + print("Server returned incomplete feature") + return + + if feature.name: + print("Feature called %s at %s" % (feature.name, feature.location)) + else: + print("Found no feature at %s" % feature.location) + + +def guide_get_feature(stub): + guide_get_one_feature( + stub, route_guide_pb2.Point(latitude=409146138, longitude=-746188906) + ) + guide_get_one_feature(stub, route_guide_pb2.Point(latitude=0, longitude=0)) + + +def guide_list_features(stub): + rectangle = route_guide_pb2.Rectangle( + lo=route_guide_pb2.Point(latitude=400000000, longitude=-750000000), + hi=route_guide_pb2.Point(latitude=420000000, longitude=-730000000), + ) + print("Looking for features between 40, -75 and 42, -73") + + features = stub.ListFeatures(rectangle) + + for feature in features: + print("Feature called %s at %s" % (feature.name, feature.location)) + + +def generate_route(feature_list): + for _ in range(0, 10): + random_feature = feature_list[random.randint(0, len(feature_list) - 1)] + print("Visiting point %s" % random_feature.location) + yield random_feature.location + + +def guide_record_route(stub): + feature_list = route_guide_resources.read_route_guide_database() + + route_iterator = generate_route(feature_list) + route_summary = stub.RecordRoute(route_iterator) + print("Finished trip with %s points " % route_summary.point_count) + print("Passed %s features " % route_summary.feature_count) + print("Travelled %s meters " % route_summary.distance) + print("It took %s seconds " % route_summary.elapsed_time) + + +def generate_messages(): + messages = [ + make_route_note("First message", 0, 0), + make_route_note("Second message", 0, 1), + make_route_note("Third message", 1, 0), + make_route_note("Fourth message", 0, 0), + make_route_note("Fifth message", 1, 0), + ] + for msg in messages: + print("Sending %s at %s" % (msg.message, msg.location)) + yield msg + + +def guide_route_chat(stub): + responses = stub.RouteChat(generate_messages()) + for response in responses: + print( + "Received message %s at %s" % (response.message, response.location) + ) + + +def run(): + + # NOTE(gRPC Python Team): .close() is possible on a channel and should be + # used in circumstances in which the with statement does not fit the needs + # of the code. + with grpc.insecure_channel("localhost:50051") as channel: + channel = intercept_channel(channel, client_interceptor(tracer)) + + stub = route_guide_pb2_grpc.RouteGuideStub(channel) + + # Unary + print("-------------- GetFeature --------------") + guide_get_feature(stub) + + # Server streaming + print("-------------- ListFeatures --------------") + guide_list_features(stub) + + # Client streaming + print("-------------- RecordRoute --------------") + guide_record_route(stub) + + # Bidirectional streaming + print("-------------- RouteChat --------------") + guide_route_chat(stub) + + +if __name__ == "__main__": + logging.basicConfig() + run() diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_db.json b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_db.json new file mode 100644 index 00000000000..9d6a980ab7d --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_db.json @@ -0,0 +1,601 @@ +[{ + "location": { + "latitude": 407838351, + "longitude": -746143763 + }, + "name": "Patriots Path, Mendham, NJ 07945, USA" +}, { + "location": { + "latitude": 408122808, + "longitude": -743999179 + }, + "name": "101 New Jersey 10, Whippany, NJ 07981, USA" +}, { + "location": { + "latitude": 413628156, + "longitude": -749015468 + }, + "name": "U.S. 6, Shohola, PA 18458, USA" +}, { + "location": { + "latitude": 419999544, + "longitude": -740371136 + }, + "name": "5 Conners Road, Kingston, NY 12401, USA" +}, { + "location": { + "latitude": 414008389, + "longitude": -743951297 + }, + "name": "Mid Hudson Psychiatric Center, New Hampton, NY 10958, USA" +}, { + "location": { + "latitude": 419611318, + "longitude": -746524769 + }, + "name": "287 Flugertown Road, Livingston Manor, NY 12758, USA" +}, { + "location": { + "latitude": 406109563, + "longitude": -742186778 + }, + "name": "4001 Tremley Point Road, Linden, NJ 07036, USA" +}, { + "location": { + "latitude": 416802456, + "longitude": -742370183 + }, + "name": "352 South Mountain Road, Wallkill, NY 12589, USA" +}, { + "location": { + "latitude": 412950425, + "longitude": -741077389 + }, + "name": "Bailey Turn Road, Harriman, NY 10926, USA" +}, { + "location": { + "latitude": 412144655, + "longitude": -743949739 + }, + "name": "193-199 Wawayanda Road, Hewitt, NJ 07421, USA" +}, { + "location": { + "latitude": 415736605, + "longitude": -742847522 + }, + "name": "406-496 Ward Avenue, Pine Bush, NY 12566, USA" +}, { + "location": { + "latitude": 413843930, + "longitude": -740501726 + }, + "name": "162 Merrill Road, Highland Mills, NY 10930, USA" +}, { + "location": { + "latitude": 410873075, + "longitude": -744459023 + }, + "name": "Clinton Road, West Milford, NJ 07480, USA" +}, { + "location": { + "latitude": 412346009, + "longitude": -744026814 + }, + "name": "16 Old Brook Lane, Warwick, NY 10990, USA" +}, { + "location": { + "latitude": 402948455, + "longitude": -747903913 + }, + "name": "3 Drake Lane, Pennington, NJ 08534, USA" +}, { + "location": { + "latitude": 406337092, + "longitude": -740122226 + }, + "name": "6324 8th Avenue, Brooklyn, NY 11220, USA" +}, { + "location": { + "latitude": 406421967, + "longitude": -747727624 + }, + "name": "1 Merck Access Road, Whitehouse Station, NJ 08889, USA" +}, { + "location": { + "latitude": 416318082, + "longitude": -749677716 + }, + "name": "78-98 Schalck Road, Narrowsburg, NY 12764, USA" +}, { + "location": { + "latitude": 415301720, + "longitude": -748416257 + }, + "name": "282 Lakeview Drive Road, Highland Lake, NY 12743, USA" +}, { + "location": { + "latitude": 402647019, + "longitude": -747071791 + }, + "name": "330 Evelyn Avenue, Hamilton Township, NJ 08619, USA" +}, { + "location": { + "latitude": 412567807, + "longitude": -741058078 + }, + "name": "New York State Reference Route 987E, Southfields, NY 10975, USA" +}, { + "location": { + "latitude": 416855156, + "longitude": -744420597 + }, + "name": "103-271 Tempaloni Road, Ellenville, NY 12428, USA" +}, { + "location": { + "latitude": 404663628, + "longitude": -744820157 + }, + "name": "1300 Airport Road, North Brunswick Township, NJ 08902, USA" +}, { + "location": { + "latitude": 407113723, + "longitude": -749746483 + }, + "name": "" +}, { + "location": { + "latitude": 402133926, + "longitude": -743613249 + }, + "name": "" +}, { + "location": { + "latitude": 400273442, + "longitude": -741220915 + }, + "name": "" +}, { + "location": { + "latitude": 411236786, + "longitude": -744070769 + }, + "name": "" +}, { + "location": { + "latitude": 411633782, + "longitude": -746784970 + }, + "name": "211-225 Plains Road, Augusta, NJ 07822, USA" +}, { + "location": { + "latitude": 415830701, + "longitude": -742952812 + }, + "name": "" +}, { + "location": { + "latitude": 413447164, + "longitude": -748712898 + }, + "name": "165 Pedersen Ridge Road, Milford, PA 18337, USA" +}, { + "location": { + "latitude": 405047245, + "longitude": -749800722 + }, + "name": "100-122 Locktown Road, Frenchtown, NJ 08825, USA" +}, { + "location": { + "latitude": 418858923, + "longitude": -746156790 + }, + "name": "" +}, { + "location": { + "latitude": 417951888, + "longitude": -748484944 + }, + "name": "650-652 Willi Hill Road, Swan Lake, NY 12783, USA" +}, { + "location": { + "latitude": 407033786, + "longitude": -743977337 + }, + "name": "26 East 3rd Street, New Providence, NJ 07974, USA" +}, { + "location": { + "latitude": 417548014, + "longitude": -740075041 + }, + "name": "" +}, { + "location": { + "latitude": 410395868, + "longitude": -744972325 + }, + "name": "" +}, { + "location": { + "latitude": 404615353, + "longitude": -745129803 + }, + "name": "" +}, { + "location": { + "latitude": 406589790, + "longitude": -743560121 + }, + "name": "611 Lawrence Avenue, Westfield, NJ 07090, USA" +}, { + "location": { + "latitude": 414653148, + "longitude": -740477477 + }, + "name": "18 Lannis Avenue, New Windsor, NY 12553, USA" +}, { + "location": { + "latitude": 405957808, + "longitude": -743255336 + }, + "name": "82-104 Amherst Avenue, Colonia, NJ 07067, USA" +}, { + "location": { + "latitude": 411733589, + "longitude": -741648093 + }, + "name": "170 Seven Lakes Drive, Sloatsburg, NY 10974, USA" +}, { + "location": { + "latitude": 412676291, + "longitude": -742606606 + }, + "name": "1270 Lakes Road, Monroe, NY 10950, USA" +}, { + "location": { + "latitude": 409224445, + "longitude": -748286738 + }, + "name": "509-535 Alphano Road, Great Meadows, NJ 07838, USA" +}, { + "location": { + "latitude": 406523420, + "longitude": -742135517 + }, + "name": "652 Garden Street, Elizabeth, NJ 07202, USA" +}, { + "location": { + "latitude": 401827388, + "longitude": -740294537 + }, + "name": "349 Sea Spray Court, Neptune City, NJ 07753, USA" +}, { + "location": { + "latitude": 410564152, + "longitude": -743685054 + }, + "name": "13-17 Stanley Street, West Milford, NJ 07480, USA" +}, { + "location": { + "latitude": 408472324, + "longitude": -740726046 + }, + "name": "47 Industrial Avenue, Teterboro, NJ 07608, USA" +}, { + "location": { + "latitude": 412452168, + "longitude": -740214052 + }, + "name": "5 White Oak Lane, Stony Point, NY 10980, USA" +}, { + "location": { + "latitude": 409146138, + "longitude": -746188906 + }, + "name": "Berkshire Valley Management Area Trail, Jefferson, NJ, USA" +}, { + "location": { + "latitude": 404701380, + "longitude": -744781745 + }, + "name": "1007 Jersey Avenue, New Brunswick, NJ 08901, USA" +}, { + "location": { + "latitude": 409642566, + "longitude": -746017679 + }, + "name": "6 East Emerald Isle Drive, Lake Hopatcong, NJ 07849, USA" +}, { + "location": { + "latitude": 408031728, + "longitude": -748645385 + }, + "name": "1358-1474 New Jersey 57, Port Murray, NJ 07865, USA" +}, { + "location": { + "latitude": 413700272, + "longitude": -742135189 + }, + "name": "367 Prospect Road, Chester, NY 10918, USA" +}, { + "location": { + "latitude": 404310607, + "longitude": -740282632 + }, + "name": "10 Simon Lake Drive, Atlantic Highlands, NJ 07716, USA" +}, { + "location": { + "latitude": 409319800, + "longitude": -746201391 + }, + "name": "11 Ward Street, Mount Arlington, NJ 07856, USA" +}, { + "location": { + "latitude": 406685311, + "longitude": -742108603 + }, + "name": "300-398 Jefferson Avenue, Elizabeth, NJ 07201, USA" +}, { + "location": { + "latitude": 419018117, + "longitude": -749142781 + }, + "name": "43 Dreher Road, Roscoe, NY 12776, USA" +}, { + "location": { + "latitude": 412856162, + "longitude": -745148837 + }, + "name": "Swan Street, Pine Island, NY 10969, USA" +}, { + "location": { + "latitude": 416560744, + "longitude": -746721964 + }, + "name": "66 Pleasantview Avenue, Monticello, NY 12701, USA" +}, { + "location": { + "latitude": 405314270, + "longitude": -749836354 + }, + "name": "" +}, { + "location": { + "latitude": 414219548, + "longitude": -743327440 + }, + "name": "" +}, { + "location": { + "latitude": 415534177, + "longitude": -742900616 + }, + "name": "565 Winding Hills Road, Montgomery, NY 12549, USA" +}, { + "location": { + "latitude": 406898530, + "longitude": -749127080 + }, + "name": "231 Rocky Run Road, Glen Gardner, NJ 08826, USA" +}, { + "location": { + "latitude": 407586880, + "longitude": -741670168 + }, + "name": "100 Mount Pleasant Avenue, Newark, NJ 07104, USA" +}, { + "location": { + "latitude": 400106455, + "longitude": -742870190 + }, + "name": "517-521 Huntington Drive, Manchester Township, NJ 08759, USA" +}, { + "location": { + "latitude": 400066188, + "longitude": -746793294 + }, + "name": "" +}, { + "location": { + "latitude": 418803880, + "longitude": -744102673 + }, + "name": "40 Mountain Road, Napanoch, NY 12458, USA" +}, { + "location": { + "latitude": 414204288, + "longitude": -747895140 + }, + "name": "" +}, { + "location": { + "latitude": 414777405, + "longitude": -740615601 + }, + "name": "" +}, { + "location": { + "latitude": 415464475, + "longitude": -747175374 + }, + "name": "48 North Road, Forestburgh, NY 12777, USA" +}, { + "location": { + "latitude": 404062378, + "longitude": -746376177 + }, + "name": "" +}, { + "location": { + "latitude": 405688272, + "longitude": -749285130 + }, + "name": "" +}, { + "location": { + "latitude": 400342070, + "longitude": -748788996 + }, + "name": "" +}, { + "location": { + "latitude": 401809022, + "longitude": -744157964 + }, + "name": "" +}, { + "location": { + "latitude": 404226644, + "longitude": -740517141 + }, + "name": "9 Thompson Avenue, Leonardo, NJ 07737, USA" +}, { + "location": { + "latitude": 410322033, + "longitude": -747871659 + }, + "name": "" +}, { + "location": { + "latitude": 407100674, + "longitude": -747742727 + }, + "name": "" +}, { + "location": { + "latitude": 418811433, + "longitude": -741718005 + }, + "name": "213 Bush Road, Stone Ridge, NY 12484, USA" +}, { + "location": { + "latitude": 415034302, + "longitude": -743850945 + }, + "name": "" +}, { + "location": { + "latitude": 411349992, + "longitude": -743694161 + }, + "name": "" +}, { + "location": { + "latitude": 404839914, + "longitude": -744759616 + }, + "name": "1-17 Bergen Court, New Brunswick, NJ 08901, USA" +}, { + "location": { + "latitude": 414638017, + "longitude": -745957854 + }, + "name": "35 Oakland Valley Road, Cuddebackville, NY 12729, USA" +}, { + "location": { + "latitude": 412127800, + "longitude": -740173578 + }, + "name": "" +}, { + "location": { + "latitude": 401263460, + "longitude": -747964303 + }, + "name": "" +}, { + "location": { + "latitude": 412843391, + "longitude": -749086026 + }, + "name": "" +}, { + "location": { + "latitude": 418512773, + "longitude": -743067823 + }, + "name": "" +}, { + "location": { + "latitude": 404318328, + "longitude": -740835638 + }, + "name": "42-102 Main Street, Belford, NJ 07718, USA" +}, { + "location": { + "latitude": 419020746, + "longitude": -741172328 + }, + "name": "" +}, { + "location": { + "latitude": 404080723, + "longitude": -746119569 + }, + "name": "" +}, { + "location": { + "latitude": 401012643, + "longitude": -744035134 + }, + "name": "" +}, { + "location": { + "latitude": 404306372, + "longitude": -741079661 + }, + "name": "" +}, { + "location": { + "latitude": 403966326, + "longitude": -748519297 + }, + "name": "" +}, { + "location": { + "latitude": 405002031, + "longitude": -748407866 + }, + "name": "" +}, { + "location": { + "latitude": 409532885, + "longitude": -742200683 + }, + "name": "" +}, { + "location": { + "latitude": 416851321, + "longitude": -742674555 + }, + "name": "" +}, { + "location": { + "latitude": 406411633, + "longitude": -741722051 + }, + "name": "3387 Richmond Terrace, Staten Island, NY 10303, USA" +}, { + "location": { + "latitude": 413069058, + "longitude": -744597778 + }, + "name": "261 Van Sickle Road, Goshen, NY 10924, USA" +}, { + "location": { + "latitude": 418465462, + "longitude": -746859398 + }, + "name": "" +}, { + "location": { + "latitude": 411733222, + "longitude": -744228360 + }, + "name": "" +}, { + "location": { + "latitude": 410248224, + "longitude": -747127767 + }, + "name": "3 Hasta Way, Newton, NJ 07860, USA" +}] diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_resources.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_resources.py new file mode 100644 index 00000000000..c7977698ba7 --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_resources.py @@ -0,0 +1,46 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# https://github.com/grpc/grpc/tree/master/examples/python/route_guide + +# pylint: disable=import-error + +"""Common resources used in the gRPC route guide example.""" + +import json +import os + +import route_guide_pb2 + + +def read_route_guide_database(): + """Reads the route guide database. + + Returns: + The full contents of the route guide database as a sequence of + route_guide_pb2.Features. + """ + feature_list = [] + db_file = os.path.join(os.path.dirname(__file__), "route_guide_db.json") + with open(db_file) as route_guide_db_file: + for item in json.load(route_guide_db_file): + feature = route_guide_pb2.Feature( + name=item["name"], + location=route_guide_pb2.Point( + latitude=item["location"]["latitude"], + longitude=item["location"]["longitude"], + ), + ) + feature_list.append(feature) + return feature_list diff --git a/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_server.py b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_server.py new file mode 100755 index 00000000000..9cd9db666e3 --- /dev/null +++ b/docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/route_guide_server.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=import-error +# pylint: disable=invalid-name + +"""The Python implementation of the gRPC route guide server. + +Note that you need ``opentelemetry-ext-grpc`` and ``protobuf`` to be installed +to run these examples. To run this script in the context of the example app, +install ``opentelemetry-example-app``:: + + pip install -e ext/opentelemetry-ext-grpc/ + pip install -e docs/examples/opentelemetry-example-app + +Then run the server in one shell:: + + python -m opentelemetry_example_app.grpc.route_guide_server + +and the client in another:: + + python -m opentelemetry_example_app.grpc.route_guide_client + +See also: +https://github.com/grpc/grpc/tree/master/examples/python/route_guide +""" + +import logging +import math +import time +from concurrent import futures + +import grpc + +from opentelemetry import trace +from opentelemetry.ext.grpc import server_interceptor +from opentelemetry.ext.grpc.grpcext import intercept_server +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import ( + ConsoleSpanExporter, + SimpleExportSpanProcessor, +) + +try: + # Relative imports should work in the context of the package, e.g.: + # `python -m opentelemetry_example_app.grpc.route_guide_server`. + from .gen import route_guide_pb2, route_guide_pb2_grpc + from . import route_guide_resources +except ImportError: + # This will fail when running the file as a script, e.g.: + # `./route_guide_server.py` + # fall back to importing from the same directory in this case. + from gen import route_guide_pb2, route_guide_pb2_grpc + import route_guide_resources + +trace.set_tracer_provider(TracerProvider()) +trace.get_tracer_provider().add_span_processor( + SimpleExportSpanProcessor(ConsoleSpanExporter()) +) +tracer = trace.get_tracer(__name__) + + +def get_feature(feature_db, point): + """Returns Feature at given location or None.""" + for feature in feature_db: + if feature.location == point: + return feature + return None + + +def get_distance(start, end): + """Distance between two points.""" + coord_factor = 10000000.0 + lat_1 = start.latitude / coord_factor + lat_2 = end.latitude / coord_factor + lon_1 = start.longitude / coord_factor + lon_2 = end.longitude / coord_factor + lat_rad_1 = math.radians(lat_1) + lat_rad_2 = math.radians(lat_2) + delta_lat_rad = math.radians(lat_2 - lat_1) + delta_lon_rad = math.radians(lon_2 - lon_1) + + # Formula is based on http://mathforum.org/library/drmath/view/51879.html + a = pow(math.sin(delta_lat_rad / 2), 2) + ( + math.cos(lat_rad_1) + * math.cos(lat_rad_2) + * pow(math.sin(delta_lon_rad / 2), 2) + ) + c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) + R = 6371000 + # metres + return R * c + + +class RouteGuideServicer(route_guide_pb2_grpc.RouteGuideServicer): + """Provides methods that implement functionality of route guide server.""" + + def __init__(self): + self.db = route_guide_resources.read_route_guide_database() + + def GetFeature(self, request, context): + feature = get_feature(self.db, request) + if feature is None: + return route_guide_pb2.Feature(name="", location=request) + return feature + + def ListFeatures(self, request, context): + left = min(request.lo.longitude, request.hi.longitude) + right = max(request.lo.longitude, request.hi.longitude) + top = max(request.lo.latitude, request.hi.latitude) + bottom = min(request.lo.latitude, request.hi.latitude) + for feature in self.db: + if ( + feature.location.longitude >= left + and feature.location.longitude <= right + and feature.location.latitude >= bottom + and feature.location.latitude <= top + ): + yield feature + + def RecordRoute(self, request_iterator, context): + point_count = 0 + feature_count = 0 + distance = 0.0 + prev_point = None + + start_time = time.time() + for point in request_iterator: + point_count += 1 + if get_feature(self.db, point): + feature_count += 1 + if prev_point: + distance += get_distance(prev_point, point) + prev_point = point + + elapsed_time = time.time() - start_time + return route_guide_pb2.RouteSummary( + point_count=point_count, + feature_count=feature_count, + distance=int(distance), + elapsed_time=int(elapsed_time), + ) + + def RouteChat(self, request_iterator, context): + prev_notes = [] + for new_note in request_iterator: + for prev_note in prev_notes: + if prev_note.location == new_note.location: + yield prev_note + prev_notes.append(new_note) + + +def serve(): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + server = intercept_server(server, server_interceptor(tracer)) + + route_guide_pb2_grpc.add_RouteGuideServicer_to_server( + RouteGuideServicer(), server + ) + server.add_insecure_port("[::]:50051") + server.start() + server.wait_for_termination() + + +if __name__ == "__main__": + logging.basicConfig() + serve() diff --git a/docs/ext/grpc/grpc.client_interceptor.rst b/docs/ext/grpc/grpc.client_interceptor.rst new file mode 100644 index 00000000000..46de43810ac --- /dev/null +++ b/docs/ext/grpc/grpc.client_interceptor.rst @@ -0,0 +1,7 @@ +grpc.client\_interceptor module +=============================== + +.. automodule:: opentelemetry.ext.grpc.client_interceptor + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/ext/grpc/grpc.rst b/docs/ext/grpc/grpc.rst new file mode 100644 index 00000000000..7e7d5f54264 --- /dev/null +++ b/docs/ext/grpc/grpc.rst @@ -0,0 +1,17 @@ +.. include:: ../../../ext/opentelemetry-ext-grpc/README.rst + +Submodules +---------- + +.. toctree:: + + grpc.client_interceptor + grpc.server_interceptor + +Module contents +--------------- + +.. automodule:: opentelemetry.ext.grpc + :members: + :undoc-members: + :show-inheritance: diff --git a/docs/ext/grpc/grpc.server_interceptor.rst b/docs/ext/grpc/grpc.server_interceptor.rst new file mode 100644 index 00000000000..b51ae1a7cb8 --- /dev/null +++ b/docs/ext/grpc/grpc.server_interceptor.rst @@ -0,0 +1,7 @@ +grpc.server\_interceptor module +=============================== + +.. automodule:: opentelemetry.ext.grpc.server_interceptor + :members: + :undoc-members: + :show-inheritance: diff --git a/ext/opentelemetry-ext-grpc/CHANGELOG.md b/ext/opentelemetry-ext-grpc/CHANGELOG.md new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ext/opentelemetry-ext-grpc/README.rst b/ext/opentelemetry-ext-grpc/README.rst new file mode 100644 index 00000000000..335c03614b9 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/README.rst @@ -0,0 +1,18 @@ +OpenTelemetry gRPC Integration +============================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-ext-grpc.svg + :target: https://pypi.org/project/opentelemetry-ext-grpc/ + +Client and server interceptors for `gRPC Python`_. + +.. _gRPC Python: https://grpc.github.io/grpc/python/grpc.html + +Installation +------------ + +:: + + pip install opentelemetry-ext-grpc diff --git a/ext/opentelemetry-ext-grpc/setup.cfg b/ext/opentelemetry-ext-grpc/setup.cfg new file mode 100644 index 00000000000..df7074078b8 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/setup.cfg @@ -0,0 +1,46 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[metadata] +name = opentelemetry-ext-grpc +description = OpenTelemetry gRPC Integration +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python/ext/opentelemetry-ext-grpc +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 3 - Alpha + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.4 + Programming Language :: Python :: 3.5 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + +[options] +python_requires = >=3.4 +package_dir= + =src +packages=find_namespace: +install_requires = + opentelemetry-api + grpcio~=1.27 + +[options.packages.find] +where = src diff --git a/ext/opentelemetry-ext-grpc/setup.py b/ext/opentelemetry-ext-grpc/setup.py new file mode 100644 index 00000000000..9a0a4b5d1e3 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/setup.py @@ -0,0 +1,27 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import setuptools + +BASE_DIR = os.path.dirname(__file__) +VERSION_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "ext", "grpc", "version.py" +) +PACKAGE_INFO = {} +with open(VERSION_FILENAME) as f: + exec(f.read(), PACKAGE_INFO) + +setuptools.setup(version=PACKAGE_INFO["__version__"]) diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py new file mode 100644 index 00000000000..8807abcb1f2 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/__init__.py @@ -0,0 +1,46 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=import-outside-toplevel +# pylint:disable=import-self +# pylint:disable=no-name-in-module +# pylint:disable=relative-beyond-top-level + + +def client_interceptor(tracer): + """Create a gRPC client channel interceptor. + + Args: + tracer: The tracer to use to create client-side spans. + + Returns: + An invocation-side interceptor object. + """ + from . import _client + + return _client.OpenTelemetryClientInterceptor(tracer) + + +def server_interceptor(tracer): + """Create a gRPC server interceptor. + + Args: + tracer: The tracer to use to create server-side spans. + + Returns: + A service-side interceptor object. + """ + from . import _server + + return _server.OpenTelemetryServerInterceptor(tracer) diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py new file mode 100644 index 00000000000..ebf455910c7 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py @@ -0,0 +1,176 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=relative-beyond-top-level +# pylint:disable=arguments-differ +# pylint:disable=no-member +# pylint:disable=signature-differs + +"""Implementation of the invocation-side open-telemetry interceptor.""" + +from collections import OrderedDict +from typing import MutableMapping + +import grpc + +from opentelemetry import propagators, trace + +from . import grpcext +from ._utilities import RpcInfo + + +class _GuardedSpan: + def __init__(self, span): + self.span = span + self._engaged = True + + def __enter__(self): + self.span.__enter__() + return self + + def __exit__(self, *args, **kwargs): + if self._engaged: + return self.span.__exit__(*args, **kwargs) + return False + + def release(self): + self._engaged = False + return self.span + + +def _inject_span_context(metadata: MutableMapping[str, str]) -> None: + # pylint:disable=unused-argument + def append_metadata( + carrier: MutableMapping[str, str], key: str, value: str + ): + metadata[key] = value + + # Inject current active span from the context + propagators.inject(append_metadata, metadata) + + +def _make_future_done_callback(span, rpc_info): + def callback(response_future): + with span: + code = response_future.code() + if code != grpc.StatusCode.OK: + rpc_info.error = code + return + response = response_future.result() + rpc_info.response = response + + return callback + + +class OpenTelemetryClientInterceptor( + grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor +): + def __init__(self, tracer): + self._tracer = tracer + + def _start_span(self, method): + return self._tracer.start_as_current_span( + name=method, kind=trace.SpanKind.CLIENT + ) + + # pylint:disable=no-self-use + def _trace_result(self, guarded_span, rpc_info, result): + # If the RPC is called asynchronously, release the guard and add a + # callback so that the span can be finished once the future is done. + if isinstance(result, grpc.Future): + result.add_done_callback( + _make_future_done_callback(guarded_span.release(), rpc_info) + ) + return result + response = result + # Handle the case when the RPC is initiated via the with_call + # method and the result is a tuple with the first element as the + # response. + # http://www.grpc.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.with_call + if isinstance(result, tuple): + response = result[0] + rpc_info.response = response + return result + + def _start_guarded_span(self, *args, **kwargs): + return _GuardedSpan(self._start_span(*args, **kwargs)) + + def intercept_unary(self, request, metadata, client_info, invoker): + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + with self._start_guarded_span(client_info.full_method) as guarded_span: + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + request=request, + ) + result = invoker(request, metadata) + return self._trace_result(guarded_span, rpc_info, result) + + # For RPCs that stream responses, the result can be a generator. To record + # the span across the generated responses and detect any errors, we wrap + # the result in a new generator that yields the response values. + def _intercept_server_stream( + self, request_or_iterator, metadata, client_info, invoker + ): + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + with self._start_span(client_info.full_method): + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + ) + if client_info.is_client_stream: + rpc_info.request = request_or_iterator + result = invoker(request_or_iterator, metadata) + for response in result: + yield response + + def intercept_stream( + self, request_or_iterator, metadata, client_info, invoker + ): + if client_info.is_server_stream: + return self._intercept_server_stream( + request_or_iterator, metadata, client_info, invoker + ) + + if not metadata: + mutable_metadata = OrderedDict() + else: + mutable_metadata = OrderedDict(metadata) + + with self._start_guarded_span(client_info.full_method) as guarded_span: + _inject_span_context(mutable_metadata) + metadata = tuple(mutable_metadata.items()) + rpc_info = RpcInfo( + full_method=client_info.full_method, + metadata=metadata, + timeout=client_info.timeout, + request=request_or_iterator, + ) + result = invoker(request_or_iterator, metadata) + return self._trace_result(guarded_span, rpc_info, result) diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_server.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_server.py new file mode 100644 index 00000000000..cb0e997d367 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_server.py @@ -0,0 +1,209 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=relative-beyond-top-level +# pylint:disable=arguments-differ +# pylint:disable=no-member +# pylint:disable=signature-differs + +"""Implementation of the service-side open-telemetry interceptor. + +This library borrows heavily from the OpenTracing gRPC integration: +https://github.com/opentracing-contrib/python-grpc +""" + +from contextlib import contextmanager +from typing import List + +import grpc + +from opentelemetry import propagators, trace +from opentelemetry.context import attach, detach + +from . import grpcext +from ._utilities import RpcInfo + + +# pylint:disable=abstract-method +class _OpenTelemetryServicerContext(grpc.ServicerContext): + def __init__(self, servicer_context, active_span): + self._servicer_context = servicer_context + self._active_span = active_span + self.code = grpc.StatusCode.OK + self.details = None + super(_OpenTelemetryServicerContext, self).__init__() + + def is_active(self, *args, **kwargs): + return self._servicer_context.is_active(*args, **kwargs) + + def time_remaining(self, *args, **kwargs): + return self._servicer_context.time_remaining(*args, **kwargs) + + def cancel(self, *args, **kwargs): + return self._servicer_context.cancel(*args, **kwargs) + + def add_callback(self, *args, **kwargs): + return self._servicer_context.add_callback(*args, **kwargs) + + def invocation_metadata(self, *args, **kwargs): + return self._servicer_context.invocation_metadata(*args, **kwargs) + + def peer(self, *args, **kwargs): + return self._servicer_context.peer(*args, **kwargs) + + def peer_identities(self, *args, **kwargs): + return self._servicer_context.peer_identities(*args, **kwargs) + + def peer_identity_key(self, *args, **kwargs): + return self._servicer_context.peer_identity_key(*args, **kwargs) + + def auth_context(self, *args, **kwargs): + return self._servicer_context.auth_context(*args, **kwargs) + + def send_initial_metadata(self, *args, **kwargs): + return self._servicer_context.send_initial_metadata(*args, **kwargs) + + def set_trailing_metadata(self, *args, **kwargs): + return self._servicer_context.set_trailing_metadata(*args, **kwargs) + + def abort(self, *args, **kwargs): + if not hasattr(self._servicer_context, "abort"): + raise RuntimeError( + "abort() is not supported with the installed version of grpcio" + ) + return self._servicer_context.abort(*args, **kwargs) + + def abort_with_status(self, *args, **kwargs): + if not hasattr(self._servicer_context, "abort_with_status"): + raise RuntimeError( + "abort_with_status() is not supported with the installed " + "version of grpcio" + ) + return self._servicer_context.abort_with_status(*args, **kwargs) + + def set_code(self, code): + self.code = code + return self._servicer_context.set_code(code) + + def set_details(self, details): + self.details = details + return self._servicer_context.set_details(details) + + +# On the service-side, errors can be signaled either by exceptions or by +# calling `set_code` on the `servicer_context`. This function checks for the +# latter and updates the span accordingly. +# pylint:disable=unused-argument +def _check_error_code(span, servicer_context, rpc_info): + if servicer_context.code != grpc.StatusCode.OK: + rpc_info.error = servicer_context.code + + +class OpenTelemetryServerInterceptor( + grpcext.UnaryServerInterceptor, grpcext.StreamServerInterceptor +): + def __init__(self, tracer): + self._tracer = tracer + + @contextmanager + # pylint:disable=no-self-use + def _set_remote_context(self, servicer_context): + metadata = servicer_context.invocation_metadata() + if metadata: + md_dict = {md.key: md.value for md in metadata} + + def get_from_grpc_metadata(metadata, key) -> List[str]: + return [md_dict[key]] if key in md_dict else [] + + # Update the context with the traceparent from the RPC metadata. + ctx = propagators.extract(get_from_grpc_metadata, metadata) + token = attach(ctx) + try: + yield + finally: + detach(token) + else: + yield + + def _start_span(self, method): + span = self._tracer.start_as_current_span( + name=method, kind=trace.SpanKind.SERVER + ) + return span + + def intercept_unary(self, request, servicer_context, server_info, handler): + + with self._set_remote_context(servicer_context): + with self._start_span(server_info.full_method) as span: + rpc_info = RpcInfo( + full_method=server_info.full_method, + metadata=servicer_context.invocation_metadata(), + timeout=servicer_context.time_remaining(), + request=request, + ) + servicer_context = _OpenTelemetryServicerContext( + servicer_context, span + ) + response = handler(request, servicer_context) + + _check_error_code(span, servicer_context, rpc_info) + + rpc_info.response = response + + return response + + # For RPCs that stream responses, the result can be a generator. To record + # the span across the generated responses and detect any errors, we wrap + # the result in a new generator that yields the response values. + def _intercept_server_stream( + self, request_or_iterator, servicer_context, server_info, handler + ): + with self._set_remote_context(servicer_context): + with self._start_span(server_info.full_method) as span: + rpc_info = RpcInfo( + full_method=server_info.full_method, + metadata=servicer_context.invocation_metadata(), + timeout=servicer_context.time_remaining(), + ) + if not server_info.is_client_stream: + rpc_info.request = request_or_iterator + servicer_context = _OpenTelemetryServicerContext( + servicer_context, span + ) + result = handler(request_or_iterator, servicer_context) + for response in result: + yield response + _check_error_code(span, servicer_context, rpc_info) + + def intercept_stream( + self, request_or_iterator, servicer_context, server_info, handler + ): + if server_info.is_server_stream: + return self._intercept_server_stream( + request_or_iterator, servicer_context, server_info, handler + ) + with self._set_remote_context(servicer_context): + with self._start_span(server_info.full_method) as span: + rpc_info = RpcInfo( + full_method=server_info.full_method, + metadata=servicer_context.invocation_metadata(), + timeout=servicer_context.time_remaining(), + ) + servicer_context = _OpenTelemetryServicerContext( + servicer_context, span + ) + response = handler(request_or_iterator, servicer_context) + _check_error_code(span, servicer_context, rpc_info) + rpc_info.response = response + return response diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_utilities.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_utilities.py new file mode 100644 index 00000000000..b6ff7d311a4 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_utilities.py @@ -0,0 +1,33 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Internal utilities.""" + + +class RpcInfo: + def __init__( + self, + full_method=None, + metadata=None, + timeout=None, + request=None, + response=None, + error=None, + ): + self.full_method = full_method + self.metadata = metadata + self.timeout = timeout + self.request = request + self.response = response + self.error = error diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/grpcext/__init__.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/grpcext/__init__.py new file mode 100644 index 00000000000..fe83467a70a --- /dev/null +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/grpcext/__init__.py @@ -0,0 +1,216 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=import-outside-toplevel +# pylint:disable=import-self +# pylint:disable=no-name-in-module + +import abc + + +class UnaryClientInfo(abc.ABC): + """Consists of various information about a unary RPC on the + invocation-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. + """ + + +class StreamClientInfo(abc.ABC): + """Consists of various information about a stream RPC on the + invocation-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + is_client_stream: Indicates whether the RPC is client-streaming. + is_server_stream: Indicates whether the RPC is server-streaming. + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. + """ + + +class UnaryClientInterceptor(abc.ABC): + """Affords intercepting unary-unary RPCs on the invocation-side.""" + + @abc.abstractmethod + def intercept_unary(self, request, metadata, client_info, invoker): + """Intercepts unary-unary RPCs on the invocation-side. + + Args: + request: The request value for the RPC. + metadata: Optional :term:`metadata` to be transmitted to the + service-side of the RPC. + client_info: A UnaryClientInfo containing various information about + the RPC. + invoker: The handler to complete the RPC on the client. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling invoker(request, metadata). + """ + raise NotImplementedError() + + +class StreamClientInterceptor(abc.ABC): + """Affords intercepting stream RPCs on the invocation-side.""" + + @abc.abstractmethod + def intercept_stream( + self, request_or_iterator, metadata, client_info, invoker + ): + """Intercepts stream RPCs on the invocation-side. + + Args: + request_or_iterator: The request value for the RPC if + `client_info.is_client_stream` is `false`; otherwise, an iterator of + request values. + metadata: Optional :term:`metadata` to be transmitted to the service-side + of the RPC. + client_info: A StreamClientInfo containing various information about + the RPC. + invoker: The handler to complete the RPC on the client. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling invoker(metadata). + """ + raise NotImplementedError() + + +def intercept_channel(channel, *interceptors): + """Creates an intercepted channel. + + Args: + channel: A Channel. + interceptors: Zero or more UnaryClientInterceptors or + StreamClientInterceptors + + Returns: + A Channel. + + Raises: + TypeError: If an interceptor derives from neither UnaryClientInterceptor + nor StreamClientInterceptor. + """ + from . import _interceptor + + return _interceptor.intercept_channel(channel, *interceptors) + + +class UnaryServerInfo(abc.ABC): + """Consists of various information about a unary RPC on the service-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + """ + + +class StreamServerInfo(abc.ABC): + """Consists of various information about a stream RPC on the service-side. + + Attributes: + full_method: A string of the full RPC method, i.e., + /package.service/method. + is_client_stream: Indicates whether the RPC is client-streaming. + is_server_stream: Indicates whether the RPC is server-streaming. + """ + + +class UnaryServerInterceptor(abc.ABC): + """Affords intercepting unary-unary RPCs on the service-side.""" + + @abc.abstractmethod + def intercept_unary(self, request, servicer_context, server_info, handler): + """Intercepts unary-unary RPCs on the service-side. + + Args: + request: The request value for the RPC. + servicer_context: A ServicerContext. + server_info: A UnaryServerInfo containing various information about + the RPC. + handler: The handler to complete the RPC on the server. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling handler(request, servicer_context). + """ + raise NotImplementedError() + + +class StreamServerInterceptor(abc.ABC): + """Affords intercepting stream RPCs on the service-side.""" + + @abc.abstractmethod + def intercept_stream( + self, request_or_iterator, servicer_context, server_info, handler + ): + """Intercepts stream RPCs on the service-side. + + Args: + request_or_iterator: The request value for the RPC if + `server_info.is_client_stream` is `False`; otherwise, an iterator of + request values. + servicer_context: A ServicerContext. + server_info: A StreamServerInfo containing various information about + the RPC. + handler: The handler to complete the RPC on the server. It is the + interceptor's responsibility to call it. + + Returns: + The result from calling handler(servicer_context). + """ + raise NotImplementedError() + + +def intercept_server(server, *interceptors): + """Creates an intercepted server. + + Args: + server: A Server. + interceptors: Zero or more UnaryServerInterceptors or + StreamServerInterceptors + + Returns: + A Server. + + Raises: + TypeError: If an interceptor derives from neither UnaryServerInterceptor + nor StreamServerInterceptor. + """ + from . import _interceptor + + return _interceptor.intercept_server(server, *interceptors) + + +__all__ = ( + "UnaryClientInterceptor", + "StreamClientInfo", + "StreamClientInterceptor", + "UnaryServerInfo", + "StreamServerInfo", + "UnaryServerInterceptor", + "StreamServerInterceptor", + "intercept_channel", + "intercept_server", +) diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/grpcext/_interceptor.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/grpcext/_interceptor.py new file mode 100644 index 00000000000..0cae2cf9fde --- /dev/null +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/grpcext/_interceptor.py @@ -0,0 +1,423 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=relative-beyond-top-level +# pylint:disable=arguments-differ +# pylint:disable=no-member +# pylint:disable=signature-differs + +"""Implementation of gRPC Python interceptors.""" + + +import collections + +import grpc + +from .. import grpcext + + +class _UnaryClientInfo( + collections.namedtuple("_UnaryClientInfo", ("full_method", "timeout")) +): + pass + + +class _StreamClientInfo( + collections.namedtuple( + "_StreamClientInfo", + ("full_method", "is_client_stream", "is_server_stream", "timeout"), + ) +): + pass + + +class _InterceptorUnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): + def __init__(self, method, base_callable, interceptor): + self._method = method + self._base_callable = base_callable + self._interceptor = interceptor + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + def invoker(request, metadata): + return self._base_callable(request, timeout, metadata, credentials) + + client_info = _UnaryClientInfo(self._method, timeout) + return self._interceptor.intercept_unary( + request, metadata, client_info, invoker + ) + + def with_call( + self, request, timeout=None, metadata=None, credentials=None + ): + def invoker(request, metadata): + return self._base_callable.with_call( + request, timeout, metadata, credentials + ) + + client_info = _UnaryClientInfo(self._method, timeout) + return self._interceptor.intercept_unary( + request, metadata, client_info, invoker + ) + + def future(self, request, timeout=None, metadata=None, credentials=None): + def invoker(request, metadata): + return self._base_callable.future( + request, timeout, metadata, credentials + ) + + client_info = _UnaryClientInfo(self._method, timeout) + return self._interceptor.intercept_unary( + request, metadata, client_info, invoker + ) + + +class _InterceptorUnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): + def __init__(self, method, base_callable, interceptor): + self._method = method + self._base_callable = base_callable + self._interceptor = interceptor + + def __call__(self, request, timeout=None, metadata=None, credentials=None): + def invoker(request, metadata): + return self._base_callable(request, timeout, metadata, credentials) + + client_info = _StreamClientInfo(self._method, False, True, timeout) + return self._interceptor.intercept_stream( + request, metadata, client_info, invoker + ) + + +class _InterceptorStreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): + def __init__(self, method, base_callable, interceptor): + self._method = method + self._base_callable = base_callable + self._interceptor = interceptor + + def __call__( + self, request_iterator, timeout=None, metadata=None, credentials=None + ): + def invoker(request_iterator, metadata): + return self._base_callable( + request_iterator, timeout, metadata, credentials + ) + + client_info = _StreamClientInfo(self._method, True, False, timeout) + return self._interceptor.intercept_stream( + request_iterator, metadata, client_info, invoker + ) + + def with_call( + self, request_iterator, timeout=None, metadata=None, credentials=None + ): + def invoker(request_iterator, metadata): + return self._base_callable.with_call( + request_iterator, timeout, metadata, credentials + ) + + client_info = _StreamClientInfo(self._method, True, False, timeout) + return self._interceptor.intercept_stream( + request_iterator, metadata, client_info, invoker + ) + + def future( + self, request_iterator, timeout=None, metadata=None, credentials=None + ): + def invoker(request_iterator, metadata): + return self._base_callable.future( + request_iterator, timeout, metadata, credentials + ) + + client_info = _StreamClientInfo(self._method, True, False, timeout) + return self._interceptor.intercept_stream( + request_iterator, metadata, client_info, invoker + ) + + +class _InterceptorStreamStreamMultiCallable(grpc.StreamStreamMultiCallable): + def __init__(self, method, base_callable, interceptor): + self._method = method + self._base_callable = base_callable + self._interceptor = interceptor + + def __call__( + self, request_iterator, timeout=None, metadata=None, credentials=None + ): + def invoker(request_iterator, metadata): + return self._base_callable( + request_iterator, timeout, metadata, credentials + ) + + client_info = _StreamClientInfo(self._method, True, True, timeout) + return self._interceptor.intercept_stream( + request_iterator, metadata, client_info, invoker + ) + + +class _InterceptorChannel(grpc.Channel): + def __init__(self, channel, interceptor): + self._channel = channel + self._interceptor = interceptor + + def subscribe(self, *args, **kwargs): + self._channel.subscribe(*args, **kwargs) + + def unsubscribe(self, *args, **kwargs): + self._channel.unsubscribe(*args, **kwargs) + + def unary_unary( + self, method, request_serializer=None, response_deserializer=None + ): + base_callable = self._channel.unary_unary( + method, request_serializer, response_deserializer + ) + if isinstance(self._interceptor, grpcext.UnaryClientInterceptor): + return _InterceptorUnaryUnaryMultiCallable( + method, base_callable, self._interceptor + ) + return base_callable + + def unary_stream( + self, method, request_serializer=None, response_deserializer=None + ): + base_callable = self._channel.unary_stream( + method, request_serializer, response_deserializer + ) + if isinstance(self._interceptor, grpcext.StreamClientInterceptor): + return _InterceptorUnaryStreamMultiCallable( + method, base_callable, self._interceptor + ) + return base_callable + + def stream_unary( + self, method, request_serializer=None, response_deserializer=None + ): + base_callable = self._channel.stream_unary( + method, request_serializer, response_deserializer + ) + if isinstance(self._interceptor, grpcext.StreamClientInterceptor): + return _InterceptorStreamUnaryMultiCallable( + method, base_callable, self._interceptor + ) + return base_callable + + def stream_stream( + self, method, request_serializer=None, response_deserializer=None + ): + base_callable = self._channel.stream_stream( + method, request_serializer, response_deserializer + ) + if isinstance(self._interceptor, grpcext.StreamClientInterceptor): + return _InterceptorStreamStreamMultiCallable( + method, base_callable, self._interceptor + ) + return base_callable + + def close(self): + if not hasattr(self._channel, "close"): + raise RuntimeError( + "close() is not supported with the installed version of grpcio" + ) + self._channel.close() + + +def intercept_channel(channel, *interceptors): + result = channel + for interceptor in interceptors: + if not isinstance( + interceptor, grpcext.UnaryClientInterceptor + ) and not isinstance(interceptor, grpcext.StreamClientInterceptor): + raise TypeError( + "interceptor must be either a " + "grpcext.UnaryClientInterceptor or a " + "grpcext.StreamClientInterceptor" + ) + result = _InterceptorChannel(result, interceptor) + return result + + +class _UnaryServerInfo( + collections.namedtuple("_UnaryServerInfo", ("full_method",)) +): + pass + + +class _StreamServerInfo( + collections.namedtuple( + "_StreamServerInfo", + ("full_method", "is_client_stream", "is_server_stream"), + ) +): + pass + + +class _InterceptorRpcMethodHandler(grpc.RpcMethodHandler): + def __init__(self, rpc_method_handler, method, interceptor): + self._rpc_method_handler = rpc_method_handler + self._method = method + self._interceptor = interceptor + + @property + def request_streaming(self): + return self._rpc_method_handler.request_streaming + + @property + def response_streaming(self): + return self._rpc_method_handler.response_streaming + + @property + def request_deserializer(self): + return self._rpc_method_handler.request_deserializer + + @property + def response_serializer(self): + return self._rpc_method_handler.response_serializer + + @property + def unary_unary(self): + if not isinstance(self._interceptor, grpcext.UnaryServerInterceptor): + return self._rpc_method_handler.unary_unary + + def adaptation(request, servicer_context): + def handler(request, servicer_context): + return self._rpc_method_handler.unary_unary( + request, servicer_context + ) + + return self._interceptor.intercept_unary( + request, + servicer_context, + _UnaryServerInfo(self._method), + handler, + ) + + return adaptation + + @property + def unary_stream(self): + if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): + return self._rpc_method_handler.unary_stream + + def adaptation(request, servicer_context): + def handler(request, servicer_context): + return self._rpc_method_handler.unary_stream( + request, servicer_context + ) + + return self._interceptor.intercept_stream( + request, + servicer_context, + _StreamServerInfo(self._method, False, True), + handler, + ) + + return adaptation + + @property + def stream_unary(self): + if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): + return self._rpc_method_handler.stream_unary + + def adaptation(request_iterator, servicer_context): + def handler(request_iterator, servicer_context): + return self._rpc_method_handler.stream_unary( + request_iterator, servicer_context + ) + + return self._interceptor.intercept_stream( + request_iterator, + servicer_context, + _StreamServerInfo(self._method, True, False), + handler, + ) + + return adaptation + + @property + def stream_stream(self): + if not isinstance(self._interceptor, grpcext.StreamServerInterceptor): + return self._rpc_method_handler.stream_stream + + def adaptation(request_iterator, servicer_context): + def handler(request_iterator, servicer_context): + return self._rpc_method_handler.stream_stream( + request_iterator, servicer_context + ) + + return self._interceptor.intercept_stream( + request_iterator, + servicer_context, + _StreamServerInfo(self._method, True, True), + handler, + ) + + return adaptation + + +class _InterceptorGenericRpcHandler(grpc.GenericRpcHandler): + def __init__(self, generic_rpc_handler, interceptor): + self.generic_rpc_handler = generic_rpc_handler + self._interceptor = interceptor + + def service(self, handler_call_details): + result = self.generic_rpc_handler.service(handler_call_details) + if result: + result = _InterceptorRpcMethodHandler( + result, handler_call_details.method, self._interceptor + ) + return result + + +class _InterceptorServer(grpc.Server): + def __init__(self, server, interceptor): + self._server = server + self._interceptor = interceptor + + def add_generic_rpc_handlers(self, generic_rpc_handlers): + generic_rpc_handlers = [ + _InterceptorGenericRpcHandler( + generic_rpc_handler, self._interceptor + ) + for generic_rpc_handler in generic_rpc_handlers + ] + return self._server.add_generic_rpc_handlers(generic_rpc_handlers) + + def add_insecure_port(self, *args, **kwargs): + return self._server.add_insecure_port(*args, **kwargs) + + def add_secure_port(self, *args, **kwargs): + return self._server.add_secure_port(*args, **kwargs) + + def start(self, *args, **kwargs): + return self._server.start(*args, **kwargs) + + def stop(self, *args, **kwargs): + return self._server.stop(*args, **kwargs) + + def wait_for_termination(self, *args, **kwargs): + return self._server.wait_for_termination(*args, **kwargs) + + +def intercept_server(server, *interceptors): + result = server + for interceptor in interceptors: + if not isinstance( + interceptor, grpcext.UnaryServerInterceptor + ) and not isinstance(interceptor, grpcext.StreamServerInterceptor): + raise TypeError( + "interceptor must be either a " + "grpcext.UnaryServerInterceptor or a " + "grpcext.StreamServerInterceptor" + ) + result = _InterceptorServer(result, interceptor) + return result diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/version.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/version.py new file mode 100644 index 00000000000..0941210ca3f --- /dev/null +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.6.dev0" diff --git a/ext/opentelemetry-ext-grpc/tests/__init__.py b/ext/opentelemetry-ext-grpc/tests/__init__.py new file mode 100644 index 00000000000..b0a6f428417 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/ext/opentelemetry-ext-grpc/tests/test_server_interceptor.py b/ext/opentelemetry-ext-grpc/tests/test_server_interceptor.py new file mode 100644 index 00000000000..8dabd11fdf0 --- /dev/null +++ b/ext/opentelemetry-ext-grpc/tests/test_server_interceptor.py @@ -0,0 +1,243 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint:disable=unused-argument +# pylint:disable=no-self-use + +import threading +import unittest +from concurrent import futures +from contextlib import contextmanager +from unittest import mock + +import grpc + +from opentelemetry import trace +from opentelemetry.ext.grpc import server_interceptor +from opentelemetry.ext.grpc.grpcext import intercept_server +from opentelemetry.sdk import trace as trace_sdk + + +class UnaryUnaryMethodHandler(grpc.RpcMethodHandler): + def __init__(self, handler): + self.request_streaming = False + self.response_streaming = False + self.request_deserializer = None + self.response_serializer = None + self.unary_unary = handler + self.unary_stream = None + self.stream_unary = None + self.stream_stream = None + + +class UnaryUnaryRpcHandler(grpc.GenericRpcHandler): + def __init__(self, handler): + self._unary_unary_handler = handler + + def service(self, handler_call_details): + return UnaryUnaryMethodHandler(self._unary_unary_handler) + + +class TestOpenTelemetryServerInterceptor(unittest.TestCase): + def test_create_span(self): + """Check that the interceptor wraps calls with spans server-side.""" + + @contextmanager + def mock_start_as_current_span(*args, **kwargs): + yield mock.Mock(spec=trace.Span) + + # Intercept gRPC calls... + tracer = mock.Mock(spec=trace.Tracer) + tracer.start_as_current_span.side_effect = mock_start_as_current_span + interceptor = server_interceptor(tracer) + + # No-op RPC handler + def handler(request, context): + return b"" + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + # FIXME: grpcext interceptor doesn't apply to handlers passed to server + # init, should use intercept_service API instead. + server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + channel.unary_unary("")(b"") + finally: + server.stop(None) + + tracer.start_as_current_span.assert_called_once_with( + name="", kind=trace.SpanKind.SERVER + ) + + def test_span_lifetime(self): + """Check that the span is active for the duration of the call.""" + + tracer_provider = trace_sdk.TracerProvider() + tracer = tracer_provider.get_tracer(__name__) + interceptor = server_interceptor(tracer) + + # To capture the current span at the time the handler is called + active_span_in_handler = None + + def handler(request, context): + nonlocal active_span_in_handler + active_span_in_handler = tracer.get_current_span() + return b"" + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + active_span_before_call = tracer.get_current_span() + try: + server.start() + channel.unary_unary("")(b"") + finally: + server.stop(None) + active_span_after_call = tracer.get_current_span() + + self.assertIsNone(active_span_before_call) + self.assertIsNone(active_span_after_call) + self.assertIsInstance(active_span_in_handler, trace_sdk.Span) + self.assertIsNone(active_span_in_handler.parent) + + def test_sequential_server_spans(self): + """Check that sequential RPCs get separate server spans.""" + + tracer_provider = trace_sdk.TracerProvider() + tracer = tracer_provider.get_tracer(__name__) + + interceptor = server_interceptor(tracer) + + # Capture the currently active span in each thread + active_spans_in_handler = [] + + def handler(request, context): + active_spans_in_handler.append(tracer.get_current_span()) + return b"" + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=1), + options=(("grpc.so_reuseport", 0),), + ) + server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + channel.unary_unary("")(b"") + channel.unary_unary("")(b"") + finally: + server.stop(None) + + self.assertEqual(len(active_spans_in_handler), 2) + # pylint:disable=unbalanced-tuple-unpacking + span1, span2 = active_spans_in_handler + # Spans should belong to separate traces, and each should be a root + # span + self.assertNotEqual(span1.context.span_id, span2.context.span_id) + self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) + self.assertIsNone(span1.parent) + self.assertIsNone(span1.parent) + + def test_concurrent_server_spans(self): + """Check that concurrent RPC calls don't interfere with each other. + + This is the same check as test_sequential_server_spans except that the + RPCs are concurrent. Two handlers are invoked at the same time on two + separate threads. Each one should see a different active span and + context. + """ + + tracer_provider = trace_sdk.TracerProvider() + tracer = tracer_provider.get_tracer(__name__) + + interceptor = server_interceptor(tracer) + + # Capture the currently active span in each thread + active_spans_in_handler = [] + latch = get_latch(2) + + def handler(request, context): + latch() + active_spans_in_handler.append(tracer.get_current_span()) + return b"" + + server = grpc.server( + futures.ThreadPoolExecutor(max_workers=2), + options=(("grpc.so_reuseport", 0),), + ) + server = intercept_server(server, interceptor) + server.add_generic_rpc_handlers((UnaryUnaryRpcHandler(handler),)) + + port = server.add_insecure_port("[::]:0") + channel = grpc.insecure_channel("localhost:{:d}".format(port)) + + try: + server.start() + # Interleave calls so spans are active on each thread at the same + # time + with futures.ThreadPoolExecutor(max_workers=2) as tpe: + f1 = tpe.submit(channel.unary_unary(""), b"") + f2 = tpe.submit(channel.unary_unary(""), b"") + futures.wait((f1, f2)) + finally: + server.stop(None) + + self.assertEqual(len(active_spans_in_handler), 2) + # pylint:disable=unbalanced-tuple-unpacking + span1, span2 = active_spans_in_handler + # Spans should belong to separate traces, and each should be a root + # span + self.assertNotEqual(span1.context.span_id, span2.context.span_id) + self.assertNotEqual(span1.context.trace_id, span2.context.trace_id) + self.assertIsNone(span1.parent) + self.assertIsNone(span1.parent) + + +def get_latch(num): + """Get a countdown latch function for use in n threads.""" + cv = threading.Condition() + count = 0 + + def countdown_latch(): + """Block until n-1 other threads have called.""" + nonlocal count + cv.acquire() + count += 1 + cv.notify() + cv.release() + cv.acquire() + while count < num: + cv.wait() + cv.release() + + return countdown_latch diff --git a/opentelemetry-api/src/opentelemetry/context/__init__.py b/opentelemetry-api/src/opentelemetry/context/__init__.py index 779bdc488c1..adf1bc0869f 100644 --- a/opentelemetry-api/src/opentelemetry/context/__init__.py +++ b/opentelemetry-api/src/opentelemetry/context/__init__.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import threading import typing from functools import wraps from os import environ @@ -24,7 +25,7 @@ logger = logging.getLogger(__name__) _RUNTIME_CONTEXT = None # type: typing.Optional[RuntimeContext] - +_RUNTIME_CONTEXT_LOCK = threading.Lock() _F = typing.TypeVar("_F", bound=typing.Callable[..., typing.Any]) @@ -42,26 +43,30 @@ def wrapper( **kwargs: typing.Dict[typing.Any, typing.Any] ) -> typing.Optional[typing.Any]: global _RUNTIME_CONTEXT # pylint: disable=global-statement - if _RUNTIME_CONTEXT is None: - # FIXME use a better implementation of a configuration manager to avoid having - # to get configuration values straight from environment variables - if version_info < (3, 5): - # contextvars are not supported in 3.4, use thread-local storage - default_context = "threadlocal_context" - else: - default_context = "contextvars_context" - - configured_context = environ.get( - "OPENTELEMETRY_CONTEXT", default_context - ) # type: str - try: - _RUNTIME_CONTEXT = next( - iter_entry_points( - "opentelemetry_context", configured_context + + with _RUNTIME_CONTEXT_LOCK: + if _RUNTIME_CONTEXT is None: + # FIXME use a better implementation of a configuration manager to avoid having + # to get configuration values straight from environment variables + if version_info < (3, 5): + # contextvars are not supported in 3.4, use thread-local storage + default_context = "threadlocal_context" + else: + default_context = "contextvars_context" + + configured_context = environ.get( + "OPENTELEMETRY_CONTEXT", default_context + ) # type: str + try: + _RUNTIME_CONTEXT = next( + iter_entry_points( + "opentelemetry_context", configured_context + ) + ).load()() + except Exception: # pylint: disable=broad-except + logger.error( + "Failed to load context: %s", configured_context ) - ).load()() - except Exception: # pylint: disable=broad-except - logger.error("Failed to load context: %s", configured_context) return func(*args, **kwargs) # type: ignore return wrapper # type:ignore diff --git a/pyproject.toml b/pyproject.toml index 5d19c298827..961304074c9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,8 +2,9 @@ line-length = 79 exclude = ''' ( - /( - ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen # generated files + /( # generated files + docs/examples/opentelemetry-example-app/src/opentelemetry_example_app/grpc/gen| + ext/opentelemetry-ext-jaeger/src/opentelemetry/ext/jaeger/gen )/ ) ''' diff --git a/tox.ini b/tox.ini index 6e64bdbd441..06a54bc83d7 100644 --- a/tox.ini +++ b/tox.ini @@ -76,8 +76,12 @@ envlist = py3{4,5,6,7,8}-test-opentracing-shim pypy3-test-opentracing-shim + ; opentelemetry-opentracing-shim + py3{4,5,6,7,8}-test-opentracing-shim + pypy3-test-opentracing-shim - py3{4,5,6,7,8}-coverage + ; opentelemetry-ext-grpc + py3{4,5,6,7,8}-test-ext-grpc ; Coverage is temporarily disabled for pypy3 due to the pytest bug. ; pypy3-coverage @@ -107,6 +111,7 @@ changedir = test-api: opentelemetry-api/tests test-sdk: opentelemetry-sdk/tests test-auto-instrumentation: opentelemetry-auto-instrumentation/tests + test-ext-grpc: ext/opentelemetry-ext-grpc/tests test-ext-http-requests: ext/opentelemetry-ext-http-requests/tests test-ext-jaeger: ext/opentelemetry-ext-jaeger/tests test-ext-dbapi: ext/opentelemetry-ext-dbapi/tests @@ -144,6 +149,8 @@ commands_pre = example-http: pip install -r {toxinidir}/docs/examples/http/requirements.txt ext: pip install {toxinidir}/opentelemetry-api + grpc: pip install {toxinidir}/ext/opentelemetry-ext-grpc + grpc: pip install {toxinidir}/opentelemetry-sdk wsgi,flask: pip install {toxinidir}/ext/opentelemetry-ext-testutil wsgi,flask: pip install {toxinidir}/ext/opentelemetry-ext-wsgi wsgi,flask: pip install {toxinidir}/opentelemetry-sdk