Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Open Telemetry Publish Side Support #1241

Merged
merged 20 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
04e5623
Add opentelemetry as a dependency in setup.py
mukund-ananthu Aug 28, 2024
ac86799
Add enable_open_telemetry_tracing Publisher Option
mukund-ananthu Aug 28, 2024
b736635
Set/ Override Open Telemetry enabled PublisherOption in the Publish c…
mukund-ananthu Aug 28, 2024
d700967
Add OpenTelemetryContextSetter to use for context propagation
mukund-ananthu Aug 28, 2024
5a42d0e
Add Publish Create Span and PublishMessageWrapper
mukund-ananthu Aug 28, 2024
e7aea6c
Add Publish Flow Control Span
mukund-ananthu Aug 29, 2024
2e3f576
End Publisher Flow Control and Create Span on FlowControlLimitError
mukund-ananthu Aug 29, 2024
0f865d7
Add Publisher Batching Span
mukund-ananthu Aug 30, 2024
b4a51df
Fix TraceContextPropagation
mukund-ananthu Aug 30, 2024
f98b3c5
Plumb PublishMessageWrapper to sequencers and batch publish
mukund-ananthu Aug 30, 2024
343ef17
Increase test coverage across files
mukund-ananthu Aug 30, 2024
02b9355
Add Publish RPC Span
mukund-ananthu Sep 5, 2024
b519fb5
Fix mypy type checking errors
mukund-ananthu Sep 6, 2024
f6c3c8f
Update messaging.system attribute of publish RPC span
mukund-ananthu Sep 6, 2024
cf8d039
Update publish RPC sampling test to include both sampled and unsampled
mukund-ananthu Sep 6, 2024
3519fbd
Fix test comments
mukund-ananthu Sep 6, 2024
3bc3a56
Update publish RPC and create span to use shorter format of topic
mukund-ananthu Sep 6, 2024
c5982ca
Code clean up
mukund-ananthu Sep 7, 2024
4ab4426
Address review comments
mukund-ananthu Sep 10, 2024
de487a1
Merge branch 'main' into otel_publish
mukund-ananthu Sep 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
mukund-ananthu marked this conversation as resolved.
Show resolved Hide resolved
Empty file.
30 changes: 30 additions & 0 deletions google/cloud/pubsub_v1/open_telemetry/context_propagation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright 2017, Google LLC All rights reserved.
parthea marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from opentelemetry.propagators.textmap import Setter

from google.pubsub_v1 import types as gapic_types
parthea marked this conversation as resolved.
Show resolved Hide resolved


class OpenTelemetryContextSetter(Setter):
"""
Used by Open Telemetry for context propagation.
"""

def set(self, carrier: gapic_types.PubsubMessage, key: str, value: str) -> None:
"""
Injects trace context into Pub/Sub message attributes with
"googclient_" prefix.
parthea marked this conversation as resolved.
Show resolved Hide resolved
"""
carrier.attributes["googclient_" + key] = value
162 changes: 162 additions & 0 deletions google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# Copyright 2017, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from datetime import datetime
import warnings
from typing import Optional

from opentelemetry import trace
from opentelemetry.trace.propagation import set_span_in_context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from google.pubsub_v1 import types as gapic_types
from google.cloud.pubsub_v1.open_telemetry.context_propagation import (
OpenTelemetryContextSetter,
)


class PublishMessageWrapper:
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1.publisher"
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
_OPEN_TELEMETRY_PUBLISHER_BATCHING = "publisher batching"

_PUBLISH_START_EVENT: str = "publish start"
_PUBLISH_FLOW_CONTROL: str = "publisher flow control"

def __init__(self, message: gapic_types.PubsubMessage):
self._message: gapic_types.PubsubMessage = message

@property
def message(self):
return self._message

@message.setter # type: ignore[no-redef] # resetting message value is intentional here
def message(self, message: gapic_types.PubsubMessage):
self._message = message

@property
def create_span(self):
return self._create_span

def __eq__(self, other): # pragma: NO COVER
"""Used for pytest asserts to compare two PublishMessageWrapper objects with the same message"""
if isinstance(self, other.__class__):
return self.message == other.message
return False

