Skip to content

Commit

Permalink
Add ack span for dispatcher._retry_ack
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Sep 23, 2024
1 parent b66fd19 commit 54a88ba
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 12 deletions.
29 changes: 29 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,43 @@ def _retry_acks(self, requests_to_retry: List[requests.AckRequest]):
time.sleep(time_to_wait)

ack_reqs_dict = {req.ack_id: req for req in requests_to_retry}
subscription_id: Optional[str] = None
project_id: Optional[str] = None
subscribe_links: List[trace.Link] = []
for req in requests_to_retry:
if req.opentelemetry_data:
req.opentelemetry_data.add_subscribe_span_event("ack start")
if subscription_id is None:
subscription_id = req.opentelemetry_data.subscription_id
if project_id is None:
project_id = req.opentelemetry_data.project_id
subscribe_span: Optional[
trace.Span
] = req.opentelemetry_data.subscribe_span
if (
subscribe_span
and subscribe_span.get_span_context().trace_flags.sampled
):
subscribe_links.append(
trace.Link(subscribe_span.get_span_context())
)
ack_span: Optional[trace.Span] = None
if subscription_id and project_id:
ack_span = start_ack_span(
subscription_id,
len(ack_reqs_dict),
project_id,
subscribe_links,
)

requests_completed, requests_to_retry = self._manager.send_unary_ack(
ack_ids=[req.ack_id for req in requests_to_retry],
ack_reqs_dict=ack_reqs_dict,
)

if ack_span:
ack_span.end()

for completed_ack in requests_completed:
if completed_ack.opentelemetry_data:
completed_ack.opentelemetry_data.add_subscribe_span_event("ack end")
Expand Down
56 changes: 44 additions & 12 deletions tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,8 +596,15 @@ def test_opentelemetry_retry_acks(span_exporter):
streaming_pull_manager.StreamingPullManager, instance=True
)
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
opentelemetry_data = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo"))
opentelemetry_data.start_subscribe_span(
data1 = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo"))
data1.start_subscribe_span(
subscription="projects/projectID/subscriptions/subscriptionID",
exactly_once_enabled=True,
ack_id="ack_id",
delivery_attempt=5,
)
data2 = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo"))
data2.start_subscribe_span(
subscription="projects/projectID/subscriptions/subscriptionID",
exactly_once_enabled=True,
ack_id="ack_id",
Expand All @@ -612,23 +619,48 @@ def test_opentelemetry_retry_acks(span_exporter):
time_to_ack=20,
ordering_key="",
future=f,
opentelemetry_data=opentelemetry_data,
)
opentelemetry_data=data1,
),
requests.AckRequest(
ack_id="ack_id_string2",
byte_size=0,
time_to_ack=20,
ordering_key="",
future=f,
opentelemetry_data=data2,
),
]
manager.send_unary_ack.side_effect = [(items, [])]
mock_span_context = mock.Mock(spec=trace.SpanContext)
mock_span_context.trace_flags.sampled = False
with mock.patch("time.sleep", return_value=None):
dispatcher_._retry_acks(items)
with mock.patch.object(
data2._subscribe_span, "get_span_context", return_value=mock_span_context
):
dispatcher_._retry_acks(items)

spans = span_exporter.get_finished_spans()

assert len(spans) == 1
subscribe_span = spans[0]
assert len(spans) == 3
ack_span = spans[0]

assert "messaging.gcp_pubsub.result" in subscribe_span.attributes
assert subscribe_span.attributes["messaging.gcp_pubsub.result"] == "acked"
assert len(subscribe_span.events) == 2
assert subscribe_span.events[0].name == "ack start"
assert subscribe_span.events[1].name == "ack end"
for subscribe_span in spans[1:]:
assert "messaging.gcp_pubsub.result" in subscribe_span.attributes
assert subscribe_span.attributes["messaging.gcp_pubsub.result"] == "acked"
assert len(subscribe_span.events) == 2
assert subscribe_span.events[0].name == "ack start"
assert subscribe_span.events[1].name == "ack end"

assert ack_span.name == "subscriptionID ack"
assert ack_span.kind == trace.SpanKind.CLIENT
assert ack_span.parent is None
assert len(ack_span.links) == 1
assert ack_span.attributes["messaging.system"] == "gcp_pubsub"
assert ack_span.attributes["messaging.batch.message_count"] == 2
assert ack_span.attributes["messaging.operation"] == "ack"
assert ack_span.attributes["gcp.project_id"] == "projectID"
assert ack_span.attributes["messaging.destination.name"] == "subscriptionID"
assert ack_span.attributes["code.function"] == "ack"


def test_retry_acks():
Expand Down

0 comments on commit 54a88ba

Please sign in to comment.