From 04e5623fb702589aa7ed494c3116eab84378a036 Mon Sep 17 00:00:00 2001 From: Mukund Date: Wed, 28 Aug 2024 14:05:32 +0000 Subject: [PATCH 01/19] Add opentelemetry as a dependency in setup.py --- setup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/setup.py b/setup.py index dbb66cf7c..cc852f7d8 100644 --- a/setup.py +++ b/setup.py @@ -45,6 +45,8 @@ "protobuf>=3.20.2,<6.0.0dev,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5", "grpc-google-iam-v1 >= 0.12.4, < 1.0.0dev", "grpcio-status >= 1.33.2", + "opentelemetry-api", + "opentelemetry-sdk", ] extras = {"libcst": "libcst >= 0.3.10"} url = "https://github.com/googleapis/python-pubsub" From ac867995c05bbc0fd314ed82ff36e07bcb4f3489 Mon Sep 17 00:00:00 2001 From: Mukund Date: Wed, 28 Aug 2024 14:17:08 +0000 Subject: [PATCH 02/19] Add enable_open_telemetry_tracing Publisher Option --- google/cloud/pubsub_v1/types.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 3d071a189..a9bb1d582 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -174,6 +174,8 @@ class PublisherOptions(NamedTuple): "compatible with :class:`~.pubsub_v1.types.TimeoutType`." ) + enable_open_telemetry_tracing: bool = False # disabled by default + """Open Telemetry tracing is enabled if this is set to True.""" # Define the type class and default values for flow control settings. # From b73663574709048f69a47d7af1a774d27260e492 Mon Sep 17 00:00:00 2001 From: Mukund Date: Wed, 28 Aug 2024 15:29:56 +0000 Subject: [PATCH 03/19] Set/ Override Open Telemetry enabled PublisherOption in the Publish client --- google/cloud/pubsub_v1/publisher/client.py | 17 ++++++++++ google/cloud/pubsub_v1/types.py | 1 + .../publisher/test_publisher_client.py | 32 +++++++++++++++++++ 3 files changed, 50 insertions(+) diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 54b353276..2f314850f 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -22,6 +22,7 @@ import typing from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union import warnings +import sys from google.api_core import gapic_v1 from google.auth.credentials import AnonymousCredentials # type: ignore @@ -153,6 +154,22 @@ def __init__( # The object controlling the message publishing flow self._flow_controller = FlowController(self.publisher_options.flow_control) + self._open_telemetry_enabled = ( + self.publisher_options.enable_open_telemetry_tracing + ) + # OpenTelemetry features used by the library are not supported in Python versions <= 3.7. + # Refer https://github.com/open-telemetry/opentelemetry-python/issues/3993#issuecomment-2211976389 + if ( + self.publisher_options.enable_open_telemetry_tracing + and sys.version_info.major == 3 + and sys.version_info.minor < 8 + ): + warnings.warn( + message="Open Telemetry for Python version 3.7 or lower is not supported. Disabling Open Telemetry tracing.", + category=RuntimeWarning, + ) + self._open_telemetry_enabled = False + @classmethod def from_service_account_file( # type: ignore[override] cls, diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index a9bb1d582..c4282e685 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -177,6 +177,7 @@ class PublisherOptions(NamedTuple): enable_open_telemetry_tracing: bool = False # disabled by default """Open Telemetry tracing is enabled if this is set to True.""" + # Define the type class and default values for flow control settings. # # This class is used when creating a publisher or subscriber client, and diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 9db5e0ef8..d34abe912 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -129,6 +129,38 @@ def test_init_w_custom_transport(creds): assert client.batch_settings.max_messages == 100 +@pytest.mark.parametrize( + "enable_open_telemetry", + [ + True, + False, + ], +) +def test_open_telemetry_publisher_options(creds, enable_open_telemetry): + if sys.version_info >= (3, 8) or enable_open_telemetry is False: + client = publisher.Client( + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=enable_open_telemetry + ), + credentials=creds, + ) + assert client._open_telemetry_enabled == enable_open_telemetry + else: + # Open Telemetry is not supported and hence disabled for Python + # versions 3.7 or below + with pytest.warns( + RuntimeWarning, + match="Open Telemetry for Python version 3.7 or lower is not supported. Disabling Open Telemetry tracing.", + ): + client = publisher.Client( + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=enable_open_telemetry + ), + credentials=creds, + ) + assert client._open_telemetry_enabled is False + + def test_init_w_api_endpoint(creds): client_options = {"api_endpoint": "testendpoint.google.com"} client = publisher.Client(client_options=client_options, credentials=creds) From d700967f972f344a39e30cd5e57f573a80be65ff Mon Sep 17 00:00:00 2001 From: Mukund Date: Wed, 28 Aug 2024 20:52:09 +0000 Subject: [PATCH 04/19] Add OpenTelemetryContextSetter to use for context propagation --- google/cloud/pubsub_v1/open_telemetry/__init__.py | 0 .../open_telemetry/context_propagation.py | 15 +++++++++++++++ .../pubsub_v1/publisher/test_publisher_client.py | 10 ++++++++++ 3 files changed, 25 insertions(+) create mode 100644 google/cloud/pubsub_v1/open_telemetry/__init__.py create mode 100644 google/cloud/pubsub_v1/open_telemetry/context_propagation.py diff --git a/google/cloud/pubsub_v1/open_telemetry/__init__.py b/google/cloud/pubsub_v1/open_telemetry/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/google/cloud/pubsub_v1/open_telemetry/context_propagation.py b/google/cloud/pubsub_v1/open_telemetry/context_propagation.py new file mode 100644 index 000000000..d2bfbee16 --- /dev/null +++ b/google/cloud/pubsub_v1/open_telemetry/context_propagation.py @@ -0,0 +1,15 @@ +from opentelemetry.propagators.textmap import Setter +from google.pubsub_v1 import types as gapic_types + + +class OpenTelemetryContextSetter(Setter): + """ + Used by Open Telemetry for context propagation. + """ + + def set(self, carrier: gapic_types.PubsubMessage, key: str, value: str): + """ + Injects trace context into Pub/Sub message attributes with + "googclient_" prefix. + """ + carrier.attributes["googclient_" + key] = value diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index d34abe912..d6b3da594 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -41,6 +41,9 @@ from google.pubsub_v1 import types as gapic_types from google.pubsub_v1.services.publisher import client as publisher_client from google.pubsub_v1.services.publisher.transports.grpc import PublisherGrpcTransport +from google.cloud.pubsub_v1.open_telemetry.context_propagation import ( + OpenTelemetryContextSetter, +) def _assert_retries_equal(retry, retry2): @@ -161,6 +164,13 @@ def test_open_telemetry_publisher_options(creds, enable_open_telemetry): assert client._open_telemetry_enabled is False +def test_opentelemetry_context_setter(): + msg = gapic_types.PubsubMessage(data=b"foo") + OpenTelemetryContextSetter().set(carrier=msg, key="key", value="bar") + + assert "googclient_key" in msg.attributes.keys() + + def test_init_w_api_endpoint(creds): client_options = {"api_endpoint": "testendpoint.google.com"} client = publisher.Client(client_options=client_options, credentials=creds) From 5a42d0e6e174821e103d90b4f9c5a20b0ab3b7e1 Mon Sep 17 00:00:00 2001 From: Mukund Date: Wed, 28 Aug 2024 21:12:49 +0000 Subject: [PATCH 05/19] Add Publish Create Span and PublishMessageWrapper --- .../open_telemetry/publish_message_wrapper.py | 39 +++++++++++++++++++ google/cloud/pubsub_v1/publisher/client.py | 17 ++++++++ .../publisher/test_publisher_client.py | 36 +++++++++++++++++ 3 files changed, 92 insertions(+) create mode 100644 google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py new file mode 100644 index 000000000..545f5e650 --- /dev/null +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -0,0 +1,39 @@ +import sys +from datetime import datetime + +from google.pubsub_v1 import types as gapic_types +from opentelemetry import trace + + +class PublishMessageWrapper: + _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher" + _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" + + _PUBLISH_START_EVENT: str = "publish start" + + def __init__(self, message: gapic_types.PubsubMessage): + self._message: gapic_types.PubsubMessage = message + + def start_create_span(self, topic: str, ordering_key: str) -> trace.Span: + tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + with tracer.start_as_current_span( + name=f"{topic} create", + attributes={ + "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM, + "messaging.destination.name": topic, + "code.function": "google.cloud.pubsub.PublisherClient.publish", + "messaging.gcp_pubsub.message.ordering_key": ordering_key, + "messaging.operation": "create", + "gcp.project_id": topic.split("/")[1], + "messaging.message.body.size": sys.getsizeof(self._message.data), + }, + kind=trace.SpanKind.PRODUCER, + end_on_exit=False, + ) as create_span: + create_span.add_event( + name=self._PUBLISH_START_EVENT, + attributes={ + "timestamp": str(datetime.now()), + }, + ) + return create_span diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 2f314850f..105ea78cb 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -23,6 +23,7 @@ from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union import warnings import sys +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from google.api_core import gapic_v1 from google.auth.credentials import AnonymousCredentials # type: ignore @@ -38,6 +39,12 @@ from google.pubsub_v1 import gapic_version as package_version from google.pubsub_v1 import types as gapic_types from google.pubsub_v1.services.publisher import client as publisher_client +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) +from google.cloud.pubsub_v1.open_telemetry.context_propagation import ( + OpenTelemetryContextSetter, +) __version__ = package_version.__version__ @@ -385,6 +392,16 @@ def publish( # type: ignore[override] ) message = gapic_types.PubsubMessage.wrap(vanilla_pb) + if self._open_telemetry_enabled: + wrapper = PublishMessageWrapper(message) + create_span = wrapper.start_create_span( + topic=topic, ordering_key=ordering_key + ) + TraceContextTextMapPropagator().inject( + carrier=message, + setter=OpenTelemetryContextSetter(), + ) + # Messages should go through flow control to prevent excessive # queuing on the client side (depending on the settings). try: diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index d6b3da594..7f5340724 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -28,6 +28,10 @@ import pytest import time +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry import trace from google.api_core import gapic_v1 from google.api_core import retry as retries @@ -171,6 +175,38 @@ def test_opentelemetry_context_setter(): assert "googclient_key" in msg.attributes.keys() +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="Open Telemetry not supported below Python version 3.8", +) +def test_opentelemetry_publisher_create_span(creds): + TOPIC = "projects/projectID/topics/topicID" + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=True, + ), + ) + + # Trace Provider setup. + provider = TracerProvider() + memory_exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(memory_exporter) + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + + client.publish(TOPIC, b"message") + spans = memory_exporter.get_finished_spans() + + # Open Telemetry allows export of only finished spans. Since, + # publish create span is still not finished at this point of development + # it will not be exported. Hence, the number of exported spans will be 0. + # This test will be revisited when the create span is finished after making + # a publish RPC call and also when there create span is finished when + # an exception / error occurs before the publish RPC call is made. + assert len(spans) == 0 + + def test_init_w_api_endpoint(creds): client_options = {"api_endpoint": "testendpoint.google.com"} client = publisher.Client(client_options=client_options, credentials=creds) From e7aea6cd4d0743c412b797cfeac156439fc11f8e Mon Sep 17 00:00:00 2001 From: Mukund Date: Thu, 29 Aug 2024 21:16:53 +0000 Subject: [PATCH 06/19] Add Publish Flow Control Span --- .../open_telemetry/publish_message_wrapper.py | 32 +++++++++++++++++-- google/cloud/pubsub_v1/publisher/client.py | 21 ++++++++++-- .../publisher/test_publisher_client.py | 16 ++++++---- 3 files changed, 57 insertions(+), 12 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py index 545f5e650..9a0a24d46 100644 --- a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -1,8 +1,10 @@ import sys from datetime import datetime +import warnings from google.pubsub_v1 import types as gapic_types from opentelemetry import trace +from opentelemetry.trace.propagation import set_span_in_context class PublishMessageWrapper: @@ -10,11 +12,12 @@ class PublishMessageWrapper: _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" _PUBLISH_START_EVENT: str = "publish start" + _PUBLISH_FLOW_CONTROL: str = "publisher flow control" def __init__(self, message: gapic_types.PubsubMessage): self._message: gapic_types.PubsubMessage = message - def start_create_span(self, topic: str, ordering_key: str) -> trace.Span: + def start_create_span(self, topic: str, ordering_key: str) -> None: tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) with tracer.start_as_current_span( name=f"{topic} create", @@ -36,4 +39,29 @@ def start_create_span(self, topic: str, ordering_key: str) -> trace.Span: "timestamp": str(datetime.now()), }, ) - return create_span + self._create_span: trace.Span = create_span + + def start_publisher_flow_control_span(self) -> None: + tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + if self._create_span is None: # pragma: NO COVER + warnings.warn( + message="publish create span is None. Hence, not starting publish flow control span", + category=RuntimeWarning, + ) + return + with tracer.start_as_current_span( + name=self._PUBLISH_FLOW_CONTROL, + kind=trace.SpanKind.INTERNAL, + context=set_span_in_context(self._create_span), + end_on_exit=False, + ) as flow_control_span: + self._flow_control_span: trace.Span = flow_control_span + + def end_publisher_flow_control_span(self) -> None: + if self._flow_control_span is None: # pragma: NO COVER + warnings.warn( + message="publish flow control span is None. Hence, not ending it.", + category=RuntimeWarning, + ) + return + self._flow_control_span.end() diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 105ea78cb..8adb87967 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -392,11 +392,10 @@ def publish( # type: ignore[override] ) message = gapic_types.PubsubMessage.wrap(vanilla_pb) + wrapper: PublishMessageWrapper = None if self._open_telemetry_enabled: wrapper = PublishMessageWrapper(message) - create_span = wrapper.start_create_span( - topic=topic, ordering_key=ordering_key - ) + wrapper.start_create_span(topic=topic, ordering_key=ordering_key) TraceContextTextMapPropagator().inject( carrier=message, setter=OpenTelemetryContextSetter(), @@ -405,7 +404,23 @@ def publish( # type: ignore[override] # Messages should go through flow control to prevent excessive # queuing on the client side (depending on the settings). try: + if self._open_telemetry_enabled: + if wrapper: + wrapper.start_publisher_flow_control_span() + else: # pragma: NO COVER + warnings.warn( + message="PubSubMessageWrapper is None. Not starting publisher flow control span.", + category=RuntimeWarning, + ) self._flow_controller.add(message) + if self._open_telemetry_enabled: + if wrapper: + wrapper.end_publisher_flow_control_span() + else: # pragma: NO COVER + warnings.warn( + message="PubSubMessageWrapper is None. Not ending publisher flow control span.", + category=RuntimeWarning, + ) except exceptions.FlowControlLimitError as exc: future = futures.Future() future.set_exception(exc) diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 7f5340724..9d82966de 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -198,13 +198,15 @@ def test_opentelemetry_publisher_create_span(creds): client.publish(TOPIC, b"message") spans = memory_exporter.get_finished_spans() - # Open Telemetry allows export of only finished spans. Since, - # publish create span is still not finished at this point of development - # it will not be exported. Hence, the number of exported spans will be 0. - # This test will be revisited when the create span is finished after making - # a publish RPC call and also when there create span is finished when - # an exception / error occurs before the publish RPC call is made. - assert len(spans) == 0 + # Publisher Flow Control Span should have ended and hence be the only + # exported span available for the test at this point in development. + assert len(spans) == 1 + flow_control_span = spans[0] + assert flow_control_span.name == "publisher flow control" + assert flow_control_span.kind == trace.SpanKind.INTERNAL + # Assert the Publisher Flow Control Span has a parent - the Publish Create + # Span that is still not finished. + assert len(flow_control_span._parent) is not None def test_init_w_api_endpoint(creds): From 2e3f576c66875b232a504129b8711fde25c681cc Mon Sep 17 00:00:00 2001 From: Mukund Date: Thu, 29 Aug 2024 23:19:29 +0000 Subject: [PATCH 07/19] End Publisher Flow Control and Create Span on FlowControlLimitError --- .../open_telemetry/publish_message_wrapper.py | 21 ++++++- google/cloud/pubsub_v1/publisher/client.py | 10 ++++ tests/unit/pubsub_v1/conftest.py | 10 ++++ .../publisher/test_publisher_client.py | 55 ++++++++++++++++++- 4 files changed, 92 insertions(+), 4 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py index 9a0a24d46..be15330aa 100644 --- a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -41,6 +41,20 @@ def start_create_span(self, topic: str, ordering_key: str) -> None: ) self._create_span: trace.Span = create_span + def end_create_span(self, exc: Exception = None) -> None: + if self._create_span is None: # pragma: NO COVER + warnings.warn( + message="publish create span is None. Hence, not ending it.", + category=RuntimeWarning, + ) + return + if exc: + self._create_span.record_exception(exception=exc) + self._create_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) + self._create_span.end() + def start_publisher_flow_control_span(self) -> None: tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) if self._create_span is None: # pragma: NO COVER @@ -57,11 +71,16 @@ def start_publisher_flow_control_span(self) -> None: ) as flow_control_span: self._flow_control_span: trace.Span = flow_control_span - def end_publisher_flow_control_span(self) -> None: + def end_publisher_flow_control_span(self, exc: Exception = None) -> None: if self._flow_control_span is None: # pragma: NO COVER warnings.warn( message="publish flow control span is None. Hence, not ending it.", category=RuntimeWarning, ) return + if exc: + self._flow_control_span.record_exception(exception=exc) + self._flow_control_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) self._flow_control_span.end() diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 8adb87967..13990a0d4 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -422,6 +422,16 @@ def publish( # type: ignore[override] category=RuntimeWarning, ) except exceptions.FlowControlLimitError as exc: + if self._open_telemetry_enabled: + if wrapper: + wrapper.end_publisher_flow_control_span(exc) + wrapper.end_create_span(exc) + else: # pragma: NO COVER + warnings.warn( + message="PubSubMessageWrapper is None. Not ending publisher create and flow control spans on FlowControlLimitError.", + category=RuntimeWarning, + ) + future = futures.Future() future.set_exception(exc) return future diff --git a/tests/unit/pubsub_v1/conftest.py b/tests/unit/pubsub_v1/conftest.py index dc4192931..897301343 100644 --- a/tests/unit/pubsub_v1/conftest.py +++ b/tests/unit/pubsub_v1/conftest.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from opentelemetry.sdk.trace import TracerProvider + import google.auth.credentials import pytest @@ -23,3 +25,11 @@ def creds(): GOOGLE_APPLICATION_CREDENTIALS set. """ yield google.auth.credentials.AnonymousCredentials() + + +@pytest.fixture(scope="session", autouse=True) +def provider(): + """ + Provide an Open Telemetry Tracer that can be re-used across tests. + """ + yield TracerProvider() diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 9d82966de..772d236b6 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -29,7 +29,6 @@ import pytest import time from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter -from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry import trace @@ -179,7 +178,58 @@ def test_opentelemetry_context_setter(): sys.version_info < (3, 8), reason="Open Telemetry not supported below Python version 3.8", ) -def test_opentelemetry_publisher_create_span(creds): +def test_opentelemetry_flow_control_exception(creds, provider): + publisher_options = types.PublisherOptions( + flow_control=types.PublishFlowControl( + message_limit=10, + byte_limit=150, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ), + enable_open_telemetry_tracing=True, + ) + client = publisher.Client(credentials=creds, publisher_options=publisher_options) + + mock_batch = mock.Mock(spec=client._batch_class) + topic = "topic/path" + client._set_batch(topic, mock_batch) + + # Trace Provider setup. + memory_exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(memory_exporter) + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + + future1 = client.publish(topic, b"a" * 100) + future2 = client.publish(topic, b"b" * 100) + + future1.result() # no error, still within flow control limits + with pytest.raises(exceptions.FlowControlLimitError): + future2.result() + + spans = memory_exporter.get_finished_spans() + # Span 1 = Publisher Flow Control Span of first publish + # Span 2 = Publisher Flow Control Span of second publish(raises FlowControlLimitError) + # Span 3 = Publish Create Span of second publish(raises FlowControlLimitError) + assert len(spans) == 3 + + failed_flow_control_span = spans[1] + finished_publish_create_span = spans[2] + assert failed_flow_control_span.name == "publisher flow control" + assert failed_flow_control_span.kind == trace.SpanKind.INTERNAL + assert ( + failed_flow_control_span._parent[1] == finished_publish_create_span._context[1] + ) + assert failed_flow_control_span.status.status_code == trace.StatusCode.ERROR + + assert len(failed_flow_control_span.events) == 1 + assert failed_flow_control_span.events[0].name == "exception" + + +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="Open Telemetry not supported below Python version 3.8", +) +def test_opentelemetry_publisher_create_span(creds, provider): TOPIC = "projects/projectID/topics/topicID" client = publisher.Client( credentials=creds, @@ -189,7 +239,6 @@ def test_opentelemetry_publisher_create_span(creds): ) # Trace Provider setup. - provider = TracerProvider() memory_exporter = InMemorySpanExporter() processor = SimpleSpanProcessor(memory_exporter) provider.add_span_processor(processor) From 0f865d7688466fb17dcdd9830c3ba49e4eaf25c0 Mon Sep 17 00:00:00 2001 From: Mukund Date: Fri, 30 Aug 2024 15:28:48 +0000 Subject: [PATCH 08/19] Add Publisher Batching Span --- .../open_telemetry/publish_message_wrapper.py | 39 ++++++++- google/cloud/pubsub_v1/publisher/client.py | 83 ++++++++++++------ .../publisher/test_publisher_client.py | 86 ++++++++++++++++--- 3 files changed, 170 insertions(+), 38 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py index be15330aa..02e3b3c56 100644 --- a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -10,6 +10,7 @@ class PublishMessageWrapper: _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher" _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" + _OPEN_TELEMETRY_PUBLISHER_BATCHING = "publisher batching" _PUBLISH_START_EVENT: str = "publish start" _PUBLISH_FLOW_CONTROL: str = "publisher flow control" @@ -41,10 +42,10 @@ def start_create_span(self, topic: str, ordering_key: str) -> None: ) self._create_span: trace.Span = create_span - def end_create_span(self, exc: Exception = None) -> None: + def end_create_span(self, exc: BaseException = None) -> None: if self._create_span is None: # pragma: NO COVER warnings.warn( - message="publish create span is None. Hence, not ending it.", + message="publish create span is None. Hence, not ending it", category=RuntimeWarning, ) return @@ -71,10 +72,10 @@ def start_publisher_flow_control_span(self) -> None: ) as flow_control_span: self._flow_control_span: trace.Span = flow_control_span - def end_publisher_flow_control_span(self, exc: Exception = None) -> None: + def end_publisher_flow_control_span(self, exc: BaseException = None) -> None: if self._flow_control_span is None: # pragma: NO COVER warnings.warn( - message="publish flow control span is None. Hence, not ending it.", + message="publish flow control span is None. Hence, not ending it", category=RuntimeWarning, ) return @@ -84,3 +85,33 @@ def end_publisher_flow_control_span(self, exc: Exception = None) -> None: trace.Status(status_code=trace.StatusCode.ERROR) ) self._flow_control_span.end() + + def start_publisher_batching_span(self) -> None: + if self._create_span is None: # pragma: NO COVER + warnings.warn( + message="publish create span is None. Hence, not starting publisher batching span", + category=RuntimeWarning, + ) + return + tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + with tracer.start_as_current_span( + name=self._OPEN_TELEMETRY_PUBLISHER_BATCHING, + kind=trace.SpanKind.INTERNAL, + context=set_span_in_context(self._create_span), + end_on_exit=False, + ) as batching_span: + self._batching_span = batching_span + + def end_publisher_batching_span(self, exc: BaseException = None) -> None: + if self._batching_span is None: # pragma: NO COVER + warnings.warn( + message="publisher batching span is None. Hence, not ending it", + category=RuntimeWarning, + ) + return + if exc: + self._batching_span.record_exception(exception=exc) + self._batching_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) + self._batching_span.end() diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 13990a0d4..f0eee3bb9 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -445,31 +445,66 @@ def on_publish_done(future): if timeout is gapic_v1.method.DEFAULT: # if custom timeout not passed in timeout = self.publisher_options.timeout + if self._open_telemetry_enabled: + if wrapper: + wrapper.start_publisher_batching_span() + else: # pragma: NO COVER + warnings.warn( + message="PublishMessageWrapper is None. Hence, not starting publisher batching span", + category=RuntimeWarning, + ) with self._batch_lock: - if self._is_stopped: - raise RuntimeError("Cannot publish on a stopped publisher.") - - # Set retry timeout to "infinite" when message ordering is enabled. - # Note that this then also impacts messages added with an empty - # ordering key. - if self._enable_message_ordering: - if retry is gapic_v1.method.DEFAULT: - # use the default retry for the publish GRPC method as a base - transport = self._transport - base_retry = transport._wrapped_methods[transport.publish]._retry - retry = base_retry.with_deadline(2.0**32) - # timeout needs to be overridden and set to infinite in - # addition to the retry deadline since both determine - # the duration for which retries are attempted. - timeout = 2.0**32 - elif retry is not None: - retry = retry.with_deadline(2.0**32) - timeout = 2.0**32 - - # Delegate the publishing to the sequencer. - sequencer = self._get_or_create_sequencer(topic, ordering_key) - future = sequencer.publish(message, retry=retry, timeout=timeout) - future.add_done_callback(on_publish_done) + try: + if self._is_stopped: + raise RuntimeError("Cannot publish on a stopped publisher.") + + # Set retry timeout to "infinite" when message ordering is enabled. + # Note that this then also impacts messages added with an empty + # ordering key. + if self._enable_message_ordering: + if retry is gapic_v1.method.DEFAULT: + # use the default retry for the publish GRPC method as a base + transport = self._transport + base_retry = transport._wrapped_methods[ + transport.publish + ]._retry + retry = base_retry.with_deadline(2.0**32) + # timeout needs to be overridden and set to infinite in + # addition to the retry deadline since both determine + # the duration for which retries are attempted. + timeout = 2.0**32 + elif retry is not None: + retry = retry.with_deadline(2.0**32) + timeout = 2.0**32 + + # Delegate the publishing to the sequencer. + sequencer = self._get_or_create_sequencer(topic, ordering_key) + future = sequencer.publish(message, retry=retry, timeout=timeout) + future.add_done_callback(on_publish_done) + except BaseException as be: + # Exceptions can be thrown when attempting to add messages to + # the batch. If they're thrown, record them in publisher + # batching and create span, end them and bubble + # the exception up. + if self._open_telemetry_enabled: + if wrapper: + wrapper.end_publisher_batching_span(be) + wrapper.end_create_span(be) + else: # pragma: NO COVER + warnings.warn( + message="PublishMessageWrapper is None. Hence, not recording exception and ending publisher batching span and create span", + category=RuntimeWarning, + ) + raise be + + if self._open_telemetry_enabled: + if wrapper: + wrapper.end_publisher_batching_span() + else: # pragma: NO COVER + warnings.warn( + message="PublishMessageWrapper is None. Hence, not ending publisher batching span", + category=RuntimeWarning, + ) # Create a timer thread if necessary to enforce the batching # timeout. diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 772d236b6..cc3a38bf0 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -174,6 +174,62 @@ def test_opentelemetry_context_setter(): assert "googclient_key" in msg.attributes.keys() +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="Open Telemetry not supported below Python version 3.8", +) +@pytest.mark.parametrize( + "enable_open_telemetry", + [ + True, + False, # for test code coverage - exception thrown but Open Telemetry disabled + ], +) +def test_opentelemetry_publisher_batching_exception( + creds, provider, enable_open_telemetry +): + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=enable_open_telemetry, + ), + ) + + # Setup Open Telemetry tracing + memory_exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(memory_exporter) + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + + # Throw an exception when sequencer.publish() is called + sequencer = mock.Mock(spec=ordered_sequencer.OrderedSequencer) + sequencer.publish = mock.Mock(side_effect=RuntimeError("some error")) + client._get_or_create_sequencer = mock.Mock(return_value=sequencer) + + TOPIC = "projects/projectID/topics/topicID" + with pytest.raises(RuntimeError): + client.publish(TOPIC, b"message") + + spans = memory_exporter.get_finished_spans() + + if enable_open_telemetry: + # Span 1: Publisher Flow Control span + # Span 2: Publisher Batching span + # Span 3: Create Publish span + assert len(spans) == 3 + + batching_span = spans[1] + create_span = spans[2] + assert batching_span.name == "publisher batching" + assert batching_span.kind == trace.SpanKind.INTERNAL + assert batching_span._parent[1] == create_span._context[1] + + # Verify exception recorded by the Publisher Batching span. + assert batching_span.status.status_code == trace.StatusCode.ERROR + assert len(batching_span.events) == 1 + assert batching_span.events[0].name == "exception" + + @pytest.mark.skipif( sys.version_info < (3, 8), reason="Open Telemetry not supported below Python version 3.8", @@ -208,12 +264,13 @@ def test_opentelemetry_flow_control_exception(creds, provider): spans = memory_exporter.get_finished_spans() # Span 1 = Publisher Flow Control Span of first publish + # Span 2 = Publisher Batching Span of first publish # Span 2 = Publisher Flow Control Span of second publish(raises FlowControlLimitError) # Span 3 = Publish Create Span of second publish(raises FlowControlLimitError) - assert len(spans) == 3 + assert len(spans) == 4 - failed_flow_control_span = spans[1] - finished_publish_create_span = spans[2] + failed_flow_control_span = spans[2] + finished_publish_create_span = spans[3] assert failed_flow_control_span.name == "publisher flow control" assert failed_flow_control_span.kind == trace.SpanKind.INTERNAL assert ( @@ -229,7 +286,7 @@ def test_opentelemetry_flow_control_exception(creds, provider): sys.version_info < (3, 8), reason="Open Telemetry not supported below Python version 3.8", ) -def test_opentelemetry_publisher_create_span(creds, provider): +def test_opentelemetry_publish(creds, provider): TOPIC = "projects/projectID/topics/topicID" client = publisher.Client( credentials=creds, @@ -247,15 +304,24 @@ def test_opentelemetry_publisher_create_span(creds, provider): client.publish(TOPIC, b"message") spans = memory_exporter.get_finished_spans() - # Publisher Flow Control Span should have ended and hence be the only - # exported span available for the test at this point in development. - assert len(spans) == 1 + # Span 1: Publisher Flow control span + # Span 2: Publisher Batching span + # Publish Create Span would still be active, and hence not exported + # at this point in development. + assert len(spans) == 2 + flow_control_span = spans[0] assert flow_control_span.name == "publisher flow control" assert flow_control_span.kind == trace.SpanKind.INTERNAL - # Assert the Publisher Flow Control Span has a parent - the Publish Create - # Span that is still not finished. - assert len(flow_control_span._parent) is not None + # Assert the Publisher Flow Control Span has a parent(the Publish Create + # Span is still not finished, and hence the value of parent cannot yet be + # asserted at this point in development) + assert flow_control_span._parent is not None + + batching_span = spans[1] + assert batching_span.name == "publisher batching" + assert batching_span.kind == trace.SpanKind.INTERNAL + assert batching_span._parent is not None def test_init_w_api_endpoint(creds): From b4a51dfd572fbf6507d0bc5943d01073ec164773 Mon Sep 17 00:00:00 2001 From: Mukund Date: Fri, 30 Aug 2024 17:15:12 +0000 Subject: [PATCH 09/19] Fix TraceContextPropagation * Current code incorrectly attempts trace context propagation outside of the context of the create span. This results in trace context propagation not occurring and the "googclient_" prefixed attributes never being added to the message. * The fix adds moves the TraceContextPropagation code to the create span function, where its context is available and also adds in a test to verify that the attributes are populated as expected. --- .../open_telemetry/publish_message_wrapper.py | 8 +++++ google/cloud/pubsub_v1/publisher/client.py | 8 ----- .../publisher/test_publisher_client.py | 31 ++++++++++++++++++- 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py index 02e3b3c56..3506970be 100644 --- a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -5,6 +5,10 @@ from google.pubsub_v1 import types as gapic_types from opentelemetry import trace from opentelemetry.trace.propagation import set_span_in_context +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from google.cloud.pubsub_v1.open_telemetry.context_propagation import ( + OpenTelemetryContextSetter, +) class PublishMessageWrapper: @@ -41,6 +45,10 @@ def start_create_span(self, topic: str, ordering_key: str) -> None: }, ) self._create_span: trace.Span = create_span + TraceContextTextMapPropagator().inject( + carrier=self._message, + setter=OpenTelemetryContextSetter(), + ) def end_create_span(self, exc: BaseException = None) -> None: if self._create_span is None: # pragma: NO COVER diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index f0eee3bb9..3e2ea71df 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -23,7 +23,6 @@ from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union import warnings import sys -from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from google.api_core import gapic_v1 from google.auth.credentials import AnonymousCredentials # type: ignore @@ -42,9 +41,6 @@ from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( PublishMessageWrapper, ) -from google.cloud.pubsub_v1.open_telemetry.context_propagation import ( - OpenTelemetryContextSetter, -) __version__ = package_version.__version__ @@ -396,10 +392,6 @@ def publish( # type: ignore[override] if self._open_telemetry_enabled: wrapper = PublishMessageWrapper(message) wrapper.start_create_span(topic=topic, ordering_key=ordering_key) - TraceContextTextMapPropagator().inject( - carrier=message, - setter=OpenTelemetryContextSetter(), - ) # Messages should go through flow control to prevent excessive # queuing on the client side (depending on the settings). diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index cc3a38bf0..6ac64ac50 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -174,6 +174,35 @@ def test_opentelemetry_context_setter(): assert "googclient_key" in msg.attributes.keys() +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="Open Telemetry not supported below Python version 3.8", +) +def test_opentelemetry_context_propagation(creds, provider): + TOPIC = "projects/projectID/topics/topicID" + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=True, + ), + ) + + # Set up Open Telemetry tracing. + memory_exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(memory_exporter) + provider.add_span_processor(processor) + trace.set_tracer_provider(provider) + + message_mock = mock.Mock(spec=publisher.flow_controller.FlowController.add) + client._flow_controller.add = message_mock + client.publish(TOPIC, b"data") + + message_mock.assert_called_once() + args = message_mock.call_args.args + assert len(args) == 1 + assert "googclient_traceparent" in args[0].attributes + + @pytest.mark.skipif( sys.version_info < (3, 8), reason="Open Telemetry not supported below Python version 3.8", @@ -255,7 +284,7 @@ def test_opentelemetry_flow_control_exception(creds, provider): provider.add_span_processor(processor) trace.set_tracer_provider(provider) - future1 = client.publish(topic, b"a" * 100) + future1 = client.publish(topic, b"a" * 60) future2 = client.publish(topic, b"b" * 100) future1.result() # no error, still within flow control limits From f98b3c5787004ae506b67cca0a0f1834a87e4bc1 Mon Sep 17 00:00:00 2001 From: Mukund Date: Fri, 30 Aug 2024 20:50:24 +0000 Subject: [PATCH 10/19] Plumb PublishMessageWrapper to sequencers and batch publish * Also fix a mistake introduced in an earlier commit where I only initialize the PublishMessageWrapper with the message when Open Telemetry is enabled. This was resulting in PublishMessageWrapper not None and hence the message not being passed to the sequencers when Open Telemetry is disabled. The tests I've added caught this issue and the issue is now resolved at this point in development. --- .../open_telemetry/publish_message_wrapper.py | 14 + .../cloud/pubsub_v1/publisher/_batch/base.py | 8 +- .../pubsub_v1/publisher/_batch/thread.py | 34 +-- .../publisher/_sequencer/ordered_sequencer.py | 14 +- .../_sequencer/unordered_sequencer.py | 12 +- google/cloud/pubsub_v1/publisher/client.py | 7 +- .../pubsub_v1/publisher/batch/test_base.py | 5 +- .../pubsub_v1/publisher/batch/test_thread.py | 251 +++++++++++++----- .../sequencer/test_ordered_sequencer.py | 7 +- .../sequencer/test_unordered_sequencer.py | 11 +- .../publisher/test_publisher_client.py | 44 ++- 11 files changed, 292 insertions(+), 115 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py index 3506970be..6c0c1d37d 100644 --- a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -22,6 +22,20 @@ class PublishMessageWrapper: def __init__(self, message: gapic_types.PubsubMessage): self._message: gapic_types.PubsubMessage = message + @property + def message(self): + return self._message + + @message.setter + def message(self, message: gapic_types.PubsubMessage): + self._message = message + + def __eq__(self, other): + """Used for pytest asserts to compare two PublishMessageWrapper objects with the same message.""" + if isinstance(self, other.__class__): + return self.message == other.message + return False + def start_create_span(self, topic: str, ordering_key: str) -> None: tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) with tracer.start_as_current_span( diff --git a/google/cloud/pubsub_v1/publisher/_batch/base.py b/google/cloud/pubsub_v1/publisher/_batch/base.py index 52505996b..c91e0a444 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/base.py +++ b/google/cloud/pubsub_v1/publisher/_batch/base.py @@ -19,6 +19,10 @@ import typing from typing import Optional, Sequence +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) + if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud import pubsub_v1 @@ -54,7 +58,7 @@ class Batch(metaclass=abc.ABCMeta): def __len__(self): """Return the number of messages currently in the batch.""" - return len(self.messages) + return len(self.message_wrappers) @staticmethod @abc.abstractmethod @@ -68,7 +72,7 @@ def make_lock(): # pragma: NO COVER @property @abc.abstractmethod - def messages(self) -> Sequence["gapic_types.PubsubMessage"]: # pragma: NO COVER + def message_wrappers(self) -> Sequence[PublishMessageWrapper]: # pragma: NO COVER """Return the messages currently in the batch. Returns: diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index 1617f8c90..977f5765b 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -26,6 +26,9 @@ from google.cloud.pubsub_v1.publisher import futures from google.cloud.pubsub_v1.publisher._batch import base from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud import pubsub_v1 @@ -108,7 +111,7 @@ def __init__( # status changed from ACCEPTING_MESSAGES to any other # in order to avoid race conditions self._futures: List[futures.Future] = [] - self._messages: List[gapic_types.PubsubMessage] = [] + self._message_wrappers: List[PublishMessageWrapper] = [] self._status = base.BatchStatus.ACCEPTING_MESSAGES # The initial size is not zero, we need to account for the size overhead @@ -134,9 +137,9 @@ def client(self) -> "PublisherClient": return self._client @property - def messages(self) -> Sequence[gapic_types.PubsubMessage]: - """The messages currently in the batch.""" - return self._messages + def message_wrappers(self) -> Sequence[PublishMessageWrapper]: + """The message wrappers currently in the batch.""" + return self._message_wrappers @property def settings(self) -> "types.BatchSettings": @@ -259,7 +262,7 @@ def _commit(self) -> None: # https://github.com/googleapis/google-cloud-python/issues/8036 # Sanity check: If there are no messages, no-op. - if not self._messages: + if not self._message_wrappers: _LOGGER.debug("No messages to publish, exiting commit") self._status = base.BatchStatus.SUCCESS return @@ -273,7 +276,7 @@ def _commit(self) -> None: # Performs retries for errors defined by the retry configuration. response = self._client._gapic_publish( topic=self._topic, - messages=self._messages, + messages=[wrapper.message for wrapper in self._message_wrappers], retry=self._commit_retry, timeout=self._commit_timeout, ) @@ -326,7 +329,8 @@ def _commit(self) -> None: self._batch_done_callback(batch_transport_succeeded) def publish( - self, message: gapic_types.PubsubMessage + self, + wrapper: PublishMessageWrapper, ) -> Optional["pubsub_v1.publisher.futures.Future"]: """Publish a single message. @@ -338,7 +342,7 @@ def publish( This method is called by :meth:`~.PublisherClient.publish`. Args: - message: The Pub/Sub message. + wrapper: The Pub/Sub message wrapper. Returns: An object conforming to the :class:`~concurrent.futures.Future` interface @@ -351,12 +355,12 @@ def publish( """ # Coerce the type, just in case. - if not isinstance(message, gapic_types.PubsubMessage): + if not isinstance(wrapper.message, gapic_types.PubsubMessage): # For performance reasons, the message should be constructed by directly # using the raw protobuf class, and only then wrapping it into the # higher-level PubsubMessage class. - vanilla_pb = _raw_proto_pubbsub_message(**message) - message = gapic_types.PubsubMessage.wrap(vanilla_pb) + vanilla_pb = _raw_proto_pubbsub_message(**wrapper.message) + wrapper.message = gapic_types.PubsubMessage.wrap(vanilla_pb) future = None @@ -369,7 +373,7 @@ def publish( return None size_increase = gapic_types.PublishRequest( - messages=[message] + messages=[wrapper.message] )._pb.ByteSize() if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES: @@ -381,14 +385,14 @@ def publish( raise exceptions.MessageTooLargeError(err_msg) new_size = self._size + size_increase - new_count = len(self._messages) + 1 + new_count = len(self._message_wrappers) + 1 size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES) overflow = new_size > size_limit or new_count >= self.settings.max_messages - if not self._messages or not overflow: + if not self._message_wrappers or not overflow: # Store the actual message in the batch's message queue. - self._messages.append(message) + self._message_wrappers.append(wrapper) self._size = new_size # Track the future on this batch (so that the result of the diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py index 30c76a44f..9644a1fa2 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py @@ -23,7 +23,9 @@ from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base from google.cloud.pubsub_v1.publisher._batch import base as batch_base -from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud.pubsub_v1 import types @@ -262,15 +264,15 @@ def _create_batch( def publish( self, - message: gapic_types.PubsubMessage, + wrapper: PublishMessageWrapper, retry: "OptionalRetry" = gapic_v1.method.DEFAULT, timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, ) -> futures.Future: """Publish message for this ordering key. Args: - message: - The Pub/Sub message. + wrapper: + The Pub/Sub message wrapper. retry: The retry settings to apply when publishing the message. timeout: @@ -317,11 +319,11 @@ def publish( self._ordered_batches.append(new_batch) batch = self._ordered_batches[-1] - future = batch.publish(message) + future = batch.publish(wrapper) while future is None: batch = self._create_batch(commit_retry=retry, commit_timeout=timeout) self._ordered_batches.append(batch) - future = batch.publish(message) + future = batch.publish(wrapper) return future diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py index 7d57aa821..7dbd3f084 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py @@ -18,7 +18,9 @@ from google.api_core import gapic_v1 from google.cloud.pubsub_v1.publisher._sequencer import base -from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud.pubsub_v1.publisher import _batch @@ -115,15 +117,15 @@ def _create_batch( def publish( self, - message: gapic_types.PubsubMessage, + wrapper: PublishMessageWrapper, retry: "OptionalRetry" = gapic_v1.method.DEFAULT, timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, ) -> "futures.Future": """Batch message into existing or new batch. Args: - message: - The Pub/Sub message. + wrapper: + The Pub/Sub message wrapper. retry: The retry settings to apply when publishing the message. timeout: @@ -151,7 +153,7 @@ def publish( future = None while future is None: # Might throw MessageTooLargeError - future = batch.publish(message) + future = batch.publish(wrapper) # batch is full, triggering commit_when_full if future is None: batch = self._create_batch(commit_retry=retry, commit_timeout=timeout) diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 3e2ea71df..f848bc9f2 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -388,9 +388,8 @@ def publish( # type: ignore[override] ) message = gapic_types.PubsubMessage.wrap(vanilla_pb) - wrapper: PublishMessageWrapper = None + wrapper: PublishMessageWrapper = PublishMessageWrapper(message) if self._open_telemetry_enabled: - wrapper = PublishMessageWrapper(message) wrapper.start_create_span(topic=topic, ordering_key=ordering_key) # Messages should go through flow control to prevent excessive @@ -471,7 +470,9 @@ def on_publish_done(future): # Delegate the publishing to the sequencer. sequencer = self._get_or_create_sequencer(topic, ordering_key) - future = sequencer.publish(message, retry=retry, timeout=timeout) + future = sequencer.publish( + wrapper=wrapper, retry=retry, timeout=timeout + ) future.add_done_callback(on_publish_done) except BaseException as be: # Exceptions can be thrown when attempting to add messages to diff --git a/tests/unit/pubsub_v1/publisher/batch/test_base.py b/tests/unit/pubsub_v1/publisher/batch/test_base.py index a95d72c12..ae5dbea04 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_base.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_base.py @@ -21,6 +21,9 @@ from google.cloud.pubsub_v1.publisher._batch.base import BatchStatus from google.cloud.pubsub_v1.publisher._batch.thread import Batch from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) def create_batch(status, settings=types.BatchSettings()): @@ -41,5 +44,5 @@ def create_batch(status, settings=types.BatchSettings()): def test_len(): batch = create_batch(status=BatchStatus.ACCEPTING_MESSAGES) assert len(batch) == 0 - batch.publish(gapic_types.PubsubMessage(data=b"foo")) + batch.publish(PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo"))) assert len(batch) == 1 diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 2752d62a2..958419788 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -36,6 +36,9 @@ from google.cloud.pubsub_v1.publisher._batch import thread from google.cloud.pubsub_v1.publisher._batch.thread import Batch from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) def create_client(): @@ -126,8 +129,16 @@ def test_commit_no_op(): def test_blocking__commit(): batch = create_batch() futures = ( - batch.publish({"data": b"This is my message."}), - batch.publish({"data": b"This is another message."}), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is my message.") + ) + ), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is another message.") + ) + ), ) # Set up the underlying API publish method to return a PublishResponse. @@ -160,7 +171,11 @@ def test_blocking__commit(): def test_blocking__commit_custom_retry(): batch = create_batch(commit_retry=mock.sentinel.custom_retry) - batch.publish({"data": b"This is my message."}) + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is my message.") + ) + ) # Set up the underlying API publish method to return a PublishResponse. publish_response = gapic_types.PublishResponse(message_ids=["a"]) @@ -182,7 +197,11 @@ def test_blocking__commit_custom_retry(): def test_blocking__commit_custom_timeout(): batch = create_batch(commit_timeout=mock.sentinel.custom_timeout) - batch.publish({"data": b"This is my message."}) + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is my message.") + ) + ) # Set up the underlying API publish method to return a PublishResponse. publish_response = gapic_types.PublishResponse(message_ids=["a"]) @@ -217,13 +236,21 @@ def api_publish_delay(topic="", messages=(), retry=None, timeout=None): ) with api_publish_patch: - batch.publish({"data": b"first message"}) + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"first message") + ) + ) start = datetime.datetime.now() event_set = api_publish_called.wait(timeout=1.0) if not event_set: # pragma: NO COVER pytest.fail("API publish was not called in time") - batch.publish({"data": b"second message"}) + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"second message") + ) + ) end = datetime.datetime.now() # While a batch commit in progress, waiting for the API publish call to @@ -266,8 +293,16 @@ def test_blocking__commit_no_messages(): def test_blocking__commit_wrong_messageid_length(): batch = create_batch() futures = ( - batch.publish({"data": b"blah blah blah"}), - batch.publish({"data": b"blah blah blah blah"}), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah") + ) + ), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah blah") + ) + ), ) # Set up a PublishResponse that only returns one message ID. @@ -287,8 +322,16 @@ def test_blocking__commit_wrong_messageid_length(): def test_block__commmit_api_error(): batch = create_batch() futures = ( - batch.publish({"data": b"blah blah blah"}), - batch.publish({"data": b"blah blah blah blah"}), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah") + ) + ), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah blah") + ) + ), ) # Make the API throw an error when publishing. @@ -306,8 +349,16 @@ def test_block__commmit_api_error(): def test_block__commmit_retry_error(): batch = create_batch() futures = ( - batch.publish({"data": b"blah blah blah"}), - batch.publish({"data": b"blah blah blah blah"}), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah") + ) + ), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah blah") + ) + ), ) # Make the API throw an error when publishing. @@ -324,24 +375,31 @@ def test_block__commmit_retry_error(): def test_publish_updating_batch_size(): batch = create_batch(topic="topic_foo") - messages = ( - gapic_types.PubsubMessage(data=b"foobarbaz"), - gapic_types.PubsubMessage(data=b"spameggs"), - gapic_types.PubsubMessage(data=b"1335020400"), + wrappers = ( + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz"), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spameggs"), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"1335020400"), + ), ) # Publish each of the messages, which should save them to the batch. - futures = [batch.publish(message) for message in messages] + futures = [batch.publish(wrapper) for wrapper in wrappers] # There should be three messages on the batch, and three futures. - assert len(batch.messages) == 3 + assert len(batch.message_wrappers) == 3 assert batch._futures == futures # The size should have been incremented by the sum of the size # contributions of each message to the PublishRequest. base_request_size = gapic_types.PublishRequest(topic="topic_foo")._pb.ByteSize() expected_request_size = base_request_size + sum( - gapic_types.PublishRequest(messages=[msg])._pb.ByteSize() for msg in messages + gapic_types.PublishRequest(messages=[wrapper.message])._pb.ByteSize() + for wrapper in wrappers ) assert batch.size == expected_request_size @@ -350,68 +408,82 @@ def test_publish_updating_batch_size(): def test_publish(): batch = create_batch() - message = gapic_types.PubsubMessage() - future = batch.publish(message) + wrapper = PublishMessageWrapper(message=gapic_types.PubsubMessage()) + future = batch.publish(wrapper) - assert len(batch.messages) == 1 + assert len(batch.message_wrappers) == 1 assert batch._futures == [future] def test_publish_max_messages_zero(): batch = create_batch(topic="topic_foo", max_messages=0) - - message = gapic_types.PubsubMessage(data=b"foobarbaz") + wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz"), + ) with mock.patch.object(batch, "commit") as commit: - future = batch.publish(message) + future = batch.publish(wrapper) assert future is not None - assert len(batch.messages) == 1 + assert len(batch.message_wrappers) == 1 assert batch._futures == [future] commit.assert_called_once() def test_publish_max_messages_enforced(): batch = create_batch(topic="topic_foo", max_messages=1) + wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz") + ) + wrapper2 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz2") + ) - message = gapic_types.PubsubMessage(data=b"foobarbaz") - message2 = gapic_types.PubsubMessage(data=b"foobarbaz2") - - future = batch.publish(message) - future2 = batch.publish(message2) + future = batch.publish(wrapper) + future2 = batch.publish(wrapper2) assert future is not None assert future2 is None - assert len(batch.messages) == 1 + assert len(batch.message_wrappers) == 1 assert len(batch._futures) == 1 def test_publish_max_bytes_enforced(): batch = create_batch(topic="topic_foo", max_bytes=15) - message = gapic_types.PubsubMessage(data=b"foobarbaz") - message2 = gapic_types.PubsubMessage(data=b"foobarbaz2") + wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz") + ) + wrapper2 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz2") + ) - future = batch.publish(message) - future2 = batch.publish(message2) + future = batch.publish(wrapper) + future2 = batch.publish(wrapper2) assert future is not None assert future2 is None - assert len(batch.messages) == 1 + assert len(batch.message_wrappers) == 1 assert len(batch._futures) == 1 def test_publish_exceed_max_messages(): max_messages = 4 batch = create_batch(max_messages=max_messages) - messages = ( - gapic_types.PubsubMessage(data=b"foobarbaz"), - gapic_types.PubsubMessage(data=b"spameggs"), - gapic_types.PubsubMessage(data=b"1335020400"), + wrappers = ( + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz"), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spameggs"), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"1335020400"), + ), ) # Publish each of the messages, which should save them to the batch. with mock.patch.object(batch, "commit") as commit: - futures = [batch.publish(message) for message in messages] + futures = [batch.publish(wrapper) for wrapper in wrappers] assert batch._futures == futures assert len(futures) == max_messages - 1 @@ -420,7 +492,11 @@ def test_publish_exceed_max_messages(): # When a fourth message is published, commit should be called. # No future will be returned in this case. - future = batch.publish(gapic_types.PubsubMessage(data=b"last one")) + future = batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"last one") + ) + ) commit.assert_called_once_with() assert future is None @@ -443,28 +519,32 @@ def test_publish_single_message_size_exceeds_server_size_limit(): assert request_size == 1001 # sanity check, just above the (mocked) server limit with pytest.raises(exceptions.MessageTooLargeError): - batch.publish(big_message) + batch.publish(wrapper=PublishMessageWrapper(message=big_message)) @mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000) def test_publish_total_messages_size_exceeds_server_size_limit(): batch = create_batch(topic="topic_foo", max_messages=10, max_bytes=1500) - messages = ( - gapic_types.PubsubMessage(data=b"x" * 500), - gapic_types.PubsubMessage(data=b"x" * 600), + wrappers = ( + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"x" * 500), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"x" * 600), + ), ) # Sanity check - request size is still below BatchSettings.max_bytes, # but it exceeds the server-side size limit. request_size = gapic_types.PublishRequest( - topic="topic_foo", messages=messages + topic="topic_foo", messages=[wrapper.message for wrapper in wrappers] )._pb.ByteSize() assert 1000 < request_size < 1500 with mock.patch.object(batch, "commit") as fake_commit: - batch.publish(messages[0]) - batch.publish(messages[1]) + batch.publish(wrappers[0]) + batch.publish(wrappers[1]) # The server side limit should kick in and cause a commit. fake_commit.assert_called_once() @@ -472,21 +552,40 @@ def test_publish_total_messages_size_exceeds_server_size_limit(): def test_publish_dict(): batch = create_batch() - future = batch.publish({"data": b"foobarbaz", "attributes": {"spam": "eggs"}}) + future = batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage( + data=b"foobarbaz", + attributes={"spam": "eggs"}, + ), + ) + ) # There should be one message on the batch. - expected_message = gapic_types.PubsubMessage( - data=b"foobarbaz", attributes={"spam": "eggs"} + expected_message_wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage( + data=b"foobarbaz", + attributes={"spam": "eggs"}, + ) ) - assert batch.messages == [expected_message] + + assert batch.message_wrappers == [expected_message_wrapper] assert batch._futures == [future] def test_cancel(): batch = create_batch() futures = ( - batch.publish({"data": b"This is my message."}), - batch.publish({"data": b"This is another message."}), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is my message."), + ), + ), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is another message."), + ), + ), ) batch.cancel(BatchCancellationReason.PRIOR_ORDERED_MESSAGE_FAILED) @@ -502,19 +601,29 @@ def test_do_not_commit_when_full_when_flag_is_off(): max_messages = 4 # Set commit_when_full flag to False batch = create_batch(max_messages=max_messages, commit_when_full=False) - messages = ( - gapic_types.PubsubMessage(data=b"foobarbaz"), - gapic_types.PubsubMessage(data=b"spameggs"), - gapic_types.PubsubMessage(data=b"1335020400"), + wrappers = ( + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz"), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spameggs"), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"1335020400"), + ), ) with mock.patch.object(batch, "commit") as commit: # Publish 3 messages. - futures = [batch.publish(message) for message in messages] + futures = [batch.publish(wrapper) for wrapper in wrappers] assert len(futures) == 3 # When a fourth message is published, commit should not be called. - future = batch.publish(gapic_types.PubsubMessage(data=b"last one")) + future = batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"last one"), + ) + ) assert commit.call_count == 0 assert future is None @@ -534,8 +643,10 @@ def test_batch_done_callback_called_on_success(): batch = create_batch(batch_done_callback=batch_done_callback_tracker) # Ensure messages exist. - message = gapic_types.PubsubMessage(data=b"foobarbaz") - batch.publish(message) + wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz") + ) + batch.publish(wrapper) # One response for one published message. publish_response = gapic_types.PublishResponse(message_ids=["a"]) @@ -554,8 +665,10 @@ def test_batch_done_callback_called_on_publish_failure(): batch = create_batch(batch_done_callback=batch_done_callback_tracker) # Ensure messages exist. - message = gapic_types.PubsubMessage(data=b"foobarbaz") - batch.publish(message) + wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz") + ) + batch.publish(wrapper) # One response for one published message. publish_response = gapic_types.PublishResponse(message_ids=["a"]) @@ -580,8 +693,10 @@ def test_batch_done_callback_called_on_publish_response_invalid(): batch = create_batch(batch_done_callback=batch_done_callback_tracker) # Ensure messages exist. - message = gapic_types.PubsubMessage(data=b"foobarbaz") - batch.publish(message) + wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz"), + ) + batch.publish(wrapper) # No message ids returned in successful publish response -> invalid. publish_response = gapic_types.PublishResponse(message_ids=[]) diff --git a/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py b/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py index 7570c2970..4377d1447 100644 --- a/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py +++ b/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py @@ -27,12 +27,17 @@ from google.cloud.pubsub_v1 import publisher from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) _ORDERING_KEY = "ordering_key_1" def create_message(): - return gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + return PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + ) def create_client(): diff --git a/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py b/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py index 01d9d6ca4..739bae3bd 100644 --- a/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py +++ b/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py @@ -27,10 +27,15 @@ from google.cloud.pubsub_v1.publisher._batch import base from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) def create_message(): - return gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + return PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + ) def create_client(): @@ -140,7 +145,9 @@ def test_publish_after_batch_error(): batch = client._batch_class( client, "topic_name", types.BatchSettings(max_latency=float("inf")) ) - batch._messages.append(mock.Mock(name="message")) # Make batch truthy (non-empty). + batch._message_wrappers.append( + mock.Mock(name="message") + ) # Make batch truthy (non-empty). sequencer = unordered_sequencer.UnorderedSequencer(client, "topic_name") sequencer._set_batch(batch) diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 6ac64ac50..7eacfa2b1 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -47,6 +47,9 @@ from google.cloud.pubsub_v1.open_telemetry.context_propagation import ( OpenTelemetryContextSetter, ) +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) def _assert_retries_equal(retry, retry2): @@ -464,9 +467,17 @@ def test_publish(creds): # Check mock. batch.publish.assert_has_calls( [ - mock.call(gapic_types.PubsubMessage(data=b"spam")), mock.call( - gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spam"), + ) + ), + mock.call( + PublishMessageWrapper( + message=gapic_types.PubsubMessage( + data=b"foo", attributes={"bar": "baz"} + ) + ) ), ] ) @@ -605,7 +616,9 @@ def test_publish_attrs_bytestring(creds): # The attributes should have been sent as text. batch.publish.assert_called_once_with( - gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + ) ) @@ -645,8 +658,9 @@ def test_publish_new_batch_needed(creds): commit_timeout=gapic_v1.method.DEFAULT, ) message_pb = gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) - batch1.publish.assert_called_once_with(message_pb) - batch2.publish.assert_called_once_with(message_pb) + wrapper = PublishMessageWrapper(message=message_pb) + batch1.publish.assert_called_once_with(wrapper) + batch2.publish.assert_called_once_with(wrapper) def test_publish_attrs_type_error(creds): @@ -669,9 +683,9 @@ def test_publish_custom_retry_overrides_configured_retry(creds): client.publish(topic, b"hello!", retry=mock.sentinel.custom_retry) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY + wrapper=mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY ) - message = fake_sequencer.publish.call_args.args[0] + message = fake_sequencer.publish.call_args.kwargs["wrapper"].message assert message.data == b"hello!" @@ -688,9 +702,9 @@ def test_publish_custom_timeout_overrides_configured_timeout(creds): client.publish(topic, b"hello!", timeout=mock.sentinel.custom_timeout) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout + wrapper=mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout ) - message = fake_sequencer.publish.call_args.args[0] + message = fake_sequencer.publish.call_args.kwargs["wrapper"].message assert message.data == b"hello!" @@ -850,10 +864,16 @@ def test_publish_with_ordering_key(creds): # Check mock. batch.publish.assert_has_calls( [ - mock.call(gapic_types.PubsubMessage(data=b"spam", ordering_key="k1")), mock.call( - gapic_types.PubsubMessage( - data=b"foo", attributes={"bar": "baz"}, ordering_key="k1" + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spam", ordering_key="k1") + ), + ), + mock.call( + PublishMessageWrapper( + message=gapic_types.PubsubMessage( + data=b"foo", attributes={"bar": "baz"}, ordering_key="k1" + ) ) ), ] From 343ef17fe33566a3e7173d53635254596cc0d7d6 Mon Sep 17 00:00:00 2001 From: Mukund Date: Fri, 30 Aug 2024 21:17:08 +0000 Subject: [PATCH 11/19] Increase test coverage across files --- .../open_telemetry/publish_message_wrapper.py | 2 +- .../pubsub_v1/publisher/_batch/thread.py | 4 ++- .../publisher/test_publish_message_wrapper.py | 33 +++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) create mode 100644 tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py index 6c0c1d37d..e02c5f15c 100644 --- a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -30,7 +30,7 @@ def message(self): def message(self, message: gapic_types.PubsubMessage): self._message = message - def __eq__(self, other): + def __eq__(self, other): # pragma: NO COVER """Used for pytest asserts to compare two PublishMessageWrapper objects with the same message.""" if isinstance(self, other.__class__): return self.message == other.message diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index 977f5765b..e444a07ad 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -355,7 +355,9 @@ def publish( """ # Coerce the type, just in case. - if not isinstance(wrapper.message, gapic_types.PubsubMessage): + if not isinstance( + wrapper.message, gapic_types.PubsubMessage + ): # pragma: NO COVER # For performance reasons, the message should be constructed by directly # using the raw protobuf class, and only then wrapping it into the # higher-level PubsubMessage class. diff --git a/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py b/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py new file mode 100644 index 000000000..35888805c --- /dev/null +++ b/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py @@ -0,0 +1,33 @@ +# Copyright 2019, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) + + +def test_message_setter(): + wrapper = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) + another_message = gapic_types.PubsubMessage(data=b"bar") + wrapper.message = another_message + + assert wrapper.message == another_message + + +def test_eq(): + wrapper1 = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) + wrapper2 = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"bar")) + + assert wrapper1.__eq__(wrapper2) is False From 02b93550d96286b95068a7b97705cbeb6a52bbd5 Mon Sep 17 00:00:00 2001 From: Mukund Date: Thu, 5 Sep 2024 22:56:44 +0000 Subject: [PATCH 12/19] Add Publish RPC Span * Also refactor pytest fixtures by abstracting / reusing OpenTelemetry setup across tests --- .../open_telemetry/publish_message_wrapper.py | 4 + .../pubsub_v1/publisher/_batch/thread.py | 72 ++++++ google/cloud/pubsub_v1/publisher/client.py | 4 + tests/unit/pubsub_v1/conftest.py | 20 +- .../pubsub_v1/publisher/batch/test_thread.py | 227 +++++++++++++++++- .../publisher/test_publisher_client.py | 40 +-- 6 files changed, 325 insertions(+), 42 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py index e02c5f15c..89151bd19 100644 --- a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -26,6 +26,10 @@ def __init__(self, message: gapic_types.PubsubMessage): def message(self): return self._message + @property + def create_span(self): + return self._create_span + @message.setter def message(self, message: gapic_types.PubsubMessage): self._message = message diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index e444a07ad..f022354df 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -19,6 +19,9 @@ import time import typing from typing import Any, Callable, List, Optional, Sequence +from datetime import datetime + +from opentelemetry import trace import google.api_core.exceptions from google.api_core import gapic_v1 @@ -88,6 +91,8 @@ class Batch(base.Batch): timeout is used. """ + _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher" + def __init__( self, client: "PublisherClient", @@ -122,6 +127,10 @@ def __init__( self._commit_retry = commit_retry self._commit_timeout = commit_timeout + # Publish RPC Span that will be set by method `_create_publish_rpc_span` + # if Open Telemetry is enabled. + self._rpc_span: trace.Span = None + @staticmethod def make_lock() -> threading.Lock: """Return a threading lock. @@ -229,6 +238,37 @@ def _start_commit_thread(self) -> None: ) commit_thread.start() + def _create_publish_rpc_span(self): + tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + links = [] + + for wrapper in self._message_wrappers: + span = wrapper.create_span + # Add links only for sampled spans. + if span.is_recording(): + links.append(trace.Link(span.get_span_context())) + + with tracer.start_as_current_span( + name=f"{self._topic} publish", + attributes={ + "messaging.system": "com.google.cloud.pubsub.v1", + "messaging.destination.name": self._topic, + "gcp.project_id": self._topic.split("/")[1], + "messaging.batch.message_count": len(self._message_wrappers), + "messaging.operation": "publish", + "code.function": "_commit", + }, + links=links, + kind=trace.SpanKind.CLIENT, + end_on_exit=False, + ) as rpc_span: + ctx = rpc_span.get_span_context() + for wrapper in self._message_wrappers: + span = wrapper.create_span + if span.is_recording(): + span.add_link(ctx) + self._rpc_span = rpc_span + def _commit(self) -> None: """Actually publish all of the messages on the active batch. @@ -273,6 +313,9 @@ def _commit(self) -> None: batch_transport_succeeded = True try: + if self._client.open_telemetry_enabled: + self._create_publish_rpc_span() + # Performs retries for errors defined by the retry configuration. response = self._client._gapic_publish( topic=self._topic, @@ -280,11 +323,40 @@ def _commit(self) -> None: retry=self._commit_retry, timeout=self._commit_timeout, ) + + if self._client.open_telemetry_enabled: + self._rpc_span.end() + end_time = str(datetime.now()) + for message_id, wrapper in zip( + response.message_ids, self._message_wrappers + ): + span = wrapper.create_span + span.add_event( + name="publish end", + attributes={ + "timestamp": end_time, + }, + ) + span.set_attribute(key="messaging.message.id", value=message_id) + wrapper.end_create_span() except google.api_core.exceptions.GoogleAPIError as exc: # We failed to publish, even after retries, so set the exception on # all futures and exit. self._status = base.BatchStatus.ERROR + if self._client.open_telemetry_enabled: + if self._rpc_span: + self._rpc_span.record_exception( + exception=exc, + ) + self._rpc_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) + self._rpc_span.end() + + for wrapper in self._message_wrappers: + wrapper.end_create_span(exc=exc) + batch_transport_succeeded = False if self._batch_done_callback is not None: # Failed to publish batch. diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index f848bc9f2..996087d93 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -229,6 +229,10 @@ def api(self): warnings.warn(msg, category=DeprecationWarning) return super() + @property + def open_telemetry_enabled(self) -> bool: + return self._open_telemetry_enabled + def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> SequencerType: """Get an existing sequencer or create a new one given the (topic, ordering_key) pair. diff --git a/tests/unit/pubsub_v1/conftest.py b/tests/unit/pubsub_v1/conftest.py index 897301343..f3cd421e3 100644 --- a/tests/unit/pubsub_v1/conftest.py +++ b/tests/unit/pubsub_v1/conftest.py @@ -13,6 +13,9 @@ # limitations under the License. from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry import trace import google.auth.credentials import pytest @@ -28,8 +31,15 @@ def creds(): @pytest.fixture(scope="session", autouse=True) -def provider(): - """ - Provide an Open Telemetry Tracer that can be re-used across tests. - """ - yield TracerProvider() +def set_trace_provider(): + provider = TracerProvider() + trace.set_tracer_provider(provider) + + +@pytest.fixture(scope="function") +def span_exporter(): + exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(exporter) + provider = trace.get_tracer_provider() + provider.add_span_processor(processor) + yield exporter diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 958419788..88f5b9e65 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -25,6 +25,8 @@ import pytest +from opentelemetry import trace + import google.api_core.exceptions from google.api_core import gapic_v1 from google.auth import credentials @@ -41,8 +43,13 @@ ) -def create_client(): - return publisher.Client(credentials=credentials.AnonymousCredentials()) +def create_client(enable_open_telemetry=False): + return publisher.Client( + credentials=credentials.AnonymousCredentials(), + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=enable_open_telemetry, + ), + ) def create_batch( @@ -51,7 +58,8 @@ def create_batch( commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, - **batch_settings + enable_open_telemetry: bool = False, + **batch_settings, ): """Return a batch object suitable for testing. @@ -65,13 +73,14 @@ def create_batch( for the batch commit call. commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): The timeout to apply to the batch commit call. + enable_open_telemetry (bool): Whether to enable OpenTelemetry. batch_settings (Mapping[str, str]): Arguments passed on to the :class:``~.pubsub_v1.types.BatchSettings`` constructor. Returns: ~.pubsub_v1.publisher.batch.thread.Batch: A batch object. """ - client = create_client() + client = create_client(enable_open_telemetry=enable_open_telemetry) settings = types.BatchSettings(**batch_settings) return Batch( client, @@ -708,3 +717,213 @@ def test_batch_done_callback_called_on_publish_response_invalid(): assert batch_done_callback_tracker.called assert not batch_done_callback_tracker.success + + +# Refer https://opentelemetry.io/docs/languages/python/#version-support +@pytest.mark.skipif( + sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" +) +def test_open_telemetry_commit_publish_rpc_span_none(span_exporter): + """ + Test coverage for scenario where OpenTelemetry is enabled, publish RPC + span creation fails(unexpected) and hence batch._rpc_span is None. + Required for code coverage. + """ + TOPIC = "projects/projectID/topics/topicID" + batch = create_batch(topic=TOPIC, enable_open_telemetry=True) + + message = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), + ) + message.start_create_span(topic=TOPIC, ordering_key=None) + batch.publish(message) + + # Mock publish error. + error = google.api_core.exceptions.InternalServerError("error") + + with mock.patch.object( + type(batch), + "_create_publish_rpc_span", + side_effect=error, + ): + batch._commit() + + assert batch._rpc_span is None + spans = span_exporter.get_finished_spans() + + # Create span (mock error thrown when publish RPC span creation is attempted) + assert len(spans) == 1 + + publish_create_span = spans[0] + assert publish_create_span.status.status_code == trace.status.StatusCode.ERROR + assert publish_create_span.end_time is not None + + assert publish_create_span.name == f"{TOPIC} create" + # Publish start event and exception event should be present in publish + # create span. + assert len(publish_create_span.events) == 2 + assert publish_create_span.events[0].name == "publish start" + assert publish_create_span.events[1].name == "exception" + + +# Refer https://opentelemetry.io/docs/languages/python/#version-support +@pytest.mark.skipif( + sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" +) +def test_open_telemetry_commit_publish_rpc_exception(span_exporter): + TOPIC = "projects/projectID/topics/topicID" + batch = create_batch(topic=TOPIC, enable_open_telemetry=True) + + message = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), + ) + message.start_create_span(topic=TOPIC, ordering_key=None) + batch.publish(message) + + # Mock publish error. + error = google.api_core.exceptions.InternalServerError("error") + + with mock.patch.object( + type(batch.client), + "_gapic_publish", + side_effect=error, + ): + batch._commit() + + spans = span_exporter.get_finished_spans() + # Span 1: Publish RPC span + # Span 2: Create span. + assert len(spans) == 2 + + # Verify both spans recorded error and have ended. + for span in spans: + assert span.status.status_code == trace.status.StatusCode.ERROR + assert span.end_time is not None + + publish_rpc_span = spans[0] + assert publish_rpc_span.name == f"{TOPIC} publish" + assert len(publish_rpc_span.events) == 1 + assert publish_rpc_span.events[0].name == "exception" + + publish_create_span = spans[1] + assert publish_create_span.name == f"{TOPIC} create" + # Publish start event and exception event should be present in publish + # create span. + assert len(publish_create_span.events) == 2 + assert publish_create_span.events[0].name == "publish start" + assert publish_create_span.events[1].name == "exception" + + +# Refer https://opentelemetry.io/docs/languages/python/#version-support +@pytest.mark.skipif( + sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" +) +def test_opentelemetry_commit_sampling(span_exporter): + TOPIC = "projects/projectID/topics/topic" + batch = create_batch( + topic=TOPIC, + enable_open_telemetry=True, + ) + + message = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), + ) + message.start_create_span(topic=TOPIC, ordering_key=None) + message.create_span.is_recording = mock.Mock(return_value=False) + batch.publish(message) + + publish_response = gapic_types.PublishResponse(message_ids=["a"]) + with mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ): + batch._commit() + + spans = span_exporter.get_finished_spans() + + # Span 1: Publish RPC span + # Span 2: Create span + assert len(spans) == 2 + + publish_rpc_span, create_span = spans + + # Verify no links added when the spans are not sampled. + assert len(publish_rpc_span.links) == 0 + assert len(create_span.links) == 0 + + +@pytest.mark.skipif( + sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" +) +def test_opentelemetry_commit(span_exporter): + TOPIC = "projects/projectID/topics/topic" + batch = create_batch( + topic=TOPIC, + enable_open_telemetry=True, + ) + + msg1 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), + ) + msg2 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"bar"), + ) + msg1.start_create_span(topic=TOPIC, ordering_key=None) + msg2.start_create_span(topic=TOPIC, ordering_key=None) + + # Add both messages to the batch. + batch.publish(msg1) + batch.publish(msg2) + + publish_response = gapic_types.PublishResponse(message_ids=["a", "b"]) + with mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ): + batch._commit() + + spans = span_exporter.get_finished_spans() + + # Span 1: publish RPC span - closed after publish RPC success. + # Span 2: publisher create span of message 1 - closed after publish RPC success. + # Span 3: publisher create span of message 2 - closed after publish RPC success. + assert len(spans) == 3 + publish_rpc_span, create_span1, create_span2 = spans + + # Verify publish RPC span + assert publish_rpc_span.name == f"{TOPIC} publish" + assert publish_rpc_span.kind == trace.SpanKind.CLIENT + assert publish_rpc_span.end_time is not None + attributes = publish_rpc_span.attributes + assert attributes["messaging.system"] == "com.google.cloud.pubsub.v1" + assert attributes["messaging.destination.name"] == TOPIC + assert attributes["gcp.project_id"] == "projectID" + assert attributes["messaging.batch.message_count"] == 2 + assert attributes["messaging.operation"] == "publish" + assert attributes["code.function"] == "_commit" + assert publish_rpc_span.parent is None + # Verify the links correspond to the spans of the published messages. + assert len(publish_rpc_span.links) == 2 + assert publish_rpc_span.links[0].context == create_span1.context + assert publish_rpc_span.links[1].context == create_span2.context + + # Verify spans of the published messages. + assert create_span1.name == f"{TOPIC} create" + assert create_span2.name == f"{TOPIC} create" + + # Verify the publish create spans have been closed after publish success. + assert create_span1.end_time is not None + assert create_span2.end_time is not None + + # Verify message IDs returned from gapic publish are added as attributes + # to the publisher create spans of the messages. + assert "messaging.message.id" in create_span1.attributes + assert create_span1.attributes["messaging.message.id"] == "a" + assert "messaging.message.id" in create_span2.attributes + assert create_span2.attributes["messaging.message.id"] == "b" + + # Verify publish end event added to the span + assert len(create_span1.events) == 2 + assert len(create_span2.events) == 2 + assert create_span1.events[0].name == "publish start" + assert create_span1.events[1].name == "publish end" + assert create_span2.events[0].name == "publish start" + assert create_span2.events[1].name == "publish end" diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 7eacfa2b1..d8999bb33 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -28,8 +28,6 @@ import pytest import time -from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter -from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry import trace from google.api_core import gapic_v1 @@ -181,7 +179,7 @@ def test_opentelemetry_context_setter(): sys.version_info < (3, 8), reason="Open Telemetry not supported below Python version 3.8", ) -def test_opentelemetry_context_propagation(creds, provider): +def test_opentelemetry_context_propagation(creds, span_exporter): TOPIC = "projects/projectID/topics/topicID" client = publisher.Client( credentials=creds, @@ -190,12 +188,6 @@ def test_opentelemetry_context_propagation(creds, provider): ), ) - # Set up Open Telemetry tracing. - memory_exporter = InMemorySpanExporter() - processor = SimpleSpanProcessor(memory_exporter) - provider.add_span_processor(processor) - trace.set_tracer_provider(provider) - message_mock = mock.Mock(spec=publisher.flow_controller.FlowController.add) client._flow_controller.add = message_mock client.publish(TOPIC, b"data") @@ -218,7 +210,7 @@ def test_opentelemetry_context_propagation(creds, provider): ], ) def test_opentelemetry_publisher_batching_exception( - creds, provider, enable_open_telemetry + creds, span_exporter, enable_open_telemetry ): client = publisher.Client( credentials=creds, @@ -227,12 +219,6 @@ def test_opentelemetry_publisher_batching_exception( ), ) - # Setup Open Telemetry tracing - memory_exporter = InMemorySpanExporter() - processor = SimpleSpanProcessor(memory_exporter) - provider.add_span_processor(processor) - trace.set_tracer_provider(provider) - # Throw an exception when sequencer.publish() is called sequencer = mock.Mock(spec=ordered_sequencer.OrderedSequencer) sequencer.publish = mock.Mock(side_effect=RuntimeError("some error")) @@ -242,7 +228,7 @@ def test_opentelemetry_publisher_batching_exception( with pytest.raises(RuntimeError): client.publish(TOPIC, b"message") - spans = memory_exporter.get_finished_spans() + spans = span_exporter.get_finished_spans() if enable_open_telemetry: # Span 1: Publisher Flow Control span @@ -266,7 +252,7 @@ def test_opentelemetry_publisher_batching_exception( sys.version_info < (3, 8), reason="Open Telemetry not supported below Python version 3.8", ) -def test_opentelemetry_flow_control_exception(creds, provider): +def test_opentelemetry_flow_control_exception(creds, span_exporter): publisher_options = types.PublisherOptions( flow_control=types.PublishFlowControl( message_limit=10, @@ -281,12 +267,6 @@ def test_opentelemetry_flow_control_exception(creds, provider): topic = "topic/path" client._set_batch(topic, mock_batch) - # Trace Provider setup. - memory_exporter = InMemorySpanExporter() - processor = SimpleSpanProcessor(memory_exporter) - provider.add_span_processor(processor) - trace.set_tracer_provider(provider) - future1 = client.publish(topic, b"a" * 60) future2 = client.publish(topic, b"b" * 100) @@ -294,7 +274,7 @@ def test_opentelemetry_flow_control_exception(creds, provider): with pytest.raises(exceptions.FlowControlLimitError): future2.result() - spans = memory_exporter.get_finished_spans() + spans = span_exporter.get_finished_spans() # Span 1 = Publisher Flow Control Span of first publish # Span 2 = Publisher Batching Span of first publish # Span 2 = Publisher Flow Control Span of second publish(raises FlowControlLimitError) @@ -318,7 +298,7 @@ def test_opentelemetry_flow_control_exception(creds, provider): sys.version_info < (3, 8), reason="Open Telemetry not supported below Python version 3.8", ) -def test_opentelemetry_publish(creds, provider): +def test_opentelemetry_publish(creds, span_exporter): TOPIC = "projects/projectID/topics/topicID" client = publisher.Client( credentials=creds, @@ -327,14 +307,8 @@ def test_opentelemetry_publish(creds, provider): ), ) - # Trace Provider setup. - memory_exporter = InMemorySpanExporter() - processor = SimpleSpanProcessor(memory_exporter) - provider.add_span_processor(processor) - trace.set_tracer_provider(provider) - client.publish(TOPIC, b"message") - spans = memory_exporter.get_finished_spans() + spans = span_exporter.get_finished_spans() # Span 1: Publisher Flow control span # Span 2: Publisher Batching span From b519fb5884b78287782d5deab4ee98e897d9d46b Mon Sep 17 00:00:00 2001 From: Mukund Date: Fri, 6 Sep 2024 20:19:05 +0000 Subject: [PATCH 13/19] Fix mypy type checking errors --- .../open_telemetry/publish_message_wrapper.py | 17 ++++++++++------- .../cloud/pubsub_v1/publisher/_batch/thread.py | 3 ++- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py index 89151bd19..1da83d084 100644 --- a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -1,6 +1,7 @@ import sys from datetime import datetime import warnings +from typing import Optional from google.pubsub_v1 import types as gapic_types from opentelemetry import trace @@ -26,14 +27,14 @@ def __init__(self, message: gapic_types.PubsubMessage): def message(self): return self._message + @message.setter # type: ignore[no-redef] # resetting message value is intentional here + def message(self, message: gapic_types.PubsubMessage): + self._message = message + @property def create_span(self): return self._create_span - @message.setter - def message(self, message: gapic_types.PubsubMessage): - self._message = message - def __eq__(self, other): # pragma: NO COVER """Used for pytest asserts to compare two PublishMessageWrapper objects with the same message.""" if isinstance(self, other.__class__): @@ -68,7 +69,7 @@ def start_create_span(self, topic: str, ordering_key: str) -> None: setter=OpenTelemetryContextSetter(), ) - def end_create_span(self, exc: BaseException = None) -> None: + def end_create_span(self, exc: Optional[BaseException] = None) -> None: if self._create_span is None: # pragma: NO COVER warnings.warn( message="publish create span is None. Hence, not ending it", @@ -98,7 +99,9 @@ def start_publisher_flow_control_span(self) -> None: ) as flow_control_span: self._flow_control_span: trace.Span = flow_control_span - def end_publisher_flow_control_span(self, exc: BaseException = None) -> None: + def end_publisher_flow_control_span( + self, exc: Optional[BaseException] = None + ) -> None: if self._flow_control_span is None: # pragma: NO COVER warnings.warn( message="publish flow control span is None. Hence, not ending it", @@ -128,7 +131,7 @@ def start_publisher_batching_span(self) -> None: ) as batching_span: self._batching_span = batching_span - def end_publisher_batching_span(self, exc: BaseException = None) -> None: + def end_publisher_batching_span(self, exc: Optional[BaseException] = None) -> None: if self._batching_span is None: # pragma: NO COVER warnings.warn( message="publisher batching span is None. Hence, not ending it", diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index f022354df..c13d87b1b 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -129,7 +129,7 @@ def __init__( # Publish RPC Span that will be set by method `_create_publish_rpc_span` # if Open Telemetry is enabled. - self._rpc_span: trace.Span = None + self._rpc_span: Optional[trace.Span] = None @staticmethod def make_lock() -> threading.Lock: @@ -325,6 +325,7 @@ def _commit(self) -> None: ) if self._client.open_telemetry_enabled: + assert self._rpc_span is not None self._rpc_span.end() end_time = str(datetime.now()) for message_id, wrapper in zip( From f6c3c8fe9b388f5628e14b1ed411924e86f71e3e Mon Sep 17 00:00:00 2001 From: Mukund Date: Fri, 6 Sep 2024 23:00:01 +0000 Subject: [PATCH 14/19] Update messaging.system attribute of publish RPC span --- google/cloud/pubsub_v1/publisher/_batch/thread.py | 2 +- tests/unit/pubsub_v1/publisher/batch/test_thread.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index c13d87b1b..1a21fbed3 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -251,7 +251,7 @@ def _create_publish_rpc_span(self): with tracer.start_as_current_span( name=f"{self._topic} publish", attributes={ - "messaging.system": "com.google.cloud.pubsub.v1", + "messaging.system": "gcp_pubsub", "messaging.destination.name": self._topic, "gcp.project_id": self._topic.split("/")[1], "messaging.batch.message_count": len(self._message_wrappers), diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 88f5b9e65..409a3eff7 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -893,7 +893,7 @@ def test_opentelemetry_commit(span_exporter): assert publish_rpc_span.kind == trace.SpanKind.CLIENT assert publish_rpc_span.end_time is not None attributes = publish_rpc_span.attributes - assert attributes["messaging.system"] == "com.google.cloud.pubsub.v1" + assert attributes["messaging.system"] == "gcp_pubsub" assert attributes["messaging.destination.name"] == TOPIC assert attributes["gcp.project_id"] == "projectID" assert attributes["messaging.batch.message_count"] == 2 From cf8d039e26ae51c6dd5e3a50b0e22be82e486273 Mon Sep 17 00:00:00 2001 From: Mukund Date: Fri, 6 Sep 2024 23:05:58 +0000 Subject: [PATCH 15/19] Update publish RPC sampling test to include both sampled and unsampled spans --- .../pubsub_v1/publisher/batch/test_thread.py | 36 ++++++++++++------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 409a3eff7..246c0089a 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -825,14 +825,21 @@ def test_opentelemetry_commit_sampling(span_exporter): enable_open_telemetry=True, ) - message = PublishMessageWrapper( + message1 = PublishMessageWrapper( message=gapic_types.PubsubMessage(data=b"foo"), ) - message.start_create_span(topic=TOPIC, ordering_key=None) - message.create_span.is_recording = mock.Mock(return_value=False) - batch.publish(message) + message1.start_create_span(topic=TOPIC, ordering_key=None) + message1.create_span.is_recording = mock.Mock(return_value=False) - publish_response = gapic_types.PublishResponse(message_ids=["a"]) + message2 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"bar"), + ) + message2.start_create_span(topic=TOPIC, ordering_key=None) + + batch.publish(message1) + batch.publish(message2) + + publish_response = gapic_types.PublishResponse(message_ids=["a", "b"]) with mock.patch.object( type(batch.client), "_gapic_publish", return_value=publish_response ): @@ -840,15 +847,20 @@ def test_opentelemetry_commit_sampling(span_exporter): spans = span_exporter.get_finished_spans() - # Span 1: Publish RPC span - # Span 2: Create span - assert len(spans) == 2 + # Span 1: Publish RPC span of both messages + # Span 2: Create span of message 1 + # Span 3: Create span of message 2 + assert len(spans) == 3 - publish_rpc_span, create_span = spans + publish_rpc_span, create_span1, create_span2 = spans - # Verify no links added when the spans are not sampled. - assert len(publish_rpc_span.links) == 0 - assert len(create_span.links) == 0 + # Verify publish RPC span has only one link corresponding to + # message 2 which is included in the sample. + assert len(publish_rpc_span.links) == 1 + assert len(create_span1.links) == 0 + assert len(create_span2.links) == 1 + assert publish_rpc_span.links[0].context == create_span2.context + assert create_span2.links[0].context == publish_rpc_span.context @pytest.mark.skipif( From 3519fbd51279ff9274e96565ce78a6ade67602f0 Mon Sep 17 00:00:00 2001 From: Mukund Date: Fri, 6 Sep 2024 23:08:12 +0000 Subject: [PATCH 16/19] Fix test comments --- tests/unit/pubsub_v1/publisher/test_publisher_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index d8999bb33..1b3962128 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -277,8 +277,8 @@ def test_opentelemetry_flow_control_exception(creds, span_exporter): spans = span_exporter.get_finished_spans() # Span 1 = Publisher Flow Control Span of first publish # Span 2 = Publisher Batching Span of first publish - # Span 2 = Publisher Flow Control Span of second publish(raises FlowControlLimitError) - # Span 3 = Publish Create Span of second publish(raises FlowControlLimitError) + # Span 3 = Publisher Flow Control Span of second publish(raises FlowControlLimitError) + # Span 4 = Publish Create Span of second publish(raises FlowControlLimitError) assert len(spans) == 4 failed_flow_control_span = spans[2] From 3bc3a566c670904956e42daec6fcc31d25094b7e Mon Sep 17 00:00:00 2001 From: Mukund Date: Fri, 6 Sep 2024 23:24:44 +0000 Subject: [PATCH 17/19] Update publish RPC and create span to use shorter format of topic instead of its fully qualified name --- .../open_telemetry/publish_message_wrapper.py | 5 +++-- google/cloud/pubsub_v1/publisher/_batch/thread.py | 6 +++--- .../unit/pubsub_v1/publisher/batch/test_thread.py | 14 +++++++------- .../pubsub_v1/publisher/test_publisher_client.py | 2 +- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py index 1da83d084..7e386254b 100644 --- a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -43,11 +43,12 @@ def __eq__(self, other): # pragma: NO COVER def start_create_span(self, topic: str, ordering_key: str) -> None: tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + topic_short_name = topic.split("/")[3] with tracer.start_as_current_span( - name=f"{topic} create", + name=f"{topic_short_name} create", attributes={ "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM, - "messaging.destination.name": topic, + "messaging.destination.name": topic_short_name, "code.function": "google.cloud.pubsub.PublisherClient.publish", "messaging.gcp_pubsub.message.ordering_key": ordering_key, "messaging.operation": "create", diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index 1a21fbed3..20072330d 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -247,12 +247,12 @@ def _create_publish_rpc_span(self): # Add links only for sampled spans. if span.is_recording(): links.append(trace.Link(span.get_span_context())) - + topic_short_name = self._topic.split("/")[3] with tracer.start_as_current_span( - name=f"{self._topic} publish", + name=f"{topic_short_name} publish", attributes={ "messaging.system": "gcp_pubsub", - "messaging.destination.name": self._topic, + "messaging.destination.name": topic_short_name, "gcp.project_id": self._topic.split("/")[1], "messaging.batch.message_count": len(self._message_wrappers), "messaging.operation": "publish", diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 246c0089a..43bb38967 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -758,7 +758,7 @@ def test_open_telemetry_commit_publish_rpc_span_none(span_exporter): assert publish_create_span.status.status_code == trace.status.StatusCode.ERROR assert publish_create_span.end_time is not None - assert publish_create_span.name == f"{TOPIC} create" + assert publish_create_span.name == "topicID create" # Publish start event and exception event should be present in publish # create span. assert len(publish_create_span.events) == 2 @@ -801,12 +801,12 @@ def test_open_telemetry_commit_publish_rpc_exception(span_exporter): assert span.end_time is not None publish_rpc_span = spans[0] - assert publish_rpc_span.name == f"{TOPIC} publish" + assert publish_rpc_span.name == "topicID publish" assert len(publish_rpc_span.events) == 1 assert publish_rpc_span.events[0].name == "exception" publish_create_span = spans[1] - assert publish_create_span.name == f"{TOPIC} create" + assert publish_create_span.name == "topicID create" # Publish start event and exception event should be present in publish # create span. assert len(publish_create_span.events) == 2 @@ -901,12 +901,12 @@ def test_opentelemetry_commit(span_exporter): publish_rpc_span, create_span1, create_span2 = spans # Verify publish RPC span - assert publish_rpc_span.name == f"{TOPIC} publish" + assert publish_rpc_span.name == "topic publish" assert publish_rpc_span.kind == trace.SpanKind.CLIENT assert publish_rpc_span.end_time is not None attributes = publish_rpc_span.attributes assert attributes["messaging.system"] == "gcp_pubsub" - assert attributes["messaging.destination.name"] == TOPIC + assert attributes["messaging.destination.name"] == "topic" assert attributes["gcp.project_id"] == "projectID" assert attributes["messaging.batch.message_count"] == 2 assert attributes["messaging.operation"] == "publish" @@ -918,8 +918,8 @@ def test_opentelemetry_commit(span_exporter): assert publish_rpc_span.links[1].context == create_span2.context # Verify spans of the published messages. - assert create_span1.name == f"{TOPIC} create" - assert create_span2.name == f"{TOPIC} create" + assert create_span1.name == "topic create" + assert create_span2.name == "topic create" # Verify the publish create spans have been closed after publish success. assert create_span1.end_time is not None diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 1b3962128..983947f15 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -264,7 +264,7 @@ def test_opentelemetry_flow_control_exception(creds, span_exporter): client = publisher.Client(credentials=creds, publisher_options=publisher_options) mock_batch = mock.Mock(spec=client._batch_class) - topic = "topic/path" + topic = "projects/projectID/topics/topicID" client._set_batch(topic, mock_batch) future1 = client.publish(topic, b"a" * 60) From c5982ca7325d3d6459d7dff2c7268a9981bf3857 Mon Sep 17 00:00:00 2001 From: mukund-ananthu Date: Sat, 7 Sep 2024 10:18:35 +0000 Subject: [PATCH 18/19] Code clean up * Fix import ordering * Add typing wherever missing * Update unit tests with additional tests --- .../open_telemetry/context_propagation.py | 17 ++++++- .../open_telemetry/publish_message_wrapper.py | 21 ++++++-- .../pubsub_v1/publisher/_batch/thread.py | 11 ++-- google/cloud/pubsub_v1/publisher/client.py | 4 +- tests/unit/pubsub_v1/conftest.py | 4 +- .../pubsub_v1/publisher/batch/test_thread.py | 30 ++++++++--- .../publisher/test_publish_message_wrapper.py | 2 + .../publisher/test_publisher_client.py | 51 +++++++++++++------ 8 files changed, 105 insertions(+), 35 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/context_propagation.py b/google/cloud/pubsub_v1/open_telemetry/context_propagation.py index d2bfbee16..59881f769 100644 --- a/google/cloud/pubsub_v1/open_telemetry/context_propagation.py +++ b/google/cloud/pubsub_v1/open_telemetry/context_propagation.py @@ -1,4 +1,19 @@ +# Copyright 2017, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + from opentelemetry.propagators.textmap import Setter + from google.pubsub_v1 import types as gapic_types @@ -7,7 +22,7 @@ class OpenTelemetryContextSetter(Setter): Used by Open Telemetry for context propagation. """ - def set(self, carrier: gapic_types.PubsubMessage, key: str, value: str): + def set(self, carrier: gapic_types.PubsubMessage, key: str, value: str) -> None: """ Injects trace context into Pub/Sub message attributes with "googclient_" prefix. diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py index 7e386254b..7c609fbad 100644 --- a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -1,12 +1,27 @@ +# Copyright 2017, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import sys from datetime import datetime import warnings from typing import Optional -from google.pubsub_v1 import types as gapic_types from opentelemetry import trace from opentelemetry.trace.propagation import set_span_in_context from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + +from google.pubsub_v1 import types as gapic_types from google.cloud.pubsub_v1.open_telemetry.context_propagation import ( OpenTelemetryContextSetter, ) @@ -36,7 +51,7 @@ def create_span(self): return self._create_span def __eq__(self, other): # pragma: NO COVER - """Used for pytest asserts to compare two PublishMessageWrapper objects with the same message.""" + """Used for pytest asserts to compare two PublishMessageWrapper objects with the same message""" if isinstance(self, other.__class__): return self.message == other.message return False @@ -49,7 +64,7 @@ def start_create_span(self, topic: str, ordering_key: str) -> None: attributes={ "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM, "messaging.destination.name": topic_short_name, - "code.function": "google.cloud.pubsub.PublisherClient.publish", + "code.function": "publish", "messaging.gcp_pubsub.message.ordering_key": ordering_key, "messaging.operation": "create", "gcp.project_id": topic.split("/")[1], diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index 20072330d..91e17de2c 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -22,9 +22,9 @@ from datetime import datetime from opentelemetry import trace - import google.api_core.exceptions from google.api_core import gapic_v1 + from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher import futures from google.cloud.pubsub_v1.publisher._batch import base @@ -92,6 +92,7 @@ class Batch(base.Batch): """ _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher" + _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" def __init__( self, @@ -127,7 +128,7 @@ def __init__( self._commit_retry = commit_retry self._commit_timeout = commit_timeout - # Publish RPC Span that will be set by method `_create_publish_rpc_span` + # Publish RPC Span that will be set by method `_start_publish_rpc_span` # if Open Telemetry is enabled. self._rpc_span: Optional[trace.Span] = None @@ -238,7 +239,7 @@ def _start_commit_thread(self) -> None: ) commit_thread.start() - def _create_publish_rpc_span(self): + def _start_publish_rpc_span(self) -> None: tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) links = [] @@ -251,7 +252,7 @@ def _create_publish_rpc_span(self): with tracer.start_as_current_span( name=f"{topic_short_name} publish", attributes={ - "messaging.system": "gcp_pubsub", + "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM, "messaging.destination.name": topic_short_name, "gcp.project_id": self._topic.split("/")[1], "messaging.batch.message_count": len(self._message_wrappers), @@ -314,7 +315,7 @@ def _commit(self) -> None: batch_transport_succeeded = True try: if self._client.open_telemetry_enabled: - self._create_publish_rpc_span() + self._start_publish_rpc_span() # Performs retries for errors defined by the retry configuration. response = self._client._gapic_publish( diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 996087d93..481a8472d 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -481,8 +481,8 @@ def on_publish_done(future): except BaseException as be: # Exceptions can be thrown when attempting to add messages to # the batch. If they're thrown, record them in publisher - # batching and create span, end them and bubble - # the exception up. + # batching and create span, end the spans and bubble the + # exception up. if self._open_telemetry_enabled: if wrapper: wrapper.end_publisher_batching_span(be) diff --git a/tests/unit/pubsub_v1/conftest.py b/tests/unit/pubsub_v1/conftest.py index f3cd421e3..b44e2fd84 100644 --- a/tests/unit/pubsub_v1/conftest.py +++ b/tests/unit/pubsub_v1/conftest.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest + from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter from opentelemetry.sdk.trace.export import SimpleSpanProcessor from opentelemetry import trace - import google.auth.credentials -import pytest @pytest.fixture diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 43bb38967..b058c8933 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -43,7 +43,7 @@ ) -def create_client(enable_open_telemetry=False): +def create_client(enable_open_telemetry: bool = False): return publisher.Client( credentials=credentials.AnonymousCredentials(), publisher_options=types.PublisherOptions( @@ -725,9 +725,9 @@ def test_batch_done_callback_called_on_publish_response_invalid(): ) def test_open_telemetry_commit_publish_rpc_span_none(span_exporter): """ - Test coverage for scenario where OpenTelemetry is enabled, publish RPC - span creation fails(unexpected) and hence batch._rpc_span is None. - Required for code coverage. + Test scenario where OpenTelemetry is enabled, publish RPC + span creation fails(unexpected) and hence batch._rpc_span is None when + attempting to close it. Required for code coverage. """ TOPIC = "projects/projectID/topics/topicID" batch = create_batch(topic=TOPIC, enable_open_telemetry=True) @@ -738,12 +738,12 @@ def test_open_telemetry_commit_publish_rpc_span_none(span_exporter): message.start_create_span(topic=TOPIC, ordering_key=None) batch.publish(message) - # Mock publish error. + # Mock error when publish RPC span creation is attempted. error = google.api_core.exceptions.InternalServerError("error") with mock.patch.object( type(batch), - "_create_publish_rpc_span", + "_start_publish_rpc_span", side_effect=error, ): batch._commit() @@ -751,7 +751,8 @@ def test_open_telemetry_commit_publish_rpc_span_none(span_exporter): assert batch._rpc_span is None spans = span_exporter.get_finished_spans() - # Create span (mock error thrown when publish RPC span creation is attempted) + # Only Create span should be exported, since publish RPC span creation + # should fail with a mock error. assert len(spans) == 1 publish_create_span = spans[0] @@ -862,6 +863,17 @@ def test_opentelemetry_commit_sampling(span_exporter): assert publish_rpc_span.links[0].context == create_span2.context assert create_span2.links[0].context == publish_rpc_span.context + # Verify all spans have ended. + for span in spans: + assert span.end_time is not None + + # Verify both publish create spans have 2 events - publish start and publish + # end. + for span in spans[1:]: + assert len(span.events) == 2 + assert span.events[0].name == "publish start" + assert span.events[1].name == "publish end" + @pytest.mark.skipif( sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" @@ -916,6 +928,10 @@ def test_opentelemetry_commit(span_exporter): assert len(publish_rpc_span.links) == 2 assert publish_rpc_span.links[0].context == create_span1.context assert publish_rpc_span.links[1].context == create_span2.context + assert len(create_span1.links) == 1 + assert create_span1.links[0].context == publish_rpc_span.get_span_context() + assert len(create_span2.links) == 1 + assert create_span2.links[0].context == publish_rpc_span.get_span_context() # Verify spans of the published messages. assert create_span1.name == "topic create" diff --git a/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py b/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py index 35888805c..135d71ceb 100644 --- a/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py +++ b/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py @@ -29,5 +29,7 @@ def test_message_setter(): def test_eq(): wrapper1 = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) wrapper2 = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"bar")) + wrapper3 = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) assert wrapper1.__eq__(wrapper2) is False + assert wrapper1.__eq__(wrapper3) is True diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 983947f15..ba996da7a 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -28,17 +28,16 @@ import pytest import time -from opentelemetry import trace +from opentelemetry import trace from google.api_core import gapic_v1 from google.api_core import retry as retries from google.api_core.gapic_v1.client_info import METRICS_METADATA_KEY + from google.cloud.pubsub_v1 import publisher from google.cloud.pubsub_v1 import types - from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer - from google.pubsub_v1 import types as gapic_types from google.pubsub_v1.services.publisher import client as publisher_client from google.pubsub_v1.services.publisher.transports.grpc import PublisherGrpcTransport @@ -206,7 +205,7 @@ def test_opentelemetry_context_propagation(creds, span_exporter): "enable_open_telemetry", [ True, - False, # for test code coverage - exception thrown but Open Telemetry disabled + False, ], ) def test_opentelemetry_publisher_batching_exception( @@ -236,17 +235,30 @@ def test_opentelemetry_publisher_batching_exception( # Span 3: Create Publish span assert len(spans) == 3 - batching_span = spans[1] - create_span = spans[2] + flow_control_span, batching_span, create_span = spans + + # Verify batching span contents. assert batching_span.name == "publisher batching" assert batching_span.kind == trace.SpanKind.INTERNAL - assert batching_span._parent[1] == create_span._context[1] + assert batching_span.parent.span_id == create_span.get_span_context().span_id - # Verify exception recorded by the Publisher Batching span. + # Verify exception recorded by the publisher batching span. assert batching_span.status.status_code == trace.StatusCode.ERROR assert len(batching_span.events) == 1 assert batching_span.events[0].name == "exception" + # Verify exception recorded by the publisher create span. + assert create_span.status.status_code == trace.StatusCode.ERROR + assert len(create_span.events) == 2 + assert create_span.events[0].name == "publish start" + assert create_span.events[1].name == "exception" + + # Verify the finished flow control span. + assert flow_control_span.name == "publisher flow control" + assert len(flow_control_span.events) == 0 + else: + assert len(spans) == 0 + @pytest.mark.skipif( sys.version_info < (3, 8), @@ -283,16 +295,26 @@ def test_opentelemetry_flow_control_exception(creds, span_exporter): failed_flow_control_span = spans[2] finished_publish_create_span = spans[3] + + # Verify failed flow control span values. assert failed_flow_control_span.name == "publisher flow control" assert failed_flow_control_span.kind == trace.SpanKind.INTERNAL assert ( - failed_flow_control_span._parent[1] == finished_publish_create_span._context[1] + failed_flow_control_span.parent.span_id + == finished_publish_create_span.get_span_context().span_id ) assert failed_flow_control_span.status.status_code == trace.StatusCode.ERROR assert len(failed_flow_control_span.events) == 1 assert failed_flow_control_span.events[0].name == "exception" + # Verify finished publish create span values + assert finished_publish_create_span.name == "topicID create" + assert finished_publish_create_span.status.status_code == trace.StatusCode.ERROR + assert len(finished_publish_create_span.events) == 2 + assert finished_publish_create_span.events[0].name == "publish start" + assert finished_publish_create_span.events[1].name == "exception" + @pytest.mark.skipif( sys.version_info < (3, 8), @@ -312,22 +334,21 @@ def test_opentelemetry_publish(creds, span_exporter): # Span 1: Publisher Flow control span # Span 2: Publisher Batching span - # Publish Create Span would still be active, and hence not exported - # at this point in development. + # Publish Create Span would still be active, and hence not exported. assert len(spans) == 2 flow_control_span = spans[0] assert flow_control_span.name == "publisher flow control" assert flow_control_span.kind == trace.SpanKind.INTERNAL # Assert the Publisher Flow Control Span has a parent(the Publish Create - # Span is still not finished, and hence the value of parent cannot yet be - # asserted at this point in development) - assert flow_control_span._parent is not None + # span is still active, and hence unexported. So, the value of parent cannot + # be asserted) + assert flow_control_span.parent is not None batching_span = spans[1] assert batching_span.name == "publisher batching" assert batching_span.kind == trace.SpanKind.INTERNAL - assert batching_span._parent is not None + assert batching_span.parent is not None def test_init_w_api_endpoint(creds): From 4ab4426fa1c1b98b08316464c8c4a56077a19d55 Mon Sep 17 00:00:00 2001 From: mukund-ananthu Date: Tue, 10 Sep 2024 21:35:28 +0000 Subject: [PATCH 19/19] Address review comments --- .../pubsub_v1/open_telemetry/__init__.py | 13 +++++ .../open_telemetry/context_propagation.py | 15 ++++-- .../open_telemetry/publish_message_wrapper.py | 50 ++++++------------- .../pubsub_v1/publisher/_batch/thread.py | 7 +-- .../pubsub_v1/publisher/batch/test_thread.py | 15 ++++-- .../publisher/test_publish_message_wrapper.py | 20 ++++++++ .../publisher/test_publisher_client.py | 2 - 7 files changed, 76 insertions(+), 46 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/__init__.py b/google/cloud/pubsub_v1/open_telemetry/__init__.py index e69de29bb..e88bb5dbb 100644 --- a/google/cloud/pubsub_v1/open_telemetry/__init__.py +++ b/google/cloud/pubsub_v1/open_telemetry/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/google/cloud/pubsub_v1/open_telemetry/context_propagation.py b/google/cloud/pubsub_v1/open_telemetry/context_propagation.py index 59881f769..37fad3e20 100644 --- a/google/cloud/pubsub_v1/open_telemetry/context_propagation.py +++ b/google/cloud/pubsub_v1/open_telemetry/context_propagation.py @@ -1,4 +1,4 @@ -# Copyright 2017, Google LLC All rights reserved. +# Copyright 2024, Google LLC All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ from opentelemetry.propagators.textmap import Setter -from google.pubsub_v1 import types as gapic_types +from google.pubsub_v1 import PubsubMessage class OpenTelemetryContextSetter(Setter): @@ -22,9 +22,18 @@ class OpenTelemetryContextSetter(Setter): Used by Open Telemetry for context propagation. """ - def set(self, carrier: gapic_types.PubsubMessage, key: str, value: str) -> None: + def set(self, carrier: PubsubMessage, key: str, value: str) -> None: """ Injects trace context into Pub/Sub message attributes with "googclient_" prefix. + + Args: + carrier(PubsubMessage): The Pub/Sub message which is the carrier of Open Telemetry + data. + key(str): The key for which the Open Telemetry context data needs to be set. + value(str): The Open Telemetry context value to be set. + + Returns: + None """ carrier.attributes["googclient_" + key] = value diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py index 7c609fbad..e03a8f800 100644 --- a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -14,7 +14,6 @@ import sys from datetime import datetime -import warnings from typing import Optional from opentelemetry import trace @@ -28,7 +27,7 @@ class PublishMessageWrapper: - _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher" + _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1" _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" _OPEN_TELEMETRY_PUBLISHER_BATCHING = "publisher batching" @@ -37,6 +36,9 @@ class PublishMessageWrapper: def __init__(self, message: gapic_types.PubsubMessage): self._message: gapic_types.PubsubMessage = message + self._create_span: Optional[trace.Span] = None + self._flow_control_span: Optional[trace.Span] = None + self._batching_span: Optional[trace.Span] = None @property def message(self): @@ -58,6 +60,7 @@ def __eq__(self, other): # pragma: NO COVER def start_create_span(self, topic: str, ordering_key: str) -> None: tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + assert len(topic.split("/")) == 4 topic_short_name = topic.split("/")[3] with tracer.start_as_current_span( name=f"{topic_short_name} create", @@ -68,7 +71,9 @@ def start_create_span(self, topic: str, ordering_key: str) -> None: "messaging.gcp_pubsub.message.ordering_key": ordering_key, "messaging.operation": "create", "gcp.project_id": topic.split("/")[1], - "messaging.message.body.size": sys.getsizeof(self._message.data), + "messaging.message.body.size": sys.getsizeof( + self._message.data + ), # sys.getsizeof() used since the attribute expects size of message body in bytes }, kind=trace.SpanKind.PRODUCER, end_on_exit=False, @@ -79,19 +84,14 @@ def start_create_span(self, topic: str, ordering_key: str) -> None: "timestamp": str(datetime.now()), }, ) - self._create_span: trace.Span = create_span + self._create_span = create_span TraceContextTextMapPropagator().inject( carrier=self._message, setter=OpenTelemetryContextSetter(), ) def end_create_span(self, exc: Optional[BaseException] = None) -> None: - if self._create_span is None: # pragma: NO COVER - warnings.warn( - message="publish create span is None. Hence, not ending it", - category=RuntimeWarning, - ) - return + assert self._create_span is not None if exc: self._create_span.record_exception(exception=exc) self._create_span.set_status( @@ -101,29 +101,19 @@ def end_create_span(self, exc: Optional[BaseException] = None) -> None: def start_publisher_flow_control_span(self) -> None: tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) - if self._create_span is None: # pragma: NO COVER - warnings.warn( - message="publish create span is None. Hence, not starting publish flow control span", - category=RuntimeWarning, - ) - return + assert self._create_span is not None with tracer.start_as_current_span( name=self._PUBLISH_FLOW_CONTROL, kind=trace.SpanKind.INTERNAL, context=set_span_in_context(self._create_span), end_on_exit=False, ) as flow_control_span: - self._flow_control_span: trace.Span = flow_control_span + self._flow_control_span = flow_control_span def end_publisher_flow_control_span( self, exc: Optional[BaseException] = None ) -> None: - if self._flow_control_span is None: # pragma: NO COVER - warnings.warn( - message="publish flow control span is None. Hence, not ending it", - category=RuntimeWarning, - ) - return + assert self._flow_control_span is not None if exc: self._flow_control_span.record_exception(exception=exc) self._flow_control_span.set_status( @@ -132,12 +122,7 @@ def end_publisher_flow_control_span( self._flow_control_span.end() def start_publisher_batching_span(self) -> None: - if self._create_span is None: # pragma: NO COVER - warnings.warn( - message="publish create span is None. Hence, not starting publisher batching span", - category=RuntimeWarning, - ) - return + assert self._create_span is not None tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) with tracer.start_as_current_span( name=self._OPEN_TELEMETRY_PUBLISHER_BATCHING, @@ -148,12 +133,7 @@ def start_publisher_batching_span(self) -> None: self._batching_span = batching_span def end_publisher_batching_span(self, exc: Optional[BaseException] = None) -> None: - if self._batching_span is None: # pragma: NO COVER - warnings.warn( - message="publisher batching span is None. Hence, not ending it", - category=RuntimeWarning, - ) - return + assert self._batching_span is not None if exc: self._batching_span.record_exception(exception=exc) self._batching_span.set_status( diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index 91e17de2c..c4bf67c35 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -91,7 +91,7 @@ class Batch(base.Batch): timeout is used. """ - _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher" + _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1" _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" def __init__( @@ -246,8 +246,9 @@ def _start_publish_rpc_span(self) -> None: for wrapper in self._message_wrappers: span = wrapper.create_span # Add links only for sampled spans. - if span.is_recording(): + if span.get_span_context().trace_flags.sampled: links.append(trace.Link(span.get_span_context())) + assert len(self._topic.split("/")) == 4 topic_short_name = self._topic.split("/")[3] with tracer.start_as_current_span( name=f"{topic_short_name} publish", @@ -266,7 +267,7 @@ def _start_publish_rpc_span(self) -> None: ctx = rpc_span.get_span_context() for wrapper in self._message_wrappers: span = wrapper.create_span - if span.is_recording(): + if span.get_span_context().trace_flags.sampled: span.add_link(ctx) self._rpc_span = rpc_span diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index b058c8933..32eaa3d98 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -26,6 +26,7 @@ import pytest from opentelemetry import trace +from opentelemetry.trace import SpanContext import google.api_core.exceptions from google.api_core import gapic_v1 @@ -830,21 +831,29 @@ def test_opentelemetry_commit_sampling(span_exporter): message=gapic_types.PubsubMessage(data=b"foo"), ) message1.start_create_span(topic=TOPIC, ordering_key=None) - message1.create_span.is_recording = mock.Mock(return_value=False) message2 = PublishMessageWrapper( message=gapic_types.PubsubMessage(data=b"bar"), ) message2.start_create_span(topic=TOPIC, ordering_key=None) + # Mock the 'get_span_context' method to return a mock SpanContext + mock_span_context = mock.Mock(spec=SpanContext) + mock_span_context.trace_flags.sampled = False + batch.publish(message1) batch.publish(message2) publish_response = gapic_types.PublishResponse(message_ids=["a", "b"]) + + # Patch the 'create_span' method to return the mock SpanContext with mock.patch.object( - type(batch.client), "_gapic_publish", return_value=publish_response + message1.create_span, "get_span_context", return_value=mock_span_context ): - batch._commit() + with mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ): + batch._commit() spans = span_exporter.get_finished_spans() diff --git a/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py b/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py index 135d71ceb..e100950ad 100644 --- a/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py +++ b/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pytest + from google.pubsub_v1 import types as gapic_types from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( PublishMessageWrapper, @@ -33,3 +35,21 @@ def test_eq(): assert wrapper1.__eq__(wrapper2) is False assert wrapper1.__eq__(wrapper3) is True + + +def test_end_create_span(): + wrapper = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) + with pytest.raises(AssertionError): + wrapper.end_create_span() + + +def test_end_publisher_flow_control_span(): + wrapper = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) + with pytest.raises(AssertionError): + wrapper.end_publisher_flow_control_span() + + +def test_end_publisher_batching_span(): + wrapper = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) + with pytest.raises(AssertionError): + wrapper.end_publisher_batching_span() diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index ba996da7a..23255db3b 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -335,8 +335,6 @@ def test_opentelemetry_publish(creds, span_exporter): # Span 1: Publisher Flow control span # Span 2: Publisher Batching span # Publish Create Span would still be active, and hence not exported. - assert len(spans) == 2 - flow_control_span = spans[0] assert flow_control_span.name == "publisher flow control" assert flow_control_span.kind == trace.SpanKind.INTERNAL