def start_create_span(self, topic: str, ordering_key: str) -> None:
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
topic_short_name = topic.split("/")[3]
hongalex marked this conversation as resolved.
Show resolved Hide resolved
mukund-ananthu marked this conversation as resolved.
Show resolved Hide resolved
with tracer.start_as_current_span(
name=f"{topic_short_name} create",
attributes={
"messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
hongalex marked this conversation as resolved.
Show resolved Hide resolved
"messaging.destination.name": topic_short_name,
"code.function": "publish",
"messaging.gcp_pubsub.message.ordering_key": ordering_key,
"messaging.operation": "create",
"gcp.project_id": topic.split("/")[1],
"messaging.message.body.size": sys.getsizeof(self._message.data),
parthea marked this conversation as resolved.
Show resolved Hide resolved
},
kind=trace.SpanKind.PRODUCER,
end_on_exit=False,
) as create_span:
create_span.add_event(
name=self._PUBLISH_START_EVENT,
attributes={
"timestamp": str(datetime.now()),
},
)
self._create_span: trace.Span = create_span
TraceContextTextMapPropagator().inject(
carrier=self._message,
setter=OpenTelemetryContextSetter(),
)

def end_create_span(self, exc: Optional[BaseException] = None) -> None:
if self._create_span is None: # pragma: NO COVER
mukund-ananthu marked this conversation as resolved.
Show resolved Hide resolved
warnings.warn(
message="publish create span is None. Hence, not ending it",
category=RuntimeWarning,
)
return
if exc:
self._create_span.record_exception(exception=exc)
self._create_span.set_status(
trace.Status(status_code=trace.StatusCode.ERROR)
)
self._create_span.end()

def start_publisher_flow_control_span(self) -> None:
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
if self._create_span is None: # pragma: NO COVER
warnings.warn(
message="publish create span is None. Hence, not starting publish flow control span",
category=RuntimeWarning,
)
return
with tracer.start_as_current_span(
name=self._PUBLISH_FLOW_CONTROL,
kind=trace.SpanKind.INTERNAL,
context=set_span_in_context(self._create_span),
end_on_exit=False,
) as flow_control_span:
self._flow_control_span: trace.Span = flow_control_span

def end_publisher_flow_control_span(
self, exc: Optional[BaseException] = None
) -> None:
if self._flow_control_span is None: # pragma: NO COVER
warnings.warn(
message="publish flow control span is None. Hence, not ending it",
category=RuntimeWarning,
)
return
if exc:
self._flow_control_span.record_exception(exception=exc)
self._flow_control_span.set_status(
trace.Status(status_code=trace.StatusCode.ERROR)
)
self._flow_control_span.end()

def start_publisher_batching_span(self) -> None:
if self._create_span is None: # pragma: NO COVER
warnings.warn(
message="publish create span is None. Hence, not starting publisher batching span",
category=RuntimeWarning,
)
return
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name=self._OPEN_TELEMETRY_PUBLISHER_BATCHING,
kind=trace.SpanKind.INTERNAL,
context=set_span_in_context(self._create_span),
end_on_exit=False,
) as batching_span:
self._batching_span = batching_span

def end_publisher_batching_span(self, exc: Optional[BaseException] = None) -> None:
if self._batching_span is None: # pragma: NO COVER
warnings.warn(
message="publisher batching span is None. Hence, not ending it",
category=RuntimeWarning,
)
return
if exc:
self._batching_span.record_exception(exception=exc)
self._batching_span.set_status(
trace.Status(status_code=trace.StatusCode.ERROR)
)
self._batching_span.end()
8 changes: 6 additions & 2 deletions google/cloud/pubsub_v1/publisher/_batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
import typing
from typing import Optional, Sequence

from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import (
PublishMessageWrapper,
)


if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud import pubsub_v1
Expand Down Expand Up @@ -54,7 +58,7 @@ class Batch(metaclass=abc.ABCMeta):

def __len__(self):
"""Return the number of messages currently in the batch."""
return len(self.messages)
return len(self.message_wrappers)

@staticmethod
@abc.abstractmethod
Expand All @@ -68,7 +72,7 @@ def make_lock(): # pragma: NO COVER

@property
@abc.abstractmethod
def messages(self) -> Sequence["gapic_types.PubsubMessage"]: # pragma: NO COVER
def message_wrappers(self) -> Sequence[PublishMessageWrapper]: # pragma: NO COVER
"""Return the messages currently in the batch.

Returns:
Expand Down
Loading