Skip to content

Commit

Permalink
feat: Add OpenTelemetry publish sample
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Sep 25, 2024
1 parent 747a016 commit 184b8e5
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 0 deletions.
92 changes: 92 additions & 0 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,87 @@ def delete_topic(project_id: str, topic_id: str) -> None:
# [END pubsub_delete_topic]


def pubsub_publish_otel_tracing(
topic_project_id: str, trace_project_id: str, topic_id: str
) -> None:
"""
Publish to `topic_id` in `topic_project_id` with OpenTelemetry enabled.
Export the OpenTelemetry traces to Google Cloud Trace in project
`trace_project_id`
Args:
topic_project_id: project ID of the topic to publish to.
trace_project_id: project ID to export Cloud Trace to.
topic_id: topic ID to publish to.
Returns:
None
"""
# [START pubsub_publish_otel_tracing]

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.types import PublisherOptions

# TODO(developer)
# topic_project_id = "your-topic-project-id"
# trace_project_id = "your-trace-project-id"
# topic_id = "your-topic-id"

# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your set up accordingly.

# Sample 1 in every 1000 traces.
sampler = ParentBased(root=TraceIdRatioBased(1 / 1000))
trace.set_tracer_provider(TracerProvider(sampler=sampler))

# Export to Google Trace.
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(cloud_trace_exporter)
)

# Set the `enable_open_telemetry_tracing` option to True when creating
# the publisher client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
publisher = PublisherClient(
publisher_options=PublisherOptions(
enable_open_telemetry_tracing=True,
),
)

# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(topic_project_id, topic_id)

tracer = trace.get_tracer("google.cloud.pubsub_v1.publisher")
with tracer.start_as_current_span("parent cloud trace span"):
# Publish messages.
for n in range(1, 10000):
data_str = f"Message number {n}"
# Data must be a bytestring
data = data_str.encode("utf-8")
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data)
print(future.result())

print(f"Published messages to {topic_path}.")

# [END pubsub_publish_otel_tracing]


def publish_messages(project_id: str, topic_id: str) -> None:
"""Publishes multiple messages to a Pub/Sub topic."""
# [START pubsub_quickstart_publisher]
Expand Down Expand Up @@ -522,6 +603,13 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
create_parser = subparsers.add_parser("create", help=create_topic.__doc__)
create_parser.add_argument("topic_id")

pubsub_publish_otel_tracing_parser = subparsers.add_parser(
"pubsub-publish-otel-tracing", help=pubsub_publish_otel_tracing.__doc__
)
pubsub_publish_otel_tracing_parser.add_argument("topic_project_id")
pubsub_publish_otel_tracing_parser.add_argument("trace_project_id")
pubsub_publish_otel_tracing_parser.add_argument("topic_id")

create_topic_with_kinesis_ingestion_parser = subparsers.add_parser(
"create_kinesis_ingestion", help=create_topic_with_kinesis_ingestion.__doc__
)
Expand Down Expand Up @@ -638,3 +726,7 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
resume_publish_with_ordering_keys(args.project_id, args.topic_id)
elif args.command == "detach-subscription":
detach_subscription(args.project_id, args.subscription_id)
elif args.command == "pubsub-publish-otel-tracing":
pubsub_publish_otel_tracing(
args.topic_project_id, args.trace_project_id, args.topic_id
)
13 changes: 13 additions & 0 deletions samples/snippets/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import typing
from typing import Any, Callable, cast, Iterator, TypeVar, Union
import uuid
import sys

from _pytest.capture import CaptureFixture
import backoff
Expand Down Expand Up @@ -209,6 +210,18 @@ def test_list(topic_path: str, capsys: CaptureFixture[str]) -> None:
assert topic_path in out


@pytest.mark.skipif(
sys.version_info < (3, 8),
reason="Open Telemetry not supported below Python version 3.8",
)
def test_pubsub_publish_otel_tracing(
capsys: CaptureFixture[str],
) -> None:
publisher.pubsub_publish_otel_tracing(PROJECT_ID, PROJECT_ID, TOPIC_ID)
out, _ = capsys.readouterr()
assert f"Published messages to {topic_path}." in out


def test_publish(topic_path: str, capsys: CaptureFixture[str]) -> None:
publisher.publish_messages(PROJECT_ID, TOPIC_ID)

Expand Down
3 changes: 3 additions & 0 deletions samples/snippets/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ avro==1.12.0
protobuf===4.24.4; python_version == '3.7'
protobuf==5.28.0; python_version >= '3.8'
avro==1.12.0
opentelemetry-api==1.22.0
opentelemetry-sdk==1.22.0
opentelemetry-exporter-gcp-trace==1.7.0

0 comments on commit 184b8e5

Please sign in to comment.