diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java new file mode 100644 index 000000000..b946f44bf --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java @@ -0,0 +1,460 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.List; + +public class OpenTelemetryPubsubTracer { + private final Tracer tracer; + private boolean enabled = false; + + private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control"; + private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching"; + private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME = + "subscriber concurrency control"; + private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler"; + + private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size"; + private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key"; + private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id"; + private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY = + "messaging.gcp_pubsub.message.exactly_once_delivery"; + private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY = + "messaging.gcp_pubsub.message.delivery_attempt"; + private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; + private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; + private static final String PROJECT_ATTR_KEY = "gcp.project_id"; + private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish"; + + private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; + + OpenTelemetryPubsubTracer(Tracer tracer, boolean enableOpenTelemetry) { + this.tracer = tracer; + if (this.tracer != null && enableOpenTelemetry) { + this.enabled = true; + } + } + + /** Populates attributes that are common the publisher parent span and publish RPC span. */ + private static final AttributesBuilder createCommonSpanAttributesBuilder( + String destinationName, String projectName, String codeFunction, String operation) { + AttributesBuilder attributesBuilder = + Attributes.builder() + .put(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .put(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName) + .put(PROJECT_ATTR_KEY, projectName) + .put(SemanticAttributes.CODE_FUNCTION, codeFunction); + if (operation != null) { + attributesBuilder.put(SemanticAttributes.MESSAGING_OPERATION, operation); + } + + return attributesBuilder; + } + + private Span startChildSpan(String name, Span parent) { + return tracer.spanBuilder(name).setParent(Context.current().with(parent)).startSpan(); + } + + /** + * Creates and starts the parent span with the appropriate span attributes and injects the span + * context into the {@link PubsubMessage} attributes. + */ + void startPublisherSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + AttributesBuilder attributesBuilder = + createCommonSpanAttributesBuilder( + message.getTopicName(), message.getTopicProject(), "publish", "create"); + + attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize()); + if (!message.getOrderingKey().isEmpty()) { + attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey()); + } + + Span publisherSpan = + tracer + .spanBuilder(message.getTopicName() + " create") + .setSpanKind(SpanKind.PRODUCER) + .setAllAttributes(attributesBuilder.build()) + .startSpan(); + + message.setPublisherSpan(publisherSpan); + if (publisherSpan.getSpanContext().isValid()) { + message.injectSpanContext(); + } + } + + void endPublisherSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.endPublisherSpan(); + } + + void setPublisherMessageIdSpanAttribute(PubsubMessageWrapper message, String messageId) { + if (!enabled) { + return; + } + message.setPublisherMessageIdSpanAttribute(messageId); + } + + /** Creates a span for publish-side flow control as a child of the parent publisher span. */ + void startPublishFlowControlSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + Span publisherSpan = message.getPublisherSpan(); + if (publisherSpan != null) + message.setPublishFlowControlSpan( + startChildSpan(PUBLISH_FLOW_CONTROL_SPAN_NAME, publisherSpan)); + } + + void endPublishFlowControlSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.endPublishFlowControlSpan(); + } + + void setPublishFlowControlSpanException(PubsubMessageWrapper message, Throwable t) { + if (!enabled) { + return; + } + message.setPublishFlowControlSpanException(t); + } + + /** Creates a span for publish message batching as a child of the parent publisher span. */ + void startPublishBatchingSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + Span publisherSpan = message.getPublisherSpan(); + if (publisherSpan != null) { + message.setPublishBatchingSpan(startChildSpan(PUBLISH_BATCHING_SPAN_NAME, publisherSpan)); + } + } + + void endPublishBatchingSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.endPublishBatchingSpan(); + } + + /** + * Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional + * links with the publisher parent span are created for sampled messages in the batch. + */ + Span startPublishRpcSpan(String topic, List messages) { + if (!enabled) { + return null; + } + TopicName topicName = TopicName.parse(topic); + Attributes attributes = + createCommonSpanAttributesBuilder( + topicName.getTopic(), topicName.getProject(), "publishCall", "publish") + .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()) + .build(); + SpanBuilder publishRpcSpanBuilder = + tracer + .spanBuilder(topicName.getTopic() + PUBLISH_RPC_SPAN_SUFFIX) + .setSpanKind(SpanKind.CLIENT) + .setAllAttributes(attributes); + Attributes linkAttributes = + Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build(); + for (PubsubMessageWrapper message : messages) { + if (message.getPublisherSpan().getSpanContext().isSampled()) + publishRpcSpanBuilder.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes); + } + Span publishRpcSpan = publishRpcSpanBuilder.startSpan(); + + for (PubsubMessageWrapper message : messages) { + if (publishRpcSpan.getSpanContext().isSampled()) { + message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes); + message.addPublishStartEvent(); + } + } + return publishRpcSpan; + } + + /** Ends the given publish RPC span if it exists. */ + void endPublishRpcSpan(Span publishRpcSpan) { + if (!enabled) { + return; + } + if (publishRpcSpan != null) { + publishRpcSpan.end(); + } + } + + /** + * Sets an error status and records an exception when an exception is thrown when publishing the + * message batch. + */ + void setPublishRpcSpanException(Span publishRpcSpan, Throwable t) { + if (!enabled) { + return; + } + if (publishRpcSpan != null) { + publishRpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on publish RPC."); + publishRpcSpan.recordException(t); + publishRpcSpan.end(); + } + } + + void startSubscriberSpan(PubsubMessageWrapper message, boolean exactlyOnceDeliveryEnabled) { + if (!enabled) { + return; + } + AttributesBuilder attributesBuilder = + createCommonSpanAttributesBuilder( + message.getSubscriptionName(), message.getSubscriptionProject(), "onResponse", null); + + attributesBuilder + .put(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId()) + .put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize()) + .put(MESSAGE_ACK_ID_ATTR_KEY, message.getAckId()) + .put(MESSAGE_EXACTLY_ONCE_ATTR_KEY, exactlyOnceDeliveryEnabled); + if (!message.getOrderingKey().isEmpty()) { + attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey()); + } + if (message.getDeliveryAttempt() > 0) { + attributesBuilder.put(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, message.getDeliveryAttempt()); + } + Attributes attributes = attributesBuilder.build(); + Context publisherSpanContext = message.extractSpanContext(attributes); + message.setPublisherSpan(Span.fromContextOrNull(publisherSpanContext)); + message.setSubscriberSpan( + tracer + .spanBuilder(message.getSubscriptionName() + " subscribe") + .setSpanKind(SpanKind.CONSUMER) + .setParent(publisherSpanContext) + .setAllAttributes(attributes) + .startSpan()); + } + + void endSubscriberSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.endSubscriberSpan(); + } + + void setSubscriberSpanExpirationResult(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.setSubscriberSpanExpirationResult(); + } + + void setSubscriberSpanException(PubsubMessageWrapper message, Throwable t, String exception) { + if (!enabled) { + return; + } + message.setSubscriberSpanException(t, exception); + } + + /** Creates a span for subscribe concurrency control as a child of the parent subscriber span. */ + void startSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + Span subscriberSpan = message.getSubscriberSpan(); + if (subscriberSpan != null) { + message.setSubscribeConcurrencyControlSpan( + startChildSpan(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME, subscriberSpan)); + } + } + + void endSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.endSubscribeConcurrencyControlSpan(); + } + + void setSubscribeConcurrencyControlSpanException(PubsubMessageWrapper message, Throwable t) { + if (!enabled) { + return; + } + message.setSubscribeConcurrencyControlSpanException(t); + } + + /** + * Creates a span for subscribe ordering key scheduling as a child of the parent subscriber span. + */ + void startSubscribeSchedulerSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + Span subscriberSpan = message.getSubscriberSpan(); + if (subscriberSpan != null) { + message.setSubscribeSchedulerSpan( + startChildSpan(SUBSCRIBE_SCHEDULER_SPAN_NAME, subscriberSpan)); + } + } + + void endSubscribeSchedulerSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + message.endSubscribeSchedulerSpan(); + } + + /** Creates a span for subscribe message processing as a child of the parent subscriber span. */ + void startSubscribeProcessSpan(PubsubMessageWrapper message) { + if (!enabled) { + return; + } + Span subscriberSpan = message.getSubscriberSpan(); + if (subscriberSpan != null) { + Span subscribeProcessSpan = + startChildSpan(message.getSubscriptionName() + " process", subscriberSpan); + subscribeProcessSpan.setAttribute( + SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE); + Span publisherSpan = message.getPublisherSpan(); + if (publisherSpan != null) { + subscribeProcessSpan.addLink(publisherSpan.getSpanContext()); + } + message.setSubscribeProcessSpan(subscribeProcessSpan); + } + } + + void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) { + if (!enabled) { + return; + } + message.endSubscribeProcessSpan(action); + } + + /** + * Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links + * to parent subscribe span for sampled messages are added. + */ + Span startSubscribeRpcSpan( + String subscription, + String rpcOperation, + List messages, + int ackDeadline, + boolean isReceiptModack) { + if (!enabled) { + return null; + } + String codeFunction = rpcOperation == "ack" ? "sendAckOperations" : "sendModAckOperations"; + SubscriptionName subscriptionName = SubscriptionName.parse(subscription); + AttributesBuilder attributesBuilder = + createCommonSpanAttributesBuilder( + subscriptionName.getSubscription(), + subscriptionName.getProject(), + codeFunction, + rpcOperation) + .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()); + + // Ack deadline and receipt modack are specific to the modack operation + if (rpcOperation == "modack") { + attributesBuilder + .put(ACK_DEADLINE_ATTR_KEY, ackDeadline) + .put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack); + } + + SpanBuilder rpcSpanBuilder = + tracer + .spanBuilder(subscriptionName.getSubscription() + " " + rpcOperation) + .setSpanKind(SpanKind.CLIENT) + .setAllAttributes(attributesBuilder.build()); + Attributes linkAttributes = + Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation).build(); + for (PubsubMessageWrapper message : messages) { + if (message.getSubscriberSpan().getSpanContext().isSampled()) { + rpcSpanBuilder.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes); + } + } + Span rpcSpan = rpcSpanBuilder.startSpan(); + + for (PubsubMessageWrapper message : messages) { + if (rpcSpan.getSpanContext().isSampled()) { + message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes); + switch (rpcOperation) { + case "ack": + message.addAckStartEvent(); + break; + case "modack": + message.addModAckStartEvent(); + break; + case "nack": + message.addNackStartEvent(); + break; + } + } + } + return rpcSpan; + } + + /** Ends the given subscribe RPC span if it exists. */ + void endSubscribeRpcSpan(Span rpcSpan) { + if (!enabled) { + return; + } + if (rpcSpan != null) { + rpcSpan.end(); + } + } + + /** + * Sets an error status and records an exception when an exception is thrown when handling a + * subscribe-side RPC. + */ + void setSubscribeRpcSpanException(Span rpcSpan, boolean isModack, int ackDeadline, Throwable t) { + if (!enabled) { + return; + } + if (rpcSpan != null) { + String operation = !isModack ? "ack" : (ackDeadline == 0 ? "nack" : "modack"); + rpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on " + operation + " RPC."); + rpcSpan.recordException(t); + rpcSpan.end(); + } + } + + /** Adds the appropriate subscribe-side RPC end event. */ + void addEndRpcEvent( + PubsubMessageWrapper message, boolean rpcSampled, boolean isModack, int ackDeadline) { + if (!enabled || !rpcSampled) { + return; + } + if (!isModack) { + message.addAckEndEvent(); + } else if (ackDeadline == 0) { + message.addNackEndEvent(); + } else { + message.addModAckEndEvent(); + } + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java new file mode 100644 index 000000000..94fd13085 --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java @@ -0,0 +1,430 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.TextMapSetter; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; + +/** + * A wrapper class for a {@link PubsubMessage} object that handles creation and tracking of + * OpenTelemetry {@link Span} objects for different operations that occur during publishing. + */ +public class PubsubMessageWrapper { + private PubsubMessage message; + + private final TopicName topicName; + private final SubscriptionName subscriptionName; + + // Attributes set only for messages received from a streaming pull response. + private final String ackId; + private final int deliveryAttempt; + + private static final String PUBLISH_START_EVENT = "publish start"; + private static final String PUBLISH_END_EVENT = "publish end"; + + private static final String MODACK_START_EVENT = "modack start"; + private static final String MODACK_END_EVENT = "modack end"; + private static final String NACK_START_EVENT = "nack start"; + private static final String NACK_END_EVENT = "nack end"; + private static final String ACK_START_EVENT = "ack start"; + private static final String ACK_END_EVENT = "ack end"; + + private static final String GOOGCLIENT_PREFIX = "googclient_"; + + private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result"; + + private Span publisherSpan; + private Span publishFlowControlSpan; + private Span publishBatchingSpan; + + private Span subscriberSpan; + private Span subscribeConcurrencyControlSpan; + private Span subscribeSchedulerSpan; + private Span subscribeProcessSpan; + + private PubsubMessageWrapper(Builder builder) { + this.message = builder.message; + this.topicName = builder.topicName; + this.subscriptionName = builder.subscriptionName; + this.ackId = builder.ackId; + this.deliveryAttempt = builder.deliveryAttempt; + } + + static Builder newBuilder(PubsubMessage message, String topicName) { + return new Builder(message, topicName); + } + + static Builder newBuilder( + PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { + return new Builder(message, subscriptionName, ackId, deliveryAttempt); + } + + /** Returns the PubsubMessage associated with this wrapper. */ + PubsubMessage getPubsubMessage() { + return message; + } + + void setPubsubMessage(PubsubMessage message) { + this.message = message; + } + + /** Returns the TopicName for this wrapper as a string. */ + String getTopicName() { + if (topicName != null) { + return topicName.getTopic(); + } + return ""; + } + + String getTopicProject() { + if (topicName != null) { + return topicName.getProject(); + } + return ""; + } + + /** Returns the SubscriptionName for this wrapper as a string. */ + String getSubscriptionName() { + if (subscriptionName != null) { + return subscriptionName.getSubscription(); + } + return ""; + } + + String getSubscriptionProject() { + if (subscriptionName != null) { + return subscriptionName.getProject(); + } + return ""; + } + + String getMessageId() { + return message.getMessageId(); + } + + String getAckId() { + return ackId; + } + + int getDataSize() { + return message.getData().size(); + } + + String getOrderingKey() { + return message.getOrderingKey(); + } + + int getDeliveryAttempt() { + return deliveryAttempt; + } + + Span getPublisherSpan() { + return publisherSpan; + } + + void setPublisherSpan(Span span) { + this.publisherSpan = span; + } + + void setPublishFlowControlSpan(Span span) { + this.publishFlowControlSpan = span; + } + + void setPublishBatchingSpan(Span span) { + this.publishBatchingSpan = span; + } + + Span getSubscriberSpan() { + return subscriberSpan; + } + + void setSubscriberSpan(Span span) { + this.subscriberSpan = span; + } + + void setSubscribeConcurrencyControlSpan(Span span) { + this.subscribeConcurrencyControlSpan = span; + } + + void setSubscribeSchedulerSpan(Span span) { + this.subscribeSchedulerSpan = span; + } + + void setSubscribeProcessSpan(Span span) { + this.subscribeProcessSpan = span; + } + + /** Creates a publish start event that is tied to the publish RPC span time. */ + void addPublishStartEvent() { + if (publisherSpan != null) { + publisherSpan.addEvent(PUBLISH_START_EVENT); + } + } + + /** + * Sets the message ID attribute in the publisher parent span. This is called after the publish + * RPC returns with a message ID. + */ + void setPublisherMessageIdSpanAttribute(String messageId) { + if (publisherSpan != null) { + publisherSpan.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId); + } + } + + /** Ends the publisher parent span if it exists. */ + void endPublisherSpan() { + if (publisherSpan != null) { + publisherSpan.addEvent(PUBLISH_END_EVENT); + publisherSpan.end(); + } + } + + /** Ends the publish flow control span if it exists. */ + void endPublishFlowControlSpan() { + if (publishFlowControlSpan != null) { + publishFlowControlSpan.end(); + } + } + + /** Ends the publish batching span if it exists. */ + void endPublishBatchingSpan() { + if (publishBatchingSpan != null) { + publishBatchingSpan.end(); + } + } + + /** + * Sets an error status and records an exception when an exception is thrown during flow control. + */ + void setPublishFlowControlSpanException(Throwable t) { + if (publishFlowControlSpan != null) { + publishFlowControlSpan.setStatus( + StatusCode.ERROR, "Exception thrown during publish flow control."); + publishFlowControlSpan.recordException(t); + endAllPublishSpans(); + } + } + + /** + * Creates start and end events for ModAcks, Nacks, and Acks that are tied to the corresponding + * RPC span start and end times. + */ + void addModAckStartEvent() { + if (subscriberSpan != null) { + subscriberSpan.addEvent(MODACK_START_EVENT); + } + } + + void addModAckEndEvent() { + if (subscriberSpan != null) { + subscriberSpan.addEvent(MODACK_END_EVENT); + } + } + + void addNackStartEvent() { + if (subscriberSpan != null) { + subscriberSpan.addEvent(NACK_START_EVENT); + } + } + + void addNackEndEvent() { + if (subscriberSpan != null) { + subscriberSpan.addEvent(NACK_END_EVENT); + } + } + + void addAckStartEvent() { + if (subscriberSpan != null) { + subscriberSpan.addEvent(ACK_START_EVENT); + } + } + + void addAckEndEvent() { + if (subscriberSpan != null) { + subscriberSpan.addEvent(ACK_END_EVENT); + } + } + + /** Ends the subscriber parent span if exists. */ + void endSubscriberSpan() { + if (subscriberSpan != null) { + subscriberSpan.end(); + } + } + + /** Ends the subscribe concurreny control span if exists. */ + void endSubscribeConcurrencyControlSpan() { + if (subscribeConcurrencyControlSpan != null) { + subscribeConcurrencyControlSpan.end(); + } + } + + /** Ends the subscribe scheduler span if exists. */ + void endSubscribeSchedulerSpan() { + if (subscribeSchedulerSpan != null) { + subscribeSchedulerSpan.end(); + } + } + + /** + * Ends the subscribe process span if it exists, creates an event with the appropriate result, and + * sets the result on the parent subscriber span. + */ + void endSubscribeProcessSpan(String action) { + if (subscribeProcessSpan != null) { + subscribeProcessSpan.addEvent(action + " called"); + subscribeProcessSpan.end(); + subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, action); + } + } + + /** Sets an exception on the subscriber span during Ack/ModAck/Nack failures */ + void setSubscriberSpanException(Throwable t, String exception) { + if (subscriberSpan != null) { + subscriberSpan.setStatus(StatusCode.ERROR, exception); + subscriberSpan.recordException(t); + endAllSubscribeSpans(); + } + } + + /** Sets result of the parent subscriber span to expired and ends its. */ + void setSubscriberSpanExpirationResult() { + if (subscriberSpan != null) { + subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, "expired"); + endSubscriberSpan(); + } + } + + /** + * Sets an error status and records an exception when an exception is thrown subscriber + * concurrency control. + */ + void setSubscribeConcurrencyControlSpanException(Throwable t) { + if (subscribeConcurrencyControlSpan != null) { + subscribeConcurrencyControlSpan.setStatus( + StatusCode.ERROR, "Exception thrown during subscribe concurrency control."); + subscribeConcurrencyControlSpan.recordException(t); + endAllSubscribeSpans(); + } + } + + /** Ends all publisher-side spans associated with this message wrapper. */ + private void endAllPublishSpans() { + endPublishFlowControlSpan(); + endPublishBatchingSpan(); + endPublisherSpan(); + } + + /** Ends all subscriber-side spans associated with this message wrapper. */ + private void endAllSubscribeSpans() { + endSubscribeConcurrencyControlSpan(); + endSubscribeSchedulerSpan(); + endSubscriberSpan(); + } + + /** + * Injects the span context into the attributes of a Pub/Sub message for propagation to the + * subscriber client. + */ + void injectSpanContext() { + TextMapSetter injectMessageAttributes = + new TextMapSetter() { + @Override + public void set(PubsubMessageWrapper carrier, String key, String value) { + PubsubMessage newMessage = + PubsubMessage.newBuilder(carrier.message) + .putAttributes(GOOGCLIENT_PREFIX + key, value) + .build(); + carrier.message = newMessage; + } + }; + W3CTraceContextPropagator.getInstance() + .inject(Context.current().with(publisherSpan), this, injectMessageAttributes); + } + + /** + * Extracts the span context from the attributes of a Pub/Sub message and creates the parent + * subscriber span using that context. + */ + Context extractSpanContext(Attributes attributes) { + TextMapGetter extractMessageAttributes = + new TextMapGetter() { + @Override + public String get(PubsubMessageWrapper carrier, String key) { + return carrier.message.getAttributesOrDefault(GOOGCLIENT_PREFIX + key, ""); + } + + public Iterable keys(PubsubMessageWrapper carrier) { + return carrier.message.getAttributesMap().keySet(); + } + }; + Context context = + W3CTraceContextPropagator.getInstance() + .extract(Context.current(), this, extractMessageAttributes); + return context; + } + + /** Builder of {@link PubsubMessageWrapper PubsubMessageWrapper}. */ + static final class Builder { + private PubsubMessage message = null; + private TopicName topicName = null; + private SubscriptionName subscriptionName = null; + private String ackId = null; + private int deliveryAttempt = 0; + + public Builder(PubsubMessage message, String topicName) { + this.message = message; + if (topicName != null) { + this.topicName = TopicName.parse(topicName); + } + } + + public Builder( + PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { + this.message = message; + if (subscriptionName != null) { + this.subscriptionName = SubscriptionName.parse(subscriptionName); + } + this.ackId = ackId; + this.deliveryAttempt = deliveryAttempt; + } + + public Builder( + PubsubMessage message, + SubscriptionName subscriptionName, + String ackId, + int deliveryAttempt) { + this.message = message; + this.subscriptionName = subscriptionName; + this.ackId = ackId; + this.deliveryAttempt = deliveryAttempt; + } + + public PubsubMessageWrapper build() { + return new PubsubMessageWrapper(this); + } + } +} diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java new file mode 100644 index 000000000..b4433f41e --- /dev/null +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java @@ -0,0 +1,669 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.sdk.testing.assertj.AttributesAssert; +import io.opentelemetry.sdk.testing.assertj.EventDataAssert; +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; +import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; + +public class OpenTelemetryTest { + private static final TopicName FULL_TOPIC_NAME = + TopicName.parse("projects/test-project/topics/test-topic"); + private static final SubscriptionName FULL_SUBSCRIPTION_NAME = + SubscriptionName.parse("projects/test-project/subscriptions/test-sub"); + private static final String PROJECT_NAME = "test-project"; + private static final String ORDERING_KEY = "abc"; + private static final String MESSAGE_ID = "m0"; + private static final String ACK_ID = "def"; + private static final int DELIVERY_ATTEMPT = 1; + private static final int ACK_DEADLINE = 10; + private static final boolean EXACTLY_ONCE_ENABLED = true; + + private static final String PUBLISHER_SPAN_NAME = FULL_TOPIC_NAME.getTopic() + " create"; + private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control"; + private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching"; + private static final String PUBLISH_RPC_SPAN_NAME = FULL_TOPIC_NAME.getTopic() + " publish"; + private static final String PUBLISH_START_EVENT = "publish start"; + private static final String PUBLISH_END_EVENT = "publish end"; + + private static final String SUBSCRIBER_SPAN_NAME = + FULL_SUBSCRIPTION_NAME.getSubscription() + " subscribe"; + private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME = + "subscriber concurrency control"; + private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler"; + private static final String SUBSCRIBE_PROCESS_SPAN_NAME = + FULL_SUBSCRIPTION_NAME.getSubscription() + " process"; + private static final String SUBSCRIBE_MODACK_RPC_SPAN_NAME = + FULL_SUBSCRIPTION_NAME.getSubscription() + " modack"; + private static final String SUBSCRIBE_ACK_RPC_SPAN_NAME = + FULL_SUBSCRIPTION_NAME.getSubscription() + " ack"; + private static final String SUBSCRIBE_NACK_RPC_SPAN_NAME = + FULL_SUBSCRIPTION_NAME.getSubscription() + " nack"; + + private static final String PROCESS_ACTION = "ack"; + private static final String MODACK_START_EVENT = "modack start"; + private static final String MODACK_END_EVENT = "modack end"; + private static final String NACK_START_EVENT = "nack start"; + private static final String NACK_END_EVENT = "nack end"; + private static final String ACK_START_EVENT = "ack start"; + private static final String ACK_END_EVENT = "ack end"; + + private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; + private static final String PROJECT_ATTR_KEY = "gcp.project_id"; + private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size"; + private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key"; + private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; + private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; + private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id"; + private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY = + "messaging.gcp_pubsub.message.exactly_once_delivery"; + private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result"; + private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY = + "messaging.gcp_pubsub.message.delivery_attempt"; + + private static final String TRACEPARENT_ATTRIBUTE = "googclient_traceparent"; + + private static final OpenTelemetryRule openTelemetryTesting = OpenTelemetryRule.create(); + + @Test + public void testPublishSpansSuccess() { + openTelemetryTesting.clearSpans(); + + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + List messageWrappers = Arrays.asList(messageWrapper); + + long messageSize = messageWrapper.getPubsubMessage().getData().size(); + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + // Call all span start/end methods in the expected order + tracer.startPublisherSpan(messageWrapper); + tracer.startPublishFlowControlSpan(messageWrapper); + tracer.endPublishFlowControlSpan(messageWrapper); + tracer.startPublishBatchingSpan(messageWrapper); + tracer.endPublishBatchingSpan(messageWrapper); + Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); + tracer.endPublishRpcSpan(publishRpcSpan); + tracer.setPublisherMessageIdSpanAttribute(messageWrapper, MESSAGE_ID); + tracer.endPublisherSpan(messageWrapper); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(4, allSpans.size()); + SpanData flowControlSpanData = allSpans.get(0); + SpanData batchingSpanData = allSpans.get(1); + SpanData publishRpcSpanData = allSpans.get(2); + SpanData publisherSpanData = allSpans.get(3); + + SpanDataAssert flowControlSpanDataAssert = + OpenTelemetryAssertions.assertThat(flowControlSpanData); + flowControlSpanDataAssert + .hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME) + .hasParent(publisherSpanData) + .hasEnded(); + + SpanDataAssert batchingSpanDataAssert = OpenTelemetryAssertions.assertThat(batchingSpanData); + batchingSpanDataAssert + .hasName(PUBLISH_BATCHING_SPAN_NAME) + .hasParent(publisherSpanData) + .hasEnded(); + + // Check span data, links, and attributes for the publish RPC span + SpanDataAssert publishRpcSpanDataAssert = + OpenTelemetryAssertions.assertThat(publishRpcSpanData); + publishRpcSpanDataAssert + .hasName(PUBLISH_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasEnded(); + + List publishRpcLinks = publishRpcSpanData.getLinks(); + assertEquals(messageWrappers.size(), publishRpcLinks.size()); + assertEquals(publisherSpanData.getSpanContext(), publishRpcLinks.get(0).getSpanContext()); + + assertEquals(6, publishRpcSpanData.getAttributes().size()); + AttributesAssert publishRpcSpanAttributesAssert = + OpenTelemetryAssertions.assertThat(publishRpcSpanData.getAttributes()); + publishRpcSpanAttributesAssert + .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic()) + .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) + .containsEntry(SemanticAttributes.CODE_FUNCTION, "publishCall") + .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "publish") + .containsEntry(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messageWrappers.size()); + + // Check span data, events, links, and attributes for the publisher create span + SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData); + publisherSpanDataAssert + .hasName(PUBLISHER_SPAN_NAME) + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasEnded(); + + assertEquals(2, publisherSpanData.getEvents().size()); + EventDataAssert startEventAssert = + OpenTelemetryAssertions.assertThat(publisherSpanData.getEvents().get(0)); + startEventAssert.hasName(PUBLISH_START_EVENT); + EventDataAssert endEventAssert = + OpenTelemetryAssertions.assertThat(publisherSpanData.getEvents().get(1)); + endEventAssert.hasName(PUBLISH_END_EVENT); + + List publisherLinks = publisherSpanData.getLinks(); + assertEquals(1, publisherLinks.size()); + assertEquals(publishRpcSpanData.getSpanContext(), publisherLinks.get(0).getSpanContext()); + + assertEquals(8, publisherSpanData.getAttributes().size()); + AttributesAssert publisherSpanAttributesAssert = + OpenTelemetryAssertions.assertThat(publisherSpanData.getAttributes()); + publisherSpanAttributesAssert + .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic()) + .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME) + .containsEntry(SemanticAttributes.CODE_FUNCTION, "publish") + .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "create") + .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY) + .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize) + .containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID); + + // Check that the message has the attribute containing the trace context. + PubsubMessage message = messageWrapper.getPubsubMessage(); + assertEquals(1, message.getAttributesMap().size()); + assertTrue(message.containsAttributes(TRACEPARENT_ATTRIBUTE)); + assertTrue( + message + .getAttributesOrDefault(TRACEPARENT_ATTRIBUTE, "") + .contains(publisherSpanData.getTraceId())); + assertTrue( + message + .getAttributesOrDefault(TRACEPARENT_ATTRIBUTE, "") + .contains(publisherSpanData.getSpanId())); + } + + @Test + public void testPublishFlowControlSpanFailure() { + openTelemetryTesting.clearSpans(); + + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + tracer.startPublisherSpan(messageWrapper); + tracer.startPublishFlowControlSpan(messageWrapper); + + Exception e = new Exception("test-exception"); + tracer.setPublishFlowControlSpanException(messageWrapper, e); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(2, allSpans.size()); + SpanData flowControlSpanData = allSpans.get(0); + SpanData publisherSpanData = allSpans.get(1); + + SpanDataAssert flowControlSpanDataAssert = + OpenTelemetryAssertions.assertThat(flowControlSpanData); + StatusData expectedStatus = + StatusData.create(StatusCode.ERROR, "Exception thrown during publish flow control."); + flowControlSpanDataAssert + .hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME) + .hasParent(publisherSpanData) + .hasStatus(expectedStatus) + .hasException(e) + .hasEnded(); + + SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData); + publisherSpanDataAssert + .hasName(PUBLISHER_SPAN_NAME) + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasEnded(); + } + + @Test + public void testPublishRpcSpanFailure() { + openTelemetryTesting.clearSpans(); + + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + + List messageWrappers = Arrays.asList(messageWrapper); + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + tracer.startPublisherSpan(messageWrapper); + Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); + + Exception e = new Exception("test-exception"); + tracer.setPublishRpcSpanException(publishRpcSpan, e); + tracer.endPublisherSpan(messageWrapper); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(2, allSpans.size()); + SpanData rpcSpanData = allSpans.get(0); + SpanData publisherSpanData = allSpans.get(1); + + SpanDataAssert rpcSpanDataAssert = OpenTelemetryAssertions.assertThat(rpcSpanData); + StatusData expectedStatus = + StatusData.create(StatusCode.ERROR, "Exception thrown on publish RPC."); + rpcSpanDataAssert + .hasName(PUBLISH_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasStatus(expectedStatus) + .hasException(e) + .hasEnded(); + + SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData); + publisherSpanDataAssert + .hasName(PUBLISHER_SPAN_NAME) + .hasKind(SpanKind.PRODUCER) + .hasNoParent() + .hasEnded(); + } + + @Test + public void testSubscribeSpansSuccess() { + openTelemetryTesting.clearSpans(); + + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + PubsubMessageWrapper publishMessageWrapper = + PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); + // Initialize the Publisher span to inject the context in the message + tracer.startPublisherSpan(publishMessageWrapper); + tracer.endPublisherSpan(publishMessageWrapper); + + PubsubMessage publishedMessage = + publishMessageWrapper.getPubsubMessage().toBuilder().setMessageId(MESSAGE_ID).build(); + PubsubMessageWrapper subscribeMessageWrapper = + PubsubMessageWrapper.newBuilder( + publishedMessage, FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, 1) + .build(); + List subscribeMessageWrappers = Arrays.asList(subscribeMessageWrapper); + + long messageSize = subscribeMessageWrapper.getPubsubMessage().getData().size(); + + // Call all span start/end methods in the expected order + tracer.startSubscriberSpan(subscribeMessageWrapper, EXACTLY_ONCE_ENABLED); + tracer.startSubscribeConcurrencyControlSpan(subscribeMessageWrapper); + tracer.endSubscribeConcurrencyControlSpan(subscribeMessageWrapper); + tracer.startSubscribeSchedulerSpan(subscribeMessageWrapper); + tracer.endSubscribeSchedulerSpan(subscribeMessageWrapper); + tracer.startSubscribeProcessSpan(subscribeMessageWrapper); + tracer.endSubscribeProcessSpan(subscribeMessageWrapper, PROCESS_ACTION); + Span subscribeModackRpcSpan = + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), + "modack", + subscribeMessageWrappers, + ACK_DEADLINE, + true); + tracer.endSubscribeRpcSpan(subscribeModackRpcSpan); + tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, ACK_DEADLINE); + Span subscribeAckRpcSpan = + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "ack", subscribeMessageWrappers, 0, false); + tracer.endSubscribeRpcSpan(subscribeAckRpcSpan); + tracer.addEndRpcEvent(subscribeMessageWrapper, true, false, 0); + Span subscribeNackRpcSpan = + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "nack", subscribeMessageWrappers, 0, false); + tracer.endSubscribeRpcSpan(subscribeNackRpcSpan); + tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, 0); + tracer.endSubscriberSpan(subscribeMessageWrapper); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(8, allSpans.size()); + + SpanData publisherSpanData = allSpans.get(0); + SpanData concurrencyControlSpanData = allSpans.get(1); + SpanData schedulerSpanData = allSpans.get(2); + SpanData processSpanData = allSpans.get(3); + SpanData modackRpcSpanData = allSpans.get(4); + SpanData ackRpcSpanData = allSpans.get(5); + SpanData nackRpcSpanData = allSpans.get(6); + SpanData subscriberSpanData = allSpans.get(7); + + SpanDataAssert concurrencyControlSpanDataAssert = + OpenTelemetryAssertions.assertThat(concurrencyControlSpanData); + concurrencyControlSpanDataAssert + .hasName(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME) + .hasParent(subscriberSpanData) + .hasEnded(); + + SpanDataAssert schedulerSpanDataAssert = OpenTelemetryAssertions.assertThat(schedulerSpanData); + schedulerSpanDataAssert + .hasName(SUBSCRIBE_SCHEDULER_SPAN_NAME) + .hasParent(subscriberSpanData) + .hasEnded(); + + SpanDataAssert processSpanDataAssert = OpenTelemetryAssertions.assertThat(processSpanData); + processSpanDataAssert + .hasName(SUBSCRIBE_PROCESS_SPAN_NAME) + .hasParent(subscriberSpanData) + .hasEnded(); + + assertEquals(1, processSpanData.getEvents().size()); + EventDataAssert actionCalledEventAssert = + OpenTelemetryAssertions.assertThat(processSpanData.getEvents().get(0)); + actionCalledEventAssert.hasName(PROCESS_ACTION + " called"); + + // Check span data, links, and attributes for the modack RPC span + SpanDataAssert modackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(modackRpcSpanData); + modackRpcSpanDataAssert + .hasName(SUBSCRIBE_MODACK_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasEnded(); + + List modackRpcLinks = modackRpcSpanData.getLinks(); + assertEquals(subscribeMessageWrappers.size(), modackRpcLinks.size()); + assertEquals(subscriberSpanData.getSpanContext(), modackRpcLinks.get(0).getSpanContext()); + + assertEquals(8, modackRpcSpanData.getAttributes().size()); + AttributesAssert modackRpcSpanAttributesAssert = + OpenTelemetryAssertions.assertThat(modackRpcSpanData.getAttributes()); + modackRpcSpanAttributesAssert + .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .containsEntry( + SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) + .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) + .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendModAckOperations") + .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "modack") + .containsEntry( + SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()) + .containsEntry(ACK_DEADLINE_ATTR_KEY, 10) + .containsEntry(RECEIPT_MODACK_ATTR_KEY, true); + + // Check span data, links, and attributes for the ack RPC span + SpanDataAssert ackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(ackRpcSpanData); + ackRpcSpanDataAssert + .hasName(SUBSCRIBE_ACK_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasEnded(); + + List ackRpcLinks = ackRpcSpanData.getLinks(); + assertEquals(subscribeMessageWrappers.size(), ackRpcLinks.size()); + assertEquals(subscriberSpanData.getSpanContext(), ackRpcLinks.get(0).getSpanContext()); + + assertEquals(6, ackRpcSpanData.getAttributes().size()); + AttributesAssert ackRpcSpanAttributesAssert = + OpenTelemetryAssertions.assertThat(ackRpcSpanData.getAttributes()); + ackRpcSpanAttributesAssert + .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .containsEntry( + SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) + .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) + .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendAckOperations") + .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "ack") + .containsEntry( + SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()); + + // Check span data, links, and attributes for the nack RPC span + SpanDataAssert nackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(nackRpcSpanData); + nackRpcSpanDataAssert + .hasName(SUBSCRIBE_NACK_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasEnded(); + + List nackRpcLinks = nackRpcSpanData.getLinks(); + assertEquals(subscribeMessageWrappers.size(), nackRpcLinks.size()); + assertEquals(subscriberSpanData.getSpanContext(), nackRpcLinks.get(0).getSpanContext()); + + assertEquals(6, nackRpcSpanData.getAttributes().size()); + AttributesAssert nackRpcSpanAttributesAssert = + OpenTelemetryAssertions.assertThat(nackRpcSpanData.getAttributes()); + nackRpcSpanAttributesAssert + .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .containsEntry( + SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) + .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) + .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendModAckOperations") + .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "nack") + .containsEntry( + SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()); + + // Check span data, events, links, and attributes for the publisher create span + SpanDataAssert subscriberSpanDataAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData); + subscriberSpanDataAssert + .hasName(SUBSCRIBER_SPAN_NAME) + .hasKind(SpanKind.CONSUMER) + .hasParent(publisherSpanData) + .hasEnded(); + + assertEquals(6, subscriberSpanData.getEvents().size()); + EventDataAssert startModackEventAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(0)); + startModackEventAssert.hasName(MODACK_START_EVENT); + EventDataAssert endModackEventAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(1)); + endModackEventAssert.hasName(MODACK_END_EVENT); + EventDataAssert startAckEventAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(2)); + startAckEventAssert.hasName(ACK_START_EVENT); + EventDataAssert endAckEventAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(3)); + endAckEventAssert.hasName(ACK_END_EVENT); + EventDataAssert startNackEventAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(4)); + startNackEventAssert.hasName(NACK_START_EVENT); + EventDataAssert endNackEventAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(5)); + endNackEventAssert.hasName(NACK_END_EVENT); + + List subscriberLinks = subscriberSpanData.getLinks(); + assertEquals(3, subscriberLinks.size()); + assertEquals(modackRpcSpanData.getSpanContext(), subscriberLinks.get(0).getSpanContext()); + assertEquals(ackRpcSpanData.getSpanContext(), subscriberLinks.get(1).getSpanContext()); + assertEquals(nackRpcSpanData.getSpanContext(), subscriberLinks.get(2).getSpanContext()); + + assertEquals(11, subscriberSpanData.getAttributes().size()); + AttributesAssert subscriberSpanAttributesAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData.getAttributes()); + subscriberSpanAttributesAssert + .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) + .containsEntry( + SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) + .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME) + .containsEntry(SemanticAttributes.CODE_FUNCTION, "onResponse") + .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize) + .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY) + .containsEntry(MESSAGE_ACK_ID_ATTR_KEY, ACK_ID) + .containsEntry(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, DELIVERY_ATTEMPT) + .containsEntry(MESSAGE_EXACTLY_ONCE_ATTR_KEY, EXACTLY_ONCE_ENABLED) + .containsEntry(MESSAGE_RESULT_ATTR_KEY, PROCESS_ACTION) + .containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID); + } + + @Test + public void testSubscribeConcurrencyControlSpanFailure() { + openTelemetryTesting.clearSpans(); + + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder( + getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) + .build(); + + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); + tracer.startSubscribeConcurrencyControlSpan(messageWrapper); + + Exception e = new Exception("test-exception"); + tracer.setSubscribeConcurrencyControlSpanException(messageWrapper, e); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(2, allSpans.size()); + SpanData concurrencyControlSpanData = allSpans.get(0); + SpanData subscriberSpanData = allSpans.get(1); + + SpanDataAssert concurrencyControlSpanDataAssert = + OpenTelemetryAssertions.assertThat(concurrencyControlSpanData); + StatusData expectedStatus = + StatusData.create( + StatusCode.ERROR, "Exception thrown during subscribe concurrency control."); + concurrencyControlSpanDataAssert + .hasName(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME) + .hasParent(subscriberSpanData) + .hasStatus(expectedStatus) + .hasException(e) + .hasEnded(); + + SpanDataAssert subscriberSpanDataAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData); + subscriberSpanDataAssert + .hasName(SUBSCRIBER_SPAN_NAME) + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasEnded(); + } + + @Test + public void testSubscriberSpanFailure() { + openTelemetryTesting.clearSpans(); + + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder( + getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) + .build(); + + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); + + Exception e = new Exception("test-exception"); + tracer.setSubscriberSpanException(messageWrapper, e, "Test exception"); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(1, allSpans.size()); + SpanData subscriberSpanData = allSpans.get(0); + + StatusData expectedStatus = StatusData.create(StatusCode.ERROR, "Test exception"); + SpanDataAssert subscriberSpanDataAssert = + OpenTelemetryAssertions.assertThat(subscriberSpanData); + subscriberSpanDataAssert + .hasName(SUBSCRIBER_SPAN_NAME) + .hasKind(SpanKind.CONSUMER) + .hasNoParent() + .hasStatus(expectedStatus) + .hasException(e) + .hasEnded(); + } + + @Test + public void testSubscribeRpcSpanFailures() { + openTelemetryTesting.clearSpans(); + + PubsubMessageWrapper messageWrapper = + PubsubMessageWrapper.newBuilder( + getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) + .build(); + List messageWrappers = Arrays.asList(messageWrapper); + + Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); + OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); + + tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); + Span subscribeModackRpcSpan = + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "modack", messageWrappers, ACK_DEADLINE, true); + Span subscribeAckRpcSpan = + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "ack", messageWrappers, 0, false); + Span subscribeNackRpcSpan = + tracer.startSubscribeRpcSpan( + FULL_SUBSCRIPTION_NAME.toString(), "nack", messageWrappers, 0, false); + + Exception e = new Exception("test-exception"); + tracer.setSubscribeRpcSpanException(subscribeModackRpcSpan, true, ACK_DEADLINE, e); + tracer.setSubscribeRpcSpanException(subscribeAckRpcSpan, false, 0, e); + tracer.setSubscribeRpcSpanException(subscribeNackRpcSpan, true, 0, e); + tracer.endSubscriberSpan(messageWrapper); + + List allSpans = openTelemetryTesting.getSpans(); + assertEquals(4, allSpans.size()); + SpanData modackSpanData = allSpans.get(0); + SpanData ackSpanData = allSpans.get(1); + SpanData nackSpanData = allSpans.get(2); + SpanData subscriberSpanData = allSpans.get(3); + + StatusData expectedModackStatus = + StatusData.create(StatusCode.ERROR, "Exception thrown on modack RPC."); + SpanDataAssert modackSpanDataAssert = OpenTelemetryAssertions.assertThat(modackSpanData); + modackSpanDataAssert + .hasName(SUBSCRIBE_MODACK_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasStatus(expectedModackStatus) + .hasException(e) + .hasEnded(); + + StatusData expectedAckStatus = + StatusData.create(StatusCode.ERROR, "Exception thrown on ack RPC."); + SpanDataAssert ackSpanDataAssert = OpenTelemetryAssertions.assertThat(ackSpanData); + ackSpanDataAssert + .hasName(SUBSCRIBE_ACK_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasStatus(expectedAckStatus) + .hasException(e) + .hasEnded(); + + StatusData expectedNackStatus = + StatusData.create(StatusCode.ERROR, "Exception thrown on nack RPC."); + SpanDataAssert nackSpanDataAssert = OpenTelemetryAssertions.assertThat(nackSpanData); + nackSpanDataAssert + .hasName(SUBSCRIBE_NACK_RPC_SPAN_NAME) + .hasKind(SpanKind.CLIENT) + .hasNoParent() + .hasStatus(expectedNackStatus) + .hasException(e) + .hasEnded(); + } + + private PubsubMessage getPubsubMessage() { + return PubsubMessage.newBuilder() + .setData(ByteString.copyFromUtf8("test-data")) + .setOrderingKey(ORDERING_KEY) + .build(); + } +}