Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Sep 11, 2024
1 parent c5982ca commit c563500
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 52 deletions.
13 changes: 13 additions & 0 deletions google/cloud/pubsub_v1/open_telemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2024, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15 changes: 12 additions & 3 deletions google/cloud/pubsub_v1/open_telemetry/context_propagation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2017, Google LLC All rights reserved.
# Copyright 2024, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,17 +14,26 @@

from opentelemetry.propagators.textmap import Setter

from google.pubsub_v1 import types as gapic_types
from google.pubsub_v1 import PubsubMessage


class OpenTelemetryContextSetter(Setter):
"""
Used by Open Telemetry for context propagation.
"""

def set(self, carrier: gapic_types.PubsubMessage, key: str, value: str) -> None:
def set(self, carrier: PubsubMessage, key: str, value: str) -> None:
"""
Injects trace context into Pub/Sub message attributes with
"googclient_" prefix.
Args:
carrier(PubsubMessage): The Pub/Sub message which is the carrier of Open Telemetry
data.
key(str): The key for which the Open Telemetry context data needs to be set.
value(str): The Open Telemetry context value to be set.
Returns:
None
"""
carrier.attributes["googclient_" + key] = value
58 changes: 20 additions & 38 deletions google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@

import sys
from datetime import datetime
import warnings
from typing import Optional

from opentelemetry import trace
from opentelemetry.trace.propagation import set_span_in_context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY,
MESSAGING_MESSAGE_BODY_SIZE,
)

