diff --git a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py index 6347b3963..f30cda5de 100644 --- a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py +++ b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py @@ -77,3 +77,14 @@ def add_subscribe_span_event(self, event: str) -> None: "timestamp": str(datetime.now()), }, ) + + def end_subscribe_span(self) -> None: + assert self._subscribe_span is not None + self._subscribe_span.end() + + def set_subscribe_span_result(self, result: str) -> None: + assert self._subscribe_span is not None + self._subscribe_span.set_attribute( + key="messaging.gcp_pubsub.result", + value=result, + ) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 15ad4abb3..fc5193555 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -243,6 +243,12 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None: ack_reqs_dict=ack_reqs_dict, ) + for completed_ack in requests_completed: + if completed_ack.opentelemetry_data: + completed_ack.opentelemetry_data.set_subscribe_span_result("acked") + completed_ack.opentelemetry_data.add_subscribe_span_event("ack end") + completed_ack.opentelemetry_data.end_subscribe_span() + # Remove the completed messages from lease management. self.drop(requests_completed) @@ -286,6 +292,13 @@ def _retry_acks(self, requests_to_retry): ack_ids=[req.ack_id for req in requests_to_retry], ack_reqs_dict=ack_reqs_dict, ) + + for completed_ack in requests_completed: + if completed_ack.opentelemetry_data: + completed_ack.opentelemetry_data.set_subscribe_span_result("acked") + completed_ack.opentelemetry_data.add_subscribe_span_event("ack end") + completed_ack.opentelemetry_data.end_subscribe_span() + assert ( len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE ), "Too many requests to be retried."