from google.pubsub_v1 import types as gapic_types
from google.cloud.pubsub_v1.open_telemetry.context_propagation import (
Expand All @@ -28,7 +32,7 @@


class PublishMessageWrapper:
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher"
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
_OPEN_TELEMETRY_PUBLISHER_BATCHING = "publisher batching"

Expand Down Expand Up @@ -58,17 +62,20 @@ def __eq__(self, other): # pragma: NO COVER

def start_create_span(self, topic: str, ordering_key: str) -> None:
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
assert len(topic.split("/")) == 4
topic_short_name = topic.split("/")[3]
with tracer.start_as_current_span(
name=f"{topic_short_name} create",
attributes={
"messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
"messaging.destination.name": topic_short_name,
"code.function": "publish",
"messaging.gcp_pubsub.message.ordering_key": ordering_key,
"messaging.operation": "create",
SpanAttributes.MESSAGING_SYSTEM: self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION_NAME: topic_short_name,
SpanAttributes.CODE_FUNCTION: "publish",
MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY: ordering_key,
SpanAttributes.MESSAGING_OPERATION: "create",
"gcp.project_id": topic.split("/")[1],
"messaging.message.body.size": sys.getsizeof(self._message.data),
MESSAGING_MESSAGE_BODY_SIZE: sys.getsizeof(
self._message.data
), # sys.getsizeof() used since the attribute expects size of message body in bytes
},
kind=trace.SpanKind.PRODUCER,
end_on_exit=False,
Expand All @@ -86,12 +93,7 @@ def start_create_span(self, topic: str, ordering_key: str) -> None:
)

def end_create_span(self, exc: Optional[BaseException] = None) -> None:
if self._create_span is None: # pragma: NO COVER
warnings.warn(
message="publish create span is None. Hence, not ending it",
category=RuntimeWarning,
)
return
assert self._create_span is not None
if exc:
self._create_span.record_exception(exception=exc)
self._create_span.set_status(
Expand All @@ -101,12 +103,7 @@ def end_create_span(self, exc: Optional[BaseException] = None) -> None:

def start_publisher_flow_control_span(self) -> None:
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
if self._create_span is None: # pragma: NO COVER
warnings.warn(
message="publish create span is None. Hence, not starting publish flow control span",
category=RuntimeWarning,
)
return
assert self._create_span is not None
with tracer.start_as_current_span(
name=self._PUBLISH_FLOW_CONTROL,
kind=trace.SpanKind.INTERNAL,
Expand All @@ -118,12 +115,7 @@ def start_publisher_flow_control_span(self) -> None:
def end_publisher_flow_control_span(
self, exc: Optional[BaseException] = None
) -> None:
if self._flow_control_span is None: # pragma: NO COVER
warnings.warn(
message="publish flow control span is None. Hence, not ending it",
category=RuntimeWarning,
)
return
assert self._flow_control_span is not None
if exc:
self._flow_control_span.record_exception(exception=exc)
self._flow_control_span.set_status(
Expand All @@ -132,12 +124,7 @@ def end_publisher_flow_control_span(
self._flow_control_span.end()

def start_publisher_batching_span(self) -> None:
if self._create_span is None: # pragma: NO COVER
warnings.warn(
message="publish create span is None. Hence, not starting publisher batching span",
category=RuntimeWarning,
)
return
assert self._create_span is not None
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name=self._OPEN_TELEMETRY_PUBLISHER_BATCHING,
Expand All @@ -148,12 +135,7 @@ def start_publisher_batching_span(self) -> None:
self._batching_span = batching_span

def end_publisher_batching_span(self, exc: Optional[BaseException] = None) -> None:
if self._batching_span is None: # pragma: NO COVER
warnings.warn(
message="publisher batching span is None. Hence, not ending it",
category=RuntimeWarning,
)
return
assert self._batching_span is not None
if exc:
self._batching_span.record_exception(exception=exc)
self._batching_span.set_status(
Expand Down
20 changes: 12 additions & 8 deletions google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
from datetime import datetime

from opentelemetry import trace
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
MESSAGING_BATCH_MESSAGE_COUNT,
)
import google.api_core.exceptions
from google.api_core import gapic_v1

Expand Down Expand Up @@ -91,7 +95,7 @@ class Batch(base.Batch):
timeout is used.
"""

_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher"
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"

def __init__(
Expand Down Expand Up @@ -246,18 +250,18 @@ def _start_publish_rpc_span(self) -> None:
for wrapper in self._message_wrappers:
span = wrapper.create_span
# Add links only for sampled spans.
if span.is_recording():
if span.get_span_context().trace_flags.sampled:
links.append(trace.Link(span.get_span_context()))
topic_short_name = self._topic.split("/")[3]
with tracer.start_as_current_span(
name=f"{topic_short_name} publish",
attributes={
"messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
"messaging.destination.name": topic_short_name,
SpanAttributes.MESSAGING_SYSTEM: self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
SpanAttributes.MESSAGING_DESTINATION_NAME: topic_short_name,
"gcp.project_id": self._topic.split("/")[1],
"messaging.batch.message_count": len(self._message_wrappers),
"messaging.operation": "publish",
"code.function": "_commit",
MESSAGING_BATCH_MESSAGE_COUNT: len(self._message_wrappers),
SpanAttributes.MESSAGING_OPERATION: "publish",
SpanAttributes.CODE_FUNCTION: "_commit",
},
links=links,
kind=trace.SpanKind.CLIENT,
Expand All @@ -266,7 +270,7 @@ def _start_publish_rpc_span(self) -> None:
ctx = rpc_span.get_span_context()
for wrapper in self._message_wrappers:
span = wrapper.create_span
if span.is_recording():
if span.get_span_context().trace_flags.sampled:
span.add_link(ctx)
self._rpc_span = rpc_span

Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
"grpcio-status >= 1.33.2",
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry-semantic-conventions",
]
extras = {"libcst": "libcst >= 0.3.10"}
url = "https://github.com/googleapis/python-pubsub"
Expand Down
15 changes: 12 additions & 3 deletions tests/unit/pubsub_v1/publisher/batch/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import pytest

from opentelemetry import trace
from opentelemetry.trace import SpanContext

import google.api_core.exceptions
from google.api_core import gapic_v1
Expand Down Expand Up @@ -830,21 +831,29 @@ def test_opentelemetry_commit_sampling(span_exporter):
message=gapic_types.PubsubMessage(data=b"foo"),
)
message1.start_create_span(topic=TOPIC, ordering_key=None)
message1.create_span.is_recording = mock.Mock(return_value=False)

message2 = PublishMessageWrapper(
message=gapic_types.PubsubMessage(data=b"bar"),
)
message2.start_create_span(topic=TOPIC, ordering_key=None)

# Mock the 'get_span_context' method to return a mock SpanContext
mock_span_context = mock.Mock(spec=SpanContext)
mock_span_context.trace_flags.sampled = False

batch.publish(message1)
batch.publish(message2)

publish_response = gapic_types.PublishResponse(message_ids=["a", "b"])

# Patch the 'create_span' method to return the mock SpanContext
with mock.patch.object(
type(batch.client), "_gapic_publish", return_value=publish_response
message1.create_span, "get_span_context", return_value=mock_span_context
):
batch._commit()
with mock.patch.object(
type(batch.client), "_gapic_publish", return_value=publish_response
):
batch._commit()

spans = span_exporter.get_finished_spans()

Expand Down

0 comments on commit c563500

Please sign in to comment.