diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java deleted file mode 100644 index b946f44bf..000000000 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenTelemetryPubsubTracer.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub.v1; - -import com.google.pubsub.v1.PubsubMessage; -import com.google.pubsub.v1.SubscriptionName; -import com.google.pubsub.v1.TopicName; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanBuilder; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Context; -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; -import java.util.List; - -public class OpenTelemetryPubsubTracer { - private final Tracer tracer; - private boolean enabled = false; - - private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control"; - private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching"; - private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME = - "subscriber concurrency control"; - private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler"; - - private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size"; - private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key"; - private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id"; - private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY = - "messaging.gcp_pubsub.message.exactly_once_delivery"; - private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY = - "messaging.gcp_pubsub.message.delivery_attempt"; - private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; - private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; - private static final String PROJECT_ATTR_KEY = "gcp.project_id"; - private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish"; - - private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; - - OpenTelemetryPubsubTracer(Tracer tracer, boolean enableOpenTelemetry) { - this.tracer = tracer; - if (this.tracer != null && enableOpenTelemetry) { - this.enabled = true; - } - } - - /** Populates attributes that are common the publisher parent span and publish RPC span. */ - private static final AttributesBuilder createCommonSpanAttributesBuilder( - String destinationName, String projectName, String codeFunction, String operation) { - AttributesBuilder attributesBuilder = - Attributes.builder() - .put(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) - .put(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName) - .put(PROJECT_ATTR_KEY, projectName) - .put(SemanticAttributes.CODE_FUNCTION, codeFunction); - if (operation != null) { - attributesBuilder.put(SemanticAttributes.MESSAGING_OPERATION, operation); - } - - return attributesBuilder; - } - - private Span startChildSpan(String name, Span parent) { - return tracer.spanBuilder(name).setParent(Context.current().with(parent)).startSpan(); - } - - /** - * Creates and starts the parent span with the appropriate span attributes and injects the span - * context into the {@link PubsubMessage} attributes. - */ - void startPublisherSpan(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - AttributesBuilder attributesBuilder = - createCommonSpanAttributesBuilder( - message.getTopicName(), message.getTopicProject(), "publish", "create"); - - attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize()); - if (!message.getOrderingKey().isEmpty()) { - attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey()); - } - - Span publisherSpan = - tracer - .spanBuilder(message.getTopicName() + " create") - .setSpanKind(SpanKind.PRODUCER) - .setAllAttributes(attributesBuilder.build()) - .startSpan(); - - message.setPublisherSpan(publisherSpan); - if (publisherSpan.getSpanContext().isValid()) { - message.injectSpanContext(); - } - } - - void endPublisherSpan(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - message.endPublisherSpan(); - } - - void setPublisherMessageIdSpanAttribute(PubsubMessageWrapper message, String messageId) { - if (!enabled) { - return; - } - message.setPublisherMessageIdSpanAttribute(messageId); - } - - /** Creates a span for publish-side flow control as a child of the parent publisher span. */ - void startPublishFlowControlSpan(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - Span publisherSpan = message.getPublisherSpan(); - if (publisherSpan != null) - message.setPublishFlowControlSpan( - startChildSpan(PUBLISH_FLOW_CONTROL_SPAN_NAME, publisherSpan)); - } - - void endPublishFlowControlSpan(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - message.endPublishFlowControlSpan(); - } - - void setPublishFlowControlSpanException(PubsubMessageWrapper message, Throwable t) { - if (!enabled) { - return; - } - message.setPublishFlowControlSpanException(t); - } - - /** Creates a span for publish message batching as a child of the parent publisher span. */ - void startPublishBatchingSpan(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - Span publisherSpan = message.getPublisherSpan(); - if (publisherSpan != null) { - message.setPublishBatchingSpan(startChildSpan(PUBLISH_BATCHING_SPAN_NAME, publisherSpan)); - } - } - - void endPublishBatchingSpan(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - message.endPublishBatchingSpan(); - } - - /** - * Creates, starts, and returns a publish RPC span for the given message batch. Bi-directional - * links with the publisher parent span are created for sampled messages in the batch. - */ - Span startPublishRpcSpan(String topic, List messages) { - if (!enabled) { - return null; - } - TopicName topicName = TopicName.parse(topic); - Attributes attributes = - createCommonSpanAttributesBuilder( - topicName.getTopic(), topicName.getProject(), "publishCall", "publish") - .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()) - .build(); - SpanBuilder publishRpcSpanBuilder = - tracer - .spanBuilder(topicName.getTopic() + PUBLISH_RPC_SPAN_SUFFIX) - .setSpanKind(SpanKind.CLIENT) - .setAllAttributes(attributes); - Attributes linkAttributes = - Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, "publish").build(); - for (PubsubMessageWrapper message : messages) { - if (message.getPublisherSpan().getSpanContext().isSampled()) - publishRpcSpanBuilder.addLink(message.getPublisherSpan().getSpanContext(), linkAttributes); - } - Span publishRpcSpan = publishRpcSpanBuilder.startSpan(); - - for (PubsubMessageWrapper message : messages) { - if (publishRpcSpan.getSpanContext().isSampled()) { - message.getPublisherSpan().addLink(publishRpcSpan.getSpanContext(), linkAttributes); - message.addPublishStartEvent(); - } - } - return publishRpcSpan; - } - - /** Ends the given publish RPC span if it exists. */ - void endPublishRpcSpan(Span publishRpcSpan) { - if (!enabled) { - return; - } - if (publishRpcSpan != null) { - publishRpcSpan.end(); - } - } - - /** - * Sets an error status and records an exception when an exception is thrown when publishing the - * message batch. - */ - void setPublishRpcSpanException(Span publishRpcSpan, Throwable t) { - if (!enabled) { - return; - } - if (publishRpcSpan != null) { - publishRpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on publish RPC."); - publishRpcSpan.recordException(t); - publishRpcSpan.end(); - } - } - - void startSubscriberSpan(PubsubMessageWrapper message, boolean exactlyOnceDeliveryEnabled) { - if (!enabled) { - return; - } - AttributesBuilder attributesBuilder = - createCommonSpanAttributesBuilder( - message.getSubscriptionName(), message.getSubscriptionProject(), "onResponse", null); - - attributesBuilder - .put(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId()) - .put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize()) - .put(MESSAGE_ACK_ID_ATTR_KEY, message.getAckId()) - .put(MESSAGE_EXACTLY_ONCE_ATTR_KEY, exactlyOnceDeliveryEnabled); - if (!message.getOrderingKey().isEmpty()) { - attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey()); - } - if (message.getDeliveryAttempt() > 0) { - attributesBuilder.put(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, message.getDeliveryAttempt()); - } - Attributes attributes = attributesBuilder.build(); - Context publisherSpanContext = message.extractSpanContext(attributes); - message.setPublisherSpan(Span.fromContextOrNull(publisherSpanContext)); - message.setSubscriberSpan( - tracer - .spanBuilder(message.getSubscriptionName() + " subscribe") - .setSpanKind(SpanKind.CONSUMER) - .setParent(publisherSpanContext) - .setAllAttributes(attributes) - .startSpan()); - } - - void endSubscriberSpan(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - message.endSubscriberSpan(); - } - - void setSubscriberSpanExpirationResult(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - message.setSubscriberSpanExpirationResult(); - } - - void setSubscriberSpanException(PubsubMessageWrapper message, Throwable t, String exception) { - if (!enabled) { - return; - } - message.setSubscriberSpanException(t, exception); - } - - /** Creates a span for subscribe concurrency control as a child of the parent subscriber span. */ - void startSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - Span subscriberSpan = message.getSubscriberSpan(); - if (subscriberSpan != null) { - message.setSubscribeConcurrencyControlSpan( - startChildSpan(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME, subscriberSpan)); - } - } - - void endSubscribeConcurrencyControlSpan(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - message.endSubscribeConcurrencyControlSpan(); - } - - void setSubscribeConcurrencyControlSpanException(PubsubMessageWrapper message, Throwable t) { - if (!enabled) { - return; - } - message.setSubscribeConcurrencyControlSpanException(t); - } - - /** - * Creates a span for subscribe ordering key scheduling as a child of the parent subscriber span. - */ - void startSubscribeSchedulerSpan(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - Span subscriberSpan = message.getSubscriberSpan(); - if (subscriberSpan != null) { - message.setSubscribeSchedulerSpan( - startChildSpan(SUBSCRIBE_SCHEDULER_SPAN_NAME, subscriberSpan)); - } - } - - void endSubscribeSchedulerSpan(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - message.endSubscribeSchedulerSpan(); - } - - /** Creates a span for subscribe message processing as a child of the parent subscriber span. */ - void startSubscribeProcessSpan(PubsubMessageWrapper message) { - if (!enabled) { - return; - } - Span subscriberSpan = message.getSubscriberSpan(); - if (subscriberSpan != null) { - Span subscribeProcessSpan = - startChildSpan(message.getSubscriptionName() + " process", subscriberSpan); - subscribeProcessSpan.setAttribute( - SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE); - Span publisherSpan = message.getPublisherSpan(); - if (publisherSpan != null) { - subscribeProcessSpan.addLink(publisherSpan.getSpanContext()); - } - message.setSubscribeProcessSpan(subscribeProcessSpan); - } - } - - void endSubscribeProcessSpan(PubsubMessageWrapper message, String action) { - if (!enabled) { - return; - } - message.endSubscribeProcessSpan(action); - } - - /** - * Creates, starts, and returns spans for ModAck, Nack, and Ack RPC requests. Bi-directional links - * to parent subscribe span for sampled messages are added. - */ - Span startSubscribeRpcSpan( - String subscription, - String rpcOperation, - List messages, - int ackDeadline, - boolean isReceiptModack) { - if (!enabled) { - return null; - } - String codeFunction = rpcOperation == "ack" ? "sendAckOperations" : "sendModAckOperations"; - SubscriptionName subscriptionName = SubscriptionName.parse(subscription); - AttributesBuilder attributesBuilder = - createCommonSpanAttributesBuilder( - subscriptionName.getSubscription(), - subscriptionName.getProject(), - codeFunction, - rpcOperation) - .put(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messages.size()); - - // Ack deadline and receipt modack are specific to the modack operation - if (rpcOperation == "modack") { - attributesBuilder - .put(ACK_DEADLINE_ATTR_KEY, ackDeadline) - .put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack); - } - - SpanBuilder rpcSpanBuilder = - tracer - .spanBuilder(subscriptionName.getSubscription() + " " + rpcOperation) - .setSpanKind(SpanKind.CLIENT) - .setAllAttributes(attributesBuilder.build()); - Attributes linkAttributes = - Attributes.builder().put(SemanticAttributes.MESSAGING_OPERATION, rpcOperation).build(); - for (PubsubMessageWrapper message : messages) { - if (message.getSubscriberSpan().getSpanContext().isSampled()) { - rpcSpanBuilder.addLink(message.getSubscriberSpan().getSpanContext(), linkAttributes); - } - } - Span rpcSpan = rpcSpanBuilder.startSpan(); - - for (PubsubMessageWrapper message : messages) { - if (rpcSpan.getSpanContext().isSampled()) { - message.getSubscriberSpan().addLink(rpcSpan.getSpanContext(), linkAttributes); - switch (rpcOperation) { - case "ack": - message.addAckStartEvent(); - break; - case "modack": - message.addModAckStartEvent(); - break; - case "nack": - message.addNackStartEvent(); - break; - } - } - } - return rpcSpan; - } - - /** Ends the given subscribe RPC span if it exists. */ - void endSubscribeRpcSpan(Span rpcSpan) { - if (!enabled) { - return; - } - if (rpcSpan != null) { - rpcSpan.end(); - } - } - - /** - * Sets an error status and records an exception when an exception is thrown when handling a - * subscribe-side RPC. - */ - void setSubscribeRpcSpanException(Span rpcSpan, boolean isModack, int ackDeadline, Throwable t) { - if (!enabled) { - return; - } - if (rpcSpan != null) { - String operation = !isModack ? "ack" : (ackDeadline == 0 ? "nack" : "modack"); - rpcSpan.setStatus(StatusCode.ERROR, "Exception thrown on " + operation + " RPC."); - rpcSpan.recordException(t); - rpcSpan.end(); - } - } - - /** Adds the appropriate subscribe-side RPC end event. */ - void addEndRpcEvent( - PubsubMessageWrapper message, boolean rpcSampled, boolean isModack, int ackDeadline) { - if (!enabled || !rpcSampled) { - return; - } - if (!isModack) { - message.addAckEndEvent(); - } else if (ackDeadline == 0) { - message.addNackEndEvent(); - } else { - message.addModAckEndEvent(); - } - } -} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java deleted file mode 100644 index 94fd13085..000000000 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/PubsubMessageWrapper.java +++ /dev/null @@ -1,430 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub.v1; - -import com.google.pubsub.v1.PubsubMessage; -import com.google.pubsub.v1.SubscriptionName; -import com.google.pubsub.v1.TopicName; -import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.propagation.TextMapGetter; -import io.opentelemetry.context.propagation.TextMapSetter; -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; - -/** - * A wrapper class for a {@link PubsubMessage} object that handles creation and tracking of - * OpenTelemetry {@link Span} objects for different operations that occur during publishing. - */ -public class PubsubMessageWrapper { - private PubsubMessage message; - - private final TopicName topicName; - private final SubscriptionName subscriptionName; - - // Attributes set only for messages received from a streaming pull response. - private final String ackId; - private final int deliveryAttempt; - - private static final String PUBLISH_START_EVENT = "publish start"; - private static final String PUBLISH_END_EVENT = "publish end"; - - private static final String MODACK_START_EVENT = "modack start"; - private static final String MODACK_END_EVENT = "modack end"; - private static final String NACK_START_EVENT = "nack start"; - private static final String NACK_END_EVENT = "nack end"; - private static final String ACK_START_EVENT = "ack start"; - private static final String ACK_END_EVENT = "ack end"; - - private static final String GOOGCLIENT_PREFIX = "googclient_"; - - private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result"; - - private Span publisherSpan; - private Span publishFlowControlSpan; - private Span publishBatchingSpan; - - private Span subscriberSpan; - private Span subscribeConcurrencyControlSpan; - private Span subscribeSchedulerSpan; - private Span subscribeProcessSpan; - - private PubsubMessageWrapper(Builder builder) { - this.message = builder.message; - this.topicName = builder.topicName; - this.subscriptionName = builder.subscriptionName; - this.ackId = builder.ackId; - this.deliveryAttempt = builder.deliveryAttempt; - } - - static Builder newBuilder(PubsubMessage message, String topicName) { - return new Builder(message, topicName); - } - - static Builder newBuilder( - PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { - return new Builder(message, subscriptionName, ackId, deliveryAttempt); - } - - /** Returns the PubsubMessage associated with this wrapper. */ - PubsubMessage getPubsubMessage() { - return message; - } - - void setPubsubMessage(PubsubMessage message) { - this.message = message; - } - - /** Returns the TopicName for this wrapper as a string. */ - String getTopicName() { - if (topicName != null) { - return topicName.getTopic(); - } - return ""; - } - - String getTopicProject() { - if (topicName != null) { - return topicName.getProject(); - } - return ""; - } - - /** Returns the SubscriptionName for this wrapper as a string. */ - String getSubscriptionName() { - if (subscriptionName != null) { - return subscriptionName.getSubscription(); - } - return ""; - } - - String getSubscriptionProject() { - if (subscriptionName != null) { - return subscriptionName.getProject(); - } - return ""; - } - - String getMessageId() { - return message.getMessageId(); - } - - String getAckId() { - return ackId; - } - - int getDataSize() { - return message.getData().size(); - } - - String getOrderingKey() { - return message.getOrderingKey(); - } - - int getDeliveryAttempt() { - return deliveryAttempt; - } - - Span getPublisherSpan() { - return publisherSpan; - } - - void setPublisherSpan(Span span) { - this.publisherSpan = span; - } - - void setPublishFlowControlSpan(Span span) { - this.publishFlowControlSpan = span; - } - - void setPublishBatchingSpan(Span span) { - this.publishBatchingSpan = span; - } - - Span getSubscriberSpan() { - return subscriberSpan; - } - - void setSubscriberSpan(Span span) { - this.subscriberSpan = span; - } - - void setSubscribeConcurrencyControlSpan(Span span) { - this.subscribeConcurrencyControlSpan = span; - } - - void setSubscribeSchedulerSpan(Span span) { - this.subscribeSchedulerSpan = span; - } - - void setSubscribeProcessSpan(Span span) { - this.subscribeProcessSpan = span; - } - - /** Creates a publish start event that is tied to the publish RPC span time. */ - void addPublishStartEvent() { - if (publisherSpan != null) { - publisherSpan.addEvent(PUBLISH_START_EVENT); - } - } - - /** - * Sets the message ID attribute in the publisher parent span. This is called after the publish - * RPC returns with a message ID. - */ - void setPublisherMessageIdSpanAttribute(String messageId) { - if (publisherSpan != null) { - publisherSpan.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId); - } - } - - /** Ends the publisher parent span if it exists. */ - void endPublisherSpan() { - if (publisherSpan != null) { - publisherSpan.addEvent(PUBLISH_END_EVENT); - publisherSpan.end(); - } - } - - /** Ends the publish flow control span if it exists. */ - void endPublishFlowControlSpan() { - if (publishFlowControlSpan != null) { - publishFlowControlSpan.end(); - } - } - - /** Ends the publish batching span if it exists. */ - void endPublishBatchingSpan() { - if (publishBatchingSpan != null) { - publishBatchingSpan.end(); - } - } - - /** - * Sets an error status and records an exception when an exception is thrown during flow control. - */ - void setPublishFlowControlSpanException(Throwable t) { - if (publishFlowControlSpan != null) { - publishFlowControlSpan.setStatus( - StatusCode.ERROR, "Exception thrown during publish flow control."); - publishFlowControlSpan.recordException(t); - endAllPublishSpans(); - } - } - - /** - * Creates start and end events for ModAcks, Nacks, and Acks that are tied to the corresponding - * RPC span start and end times. - */ - void addModAckStartEvent() { - if (subscriberSpan != null) { - subscriberSpan.addEvent(MODACK_START_EVENT); - } - } - - void addModAckEndEvent() { - if (subscriberSpan != null) { - subscriberSpan.addEvent(MODACK_END_EVENT); - } - } - - void addNackStartEvent() { - if (subscriberSpan != null) { - subscriberSpan.addEvent(NACK_START_EVENT); - } - } - - void addNackEndEvent() { - if (subscriberSpan != null) { - subscriberSpan.addEvent(NACK_END_EVENT); - } - } - - void addAckStartEvent() { - if (subscriberSpan != null) { - subscriberSpan.addEvent(ACK_START_EVENT); - } - } - - void addAckEndEvent() { - if (subscriberSpan != null) { - subscriberSpan.addEvent(ACK_END_EVENT); - } - } - - /** Ends the subscriber parent span if exists. */ - void endSubscriberSpan() { - if (subscriberSpan != null) { - subscriberSpan.end(); - } - } - - /** Ends the subscribe concurreny control span if exists. */ - void endSubscribeConcurrencyControlSpan() { - if (subscribeConcurrencyControlSpan != null) { - subscribeConcurrencyControlSpan.end(); - } - } - - /** Ends the subscribe scheduler span if exists. */ - void endSubscribeSchedulerSpan() { - if (subscribeSchedulerSpan != null) { - subscribeSchedulerSpan.end(); - } - } - - /** - * Ends the subscribe process span if it exists, creates an event with the appropriate result, and - * sets the result on the parent subscriber span. - */ - void endSubscribeProcessSpan(String action) { - if (subscribeProcessSpan != null) { - subscribeProcessSpan.addEvent(action + " called"); - subscribeProcessSpan.end(); - subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, action); - } - } - - /** Sets an exception on the subscriber span during Ack/ModAck/Nack failures */ - void setSubscriberSpanException(Throwable t, String exception) { - if (subscriberSpan != null) { - subscriberSpan.setStatus(StatusCode.ERROR, exception); - subscriberSpan.recordException(t); - endAllSubscribeSpans(); - } - } - - /** Sets result of the parent subscriber span to expired and ends its. */ - void setSubscriberSpanExpirationResult() { - if (subscriberSpan != null) { - subscriberSpan.setAttribute(MESSAGE_RESULT_ATTR_KEY, "expired"); - endSubscriberSpan(); - } - } - - /** - * Sets an error status and records an exception when an exception is thrown subscriber - * concurrency control. - */ - void setSubscribeConcurrencyControlSpanException(Throwable t) { - if (subscribeConcurrencyControlSpan != null) { - subscribeConcurrencyControlSpan.setStatus( - StatusCode.ERROR, "Exception thrown during subscribe concurrency control."); - subscribeConcurrencyControlSpan.recordException(t); - endAllSubscribeSpans(); - } - } - - /** Ends all publisher-side spans associated with this message wrapper. */ - private void endAllPublishSpans() { - endPublishFlowControlSpan(); - endPublishBatchingSpan(); - endPublisherSpan(); - } - - /** Ends all subscriber-side spans associated with this message wrapper. */ - private void endAllSubscribeSpans() { - endSubscribeConcurrencyControlSpan(); - endSubscribeSchedulerSpan(); - endSubscriberSpan(); - } - - /** - * Injects the span context into the attributes of a Pub/Sub message for propagation to the - * subscriber client. - */ - void injectSpanContext() { - TextMapSetter injectMessageAttributes = - new TextMapSetter() { - @Override - public void set(PubsubMessageWrapper carrier, String key, String value) { - PubsubMessage newMessage = - PubsubMessage.newBuilder(carrier.message) - .putAttributes(GOOGCLIENT_PREFIX + key, value) - .build(); - carrier.message = newMessage; - } - }; - W3CTraceContextPropagator.getInstance() - .inject(Context.current().with(publisherSpan), this, injectMessageAttributes); - } - - /** - * Extracts the span context from the attributes of a Pub/Sub message and creates the parent - * subscriber span using that context. - */ - Context extractSpanContext(Attributes attributes) { - TextMapGetter extractMessageAttributes = - new TextMapGetter() { - @Override - public String get(PubsubMessageWrapper carrier, String key) { - return carrier.message.getAttributesOrDefault(GOOGCLIENT_PREFIX + key, ""); - } - - public Iterable keys(PubsubMessageWrapper carrier) { - return carrier.message.getAttributesMap().keySet(); - } - }; - Context context = - W3CTraceContextPropagator.getInstance() - .extract(Context.current(), this, extractMessageAttributes); - return context; - } - - /** Builder of {@link PubsubMessageWrapper PubsubMessageWrapper}. */ - static final class Builder { - private PubsubMessage message = null; - private TopicName topicName = null; - private SubscriptionName subscriptionName = null; - private String ackId = null; - private int deliveryAttempt = 0; - - public Builder(PubsubMessage message, String topicName) { - this.message = message; - if (topicName != null) { - this.topicName = TopicName.parse(topicName); - } - } - - public Builder( - PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) { - this.message = message; - if (subscriptionName != null) { - this.subscriptionName = SubscriptionName.parse(subscriptionName); - } - this.ackId = ackId; - this.deliveryAttempt = deliveryAttempt; - } - - public Builder( - PubsubMessage message, - SubscriptionName subscriptionName, - String ackId, - int deliveryAttempt) { - this.message = message; - this.subscriptionName = subscriptionName; - this.ackId = ackId; - this.deliveryAttempt = deliveryAttempt; - } - - public PubsubMessageWrapper build() { - return new PubsubMessageWrapper(this); - } - } -} diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java deleted file mode 100644 index b4433f41e..000000000 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenTelemetryTest.java +++ /dev/null @@ -1,669 +0,0 @@ -/* - * Copyright 2024 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsub.v1; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import com.google.protobuf.ByteString; -import com.google.pubsub.v1.PubsubMessage; -import com.google.pubsub.v1.SubscriptionName; -import com.google.pubsub.v1.TopicName; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanKind; -import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.sdk.testing.assertj.AttributesAssert; -import io.opentelemetry.sdk.testing.assertj.EventDataAssert; -import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions; -import io.opentelemetry.sdk.testing.assertj.SpanDataAssert; -import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; -import io.opentelemetry.sdk.trace.data.LinkData; -import io.opentelemetry.sdk.trace.data.SpanData; -import io.opentelemetry.sdk.trace.data.StatusData; -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; -import java.util.Arrays; -import java.util.List; -import org.junit.Test; - -public class OpenTelemetryTest { - private static final TopicName FULL_TOPIC_NAME = - TopicName.parse("projects/test-project/topics/test-topic"); - private static final SubscriptionName FULL_SUBSCRIPTION_NAME = - SubscriptionName.parse("projects/test-project/subscriptions/test-sub"); - private static final String PROJECT_NAME = "test-project"; - private static final String ORDERING_KEY = "abc"; - private static final String MESSAGE_ID = "m0"; - private static final String ACK_ID = "def"; - private static final int DELIVERY_ATTEMPT = 1; - private static final int ACK_DEADLINE = 10; - private static final boolean EXACTLY_ONCE_ENABLED = true; - - private static final String PUBLISHER_SPAN_NAME = FULL_TOPIC_NAME.getTopic() + " create"; - private static final String PUBLISH_FLOW_CONTROL_SPAN_NAME = "publisher flow control"; - private static final String PUBLISH_BATCHING_SPAN_NAME = "publisher batching"; - private static final String PUBLISH_RPC_SPAN_NAME = FULL_TOPIC_NAME.getTopic() + " publish"; - private static final String PUBLISH_START_EVENT = "publish start"; - private static final String PUBLISH_END_EVENT = "publish end"; - - private static final String SUBSCRIBER_SPAN_NAME = - FULL_SUBSCRIPTION_NAME.getSubscription() + " subscribe"; - private static final String SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME = - "subscriber concurrency control"; - private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler"; - private static final String SUBSCRIBE_PROCESS_SPAN_NAME = - FULL_SUBSCRIPTION_NAME.getSubscription() + " process"; - private static final String SUBSCRIBE_MODACK_RPC_SPAN_NAME = - FULL_SUBSCRIPTION_NAME.getSubscription() + " modack"; - private static final String SUBSCRIBE_ACK_RPC_SPAN_NAME = - FULL_SUBSCRIPTION_NAME.getSubscription() + " ack"; - private static final String SUBSCRIBE_NACK_RPC_SPAN_NAME = - FULL_SUBSCRIPTION_NAME.getSubscription() + " nack"; - - private static final String PROCESS_ACTION = "ack"; - private static final String MODACK_START_EVENT = "modack start"; - private static final String MODACK_END_EVENT = "modack end"; - private static final String NACK_START_EVENT = "nack start"; - private static final String NACK_END_EVENT = "nack end"; - private static final String ACK_START_EVENT = "ack start"; - private static final String ACK_END_EVENT = "ack end"; - - private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub"; - private static final String PROJECT_ATTR_KEY = "gcp.project_id"; - private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size"; - private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key"; - private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline"; - private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack"; - private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id"; - private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY = - "messaging.gcp_pubsub.message.exactly_once_delivery"; - private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result"; - private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY = - "messaging.gcp_pubsub.message.delivery_attempt"; - - private static final String TRACEPARENT_ATTRIBUTE = "googclient_traceparent"; - - private static final OpenTelemetryRule openTelemetryTesting = OpenTelemetryRule.create(); - - @Test - public void testPublishSpansSuccess() { - openTelemetryTesting.clearSpans(); - - PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); - List messageWrappers = Arrays.asList(messageWrapper); - - long messageSize = messageWrapper.getPubsubMessage().getData().size(); - Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); - - // Call all span start/end methods in the expected order - tracer.startPublisherSpan(messageWrapper); - tracer.startPublishFlowControlSpan(messageWrapper); - tracer.endPublishFlowControlSpan(messageWrapper); - tracer.startPublishBatchingSpan(messageWrapper); - tracer.endPublishBatchingSpan(messageWrapper); - Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); - tracer.endPublishRpcSpan(publishRpcSpan); - tracer.setPublisherMessageIdSpanAttribute(messageWrapper, MESSAGE_ID); - tracer.endPublisherSpan(messageWrapper); - - List allSpans = openTelemetryTesting.getSpans(); - assertEquals(4, allSpans.size()); - SpanData flowControlSpanData = allSpans.get(0); - SpanData batchingSpanData = allSpans.get(1); - SpanData publishRpcSpanData = allSpans.get(2); - SpanData publisherSpanData = allSpans.get(3); - - SpanDataAssert flowControlSpanDataAssert = - OpenTelemetryAssertions.assertThat(flowControlSpanData); - flowControlSpanDataAssert - .hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME) - .hasParent(publisherSpanData) - .hasEnded(); - - SpanDataAssert batchingSpanDataAssert = OpenTelemetryAssertions.assertThat(batchingSpanData); - batchingSpanDataAssert - .hasName(PUBLISH_BATCHING_SPAN_NAME) - .hasParent(publisherSpanData) - .hasEnded(); - - // Check span data, links, and attributes for the publish RPC span - SpanDataAssert publishRpcSpanDataAssert = - OpenTelemetryAssertions.assertThat(publishRpcSpanData); - publishRpcSpanDataAssert - .hasName(PUBLISH_RPC_SPAN_NAME) - .hasKind(SpanKind.CLIENT) - .hasNoParent() - .hasEnded(); - - List publishRpcLinks = publishRpcSpanData.getLinks(); - assertEquals(messageWrappers.size(), publishRpcLinks.size()); - assertEquals(publisherSpanData.getSpanContext(), publishRpcLinks.get(0).getSpanContext()); - - assertEquals(6, publishRpcSpanData.getAttributes().size()); - AttributesAssert publishRpcSpanAttributesAssert = - OpenTelemetryAssertions.assertThat(publishRpcSpanData.getAttributes()); - publishRpcSpanAttributesAssert - .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) - .containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic()) - .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) - .containsEntry(SemanticAttributes.CODE_FUNCTION, "publishCall") - .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "publish") - .containsEntry(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, messageWrappers.size()); - - // Check span data, events, links, and attributes for the publisher create span - SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData); - publisherSpanDataAssert - .hasName(PUBLISHER_SPAN_NAME) - .hasKind(SpanKind.PRODUCER) - .hasNoParent() - .hasEnded(); - - assertEquals(2, publisherSpanData.getEvents().size()); - EventDataAssert startEventAssert = - OpenTelemetryAssertions.assertThat(publisherSpanData.getEvents().get(0)); - startEventAssert.hasName(PUBLISH_START_EVENT); - EventDataAssert endEventAssert = - OpenTelemetryAssertions.assertThat(publisherSpanData.getEvents().get(1)); - endEventAssert.hasName(PUBLISH_END_EVENT); - - List publisherLinks = publisherSpanData.getLinks(); - assertEquals(1, publisherLinks.size()); - assertEquals(publishRpcSpanData.getSpanContext(), publisherLinks.get(0).getSpanContext()); - - assertEquals(8, publisherSpanData.getAttributes().size()); - AttributesAssert publisherSpanAttributesAssert = - OpenTelemetryAssertions.assertThat(publisherSpanData.getAttributes()); - publisherSpanAttributesAssert - .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) - .containsEntry(SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_TOPIC_NAME.getTopic()) - .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME) - .containsEntry(SemanticAttributes.CODE_FUNCTION, "publish") - .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "create") - .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY) - .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize) - .containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID); - - // Check that the message has the attribute containing the trace context. - PubsubMessage message = messageWrapper.getPubsubMessage(); - assertEquals(1, message.getAttributesMap().size()); - assertTrue(message.containsAttributes(TRACEPARENT_ATTRIBUTE)); - assertTrue( - message - .getAttributesOrDefault(TRACEPARENT_ATTRIBUTE, "") - .contains(publisherSpanData.getTraceId())); - assertTrue( - message - .getAttributesOrDefault(TRACEPARENT_ATTRIBUTE, "") - .contains(publisherSpanData.getSpanId())); - } - - @Test - public void testPublishFlowControlSpanFailure() { - openTelemetryTesting.clearSpans(); - - PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); - - Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); - - tracer.startPublisherSpan(messageWrapper); - tracer.startPublishFlowControlSpan(messageWrapper); - - Exception e = new Exception("test-exception"); - tracer.setPublishFlowControlSpanException(messageWrapper, e); - - List allSpans = openTelemetryTesting.getSpans(); - assertEquals(2, allSpans.size()); - SpanData flowControlSpanData = allSpans.get(0); - SpanData publisherSpanData = allSpans.get(1); - - SpanDataAssert flowControlSpanDataAssert = - OpenTelemetryAssertions.assertThat(flowControlSpanData); - StatusData expectedStatus = - StatusData.create(StatusCode.ERROR, "Exception thrown during publish flow control."); - flowControlSpanDataAssert - .hasName(PUBLISH_FLOW_CONTROL_SPAN_NAME) - .hasParent(publisherSpanData) - .hasStatus(expectedStatus) - .hasException(e) - .hasEnded(); - - SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData); - publisherSpanDataAssert - .hasName(PUBLISHER_SPAN_NAME) - .hasKind(SpanKind.PRODUCER) - .hasNoParent() - .hasEnded(); - } - - @Test - public void testPublishRpcSpanFailure() { - openTelemetryTesting.clearSpans(); - - PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); - - List messageWrappers = Arrays.asList(messageWrapper); - Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); - - tracer.startPublisherSpan(messageWrapper); - Span publishRpcSpan = tracer.startPublishRpcSpan(FULL_TOPIC_NAME.toString(), messageWrappers); - - Exception e = new Exception("test-exception"); - tracer.setPublishRpcSpanException(publishRpcSpan, e); - tracer.endPublisherSpan(messageWrapper); - - List allSpans = openTelemetryTesting.getSpans(); - assertEquals(2, allSpans.size()); - SpanData rpcSpanData = allSpans.get(0); - SpanData publisherSpanData = allSpans.get(1); - - SpanDataAssert rpcSpanDataAssert = OpenTelemetryAssertions.assertThat(rpcSpanData); - StatusData expectedStatus = - StatusData.create(StatusCode.ERROR, "Exception thrown on publish RPC."); - rpcSpanDataAssert - .hasName(PUBLISH_RPC_SPAN_NAME) - .hasKind(SpanKind.CLIENT) - .hasStatus(expectedStatus) - .hasException(e) - .hasEnded(); - - SpanDataAssert publisherSpanDataAssert = OpenTelemetryAssertions.assertThat(publisherSpanData); - publisherSpanDataAssert - .hasName(PUBLISHER_SPAN_NAME) - .hasKind(SpanKind.PRODUCER) - .hasNoParent() - .hasEnded(); - } - - @Test - public void testSubscribeSpansSuccess() { - openTelemetryTesting.clearSpans(); - - Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); - - PubsubMessageWrapper publishMessageWrapper = - PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build(); - // Initialize the Publisher span to inject the context in the message - tracer.startPublisherSpan(publishMessageWrapper); - tracer.endPublisherSpan(publishMessageWrapper); - - PubsubMessage publishedMessage = - publishMessageWrapper.getPubsubMessage().toBuilder().setMessageId(MESSAGE_ID).build(); - PubsubMessageWrapper subscribeMessageWrapper = - PubsubMessageWrapper.newBuilder( - publishedMessage, FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, 1) - .build(); - List subscribeMessageWrappers = Arrays.asList(subscribeMessageWrapper); - - long messageSize = subscribeMessageWrapper.getPubsubMessage().getData().size(); - - // Call all span start/end methods in the expected order - tracer.startSubscriberSpan(subscribeMessageWrapper, EXACTLY_ONCE_ENABLED); - tracer.startSubscribeConcurrencyControlSpan(subscribeMessageWrapper); - tracer.endSubscribeConcurrencyControlSpan(subscribeMessageWrapper); - tracer.startSubscribeSchedulerSpan(subscribeMessageWrapper); - tracer.endSubscribeSchedulerSpan(subscribeMessageWrapper); - tracer.startSubscribeProcessSpan(subscribeMessageWrapper); - tracer.endSubscribeProcessSpan(subscribeMessageWrapper, PROCESS_ACTION); - Span subscribeModackRpcSpan = - tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), - "modack", - subscribeMessageWrappers, - ACK_DEADLINE, - true); - tracer.endSubscribeRpcSpan(subscribeModackRpcSpan); - tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, ACK_DEADLINE); - Span subscribeAckRpcSpan = - tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "ack", subscribeMessageWrappers, 0, false); - tracer.endSubscribeRpcSpan(subscribeAckRpcSpan); - tracer.addEndRpcEvent(subscribeMessageWrapper, true, false, 0); - Span subscribeNackRpcSpan = - tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "nack", subscribeMessageWrappers, 0, false); - tracer.endSubscribeRpcSpan(subscribeNackRpcSpan); - tracer.addEndRpcEvent(subscribeMessageWrapper, true, true, 0); - tracer.endSubscriberSpan(subscribeMessageWrapper); - - List allSpans = openTelemetryTesting.getSpans(); - assertEquals(8, allSpans.size()); - - SpanData publisherSpanData = allSpans.get(0); - SpanData concurrencyControlSpanData = allSpans.get(1); - SpanData schedulerSpanData = allSpans.get(2); - SpanData processSpanData = allSpans.get(3); - SpanData modackRpcSpanData = allSpans.get(4); - SpanData ackRpcSpanData = allSpans.get(5); - SpanData nackRpcSpanData = allSpans.get(6); - SpanData subscriberSpanData = allSpans.get(7); - - SpanDataAssert concurrencyControlSpanDataAssert = - OpenTelemetryAssertions.assertThat(concurrencyControlSpanData); - concurrencyControlSpanDataAssert - .hasName(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME) - .hasParent(subscriberSpanData) - .hasEnded(); - - SpanDataAssert schedulerSpanDataAssert = OpenTelemetryAssertions.assertThat(schedulerSpanData); - schedulerSpanDataAssert - .hasName(SUBSCRIBE_SCHEDULER_SPAN_NAME) - .hasParent(subscriberSpanData) - .hasEnded(); - - SpanDataAssert processSpanDataAssert = OpenTelemetryAssertions.assertThat(processSpanData); - processSpanDataAssert - .hasName(SUBSCRIBE_PROCESS_SPAN_NAME) - .hasParent(subscriberSpanData) - .hasEnded(); - - assertEquals(1, processSpanData.getEvents().size()); - EventDataAssert actionCalledEventAssert = - OpenTelemetryAssertions.assertThat(processSpanData.getEvents().get(0)); - actionCalledEventAssert.hasName(PROCESS_ACTION + " called"); - - // Check span data, links, and attributes for the modack RPC span - SpanDataAssert modackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(modackRpcSpanData); - modackRpcSpanDataAssert - .hasName(SUBSCRIBE_MODACK_RPC_SPAN_NAME) - .hasKind(SpanKind.CLIENT) - .hasNoParent() - .hasEnded(); - - List modackRpcLinks = modackRpcSpanData.getLinks(); - assertEquals(subscribeMessageWrappers.size(), modackRpcLinks.size()); - assertEquals(subscriberSpanData.getSpanContext(), modackRpcLinks.get(0).getSpanContext()); - - assertEquals(8, modackRpcSpanData.getAttributes().size()); - AttributesAssert modackRpcSpanAttributesAssert = - OpenTelemetryAssertions.assertThat(modackRpcSpanData.getAttributes()); - modackRpcSpanAttributesAssert - .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) - .containsEntry( - SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) - .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) - .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendModAckOperations") - .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "modack") - .containsEntry( - SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()) - .containsEntry(ACK_DEADLINE_ATTR_KEY, 10) - .containsEntry(RECEIPT_MODACK_ATTR_KEY, true); - - // Check span data, links, and attributes for the ack RPC span - SpanDataAssert ackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(ackRpcSpanData); - ackRpcSpanDataAssert - .hasName(SUBSCRIBE_ACK_RPC_SPAN_NAME) - .hasKind(SpanKind.CLIENT) - .hasNoParent() - .hasEnded(); - - List ackRpcLinks = ackRpcSpanData.getLinks(); - assertEquals(subscribeMessageWrappers.size(), ackRpcLinks.size()); - assertEquals(subscriberSpanData.getSpanContext(), ackRpcLinks.get(0).getSpanContext()); - - assertEquals(6, ackRpcSpanData.getAttributes().size()); - AttributesAssert ackRpcSpanAttributesAssert = - OpenTelemetryAssertions.assertThat(ackRpcSpanData.getAttributes()); - ackRpcSpanAttributesAssert - .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) - .containsEntry( - SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) - .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) - .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendAckOperations") - .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "ack") - .containsEntry( - SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()); - - // Check span data, links, and attributes for the nack RPC span - SpanDataAssert nackRpcSpanDataAssert = OpenTelemetryAssertions.assertThat(nackRpcSpanData); - nackRpcSpanDataAssert - .hasName(SUBSCRIBE_NACK_RPC_SPAN_NAME) - .hasKind(SpanKind.CLIENT) - .hasNoParent() - .hasEnded(); - - List nackRpcLinks = nackRpcSpanData.getLinks(); - assertEquals(subscribeMessageWrappers.size(), nackRpcLinks.size()); - assertEquals(subscriberSpanData.getSpanContext(), nackRpcLinks.get(0).getSpanContext()); - - assertEquals(6, nackRpcSpanData.getAttributes().size()); - AttributesAssert nackRpcSpanAttributesAssert = - OpenTelemetryAssertions.assertThat(nackRpcSpanData.getAttributes()); - nackRpcSpanAttributesAssert - .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) - .containsEntry( - SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) - .containsEntry(PROJECT_ATTR_KEY, FULL_TOPIC_NAME.getProject()) - .containsEntry(SemanticAttributes.CODE_FUNCTION, "sendModAckOperations") - .containsEntry(SemanticAttributes.MESSAGING_OPERATION, "nack") - .containsEntry( - SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size()); - - // Check span data, events, links, and attributes for the publisher create span - SpanDataAssert subscriberSpanDataAssert = - OpenTelemetryAssertions.assertThat(subscriberSpanData); - subscriberSpanDataAssert - .hasName(SUBSCRIBER_SPAN_NAME) - .hasKind(SpanKind.CONSUMER) - .hasParent(publisherSpanData) - .hasEnded(); - - assertEquals(6, subscriberSpanData.getEvents().size()); - EventDataAssert startModackEventAssert = - OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(0)); - startModackEventAssert.hasName(MODACK_START_EVENT); - EventDataAssert endModackEventAssert = - OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(1)); - endModackEventAssert.hasName(MODACK_END_EVENT); - EventDataAssert startAckEventAssert = - OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(2)); - startAckEventAssert.hasName(ACK_START_EVENT); - EventDataAssert endAckEventAssert = - OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(3)); - endAckEventAssert.hasName(ACK_END_EVENT); - EventDataAssert startNackEventAssert = - OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(4)); - startNackEventAssert.hasName(NACK_START_EVENT); - EventDataAssert endNackEventAssert = - OpenTelemetryAssertions.assertThat(subscriberSpanData.getEvents().get(5)); - endNackEventAssert.hasName(NACK_END_EVENT); - - List subscriberLinks = subscriberSpanData.getLinks(); - assertEquals(3, subscriberLinks.size()); - assertEquals(modackRpcSpanData.getSpanContext(), subscriberLinks.get(0).getSpanContext()); - assertEquals(ackRpcSpanData.getSpanContext(), subscriberLinks.get(1).getSpanContext()); - assertEquals(nackRpcSpanData.getSpanContext(), subscriberLinks.get(2).getSpanContext()); - - assertEquals(11, subscriberSpanData.getAttributes().size()); - AttributesAssert subscriberSpanAttributesAssert = - OpenTelemetryAssertions.assertThat(subscriberSpanData.getAttributes()); - subscriberSpanAttributesAssert - .containsEntry(SemanticAttributes.MESSAGING_SYSTEM, MESSAGING_SYSTEM_VALUE) - .containsEntry( - SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription()) - .containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME) - .containsEntry(SemanticAttributes.CODE_FUNCTION, "onResponse") - .containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize) - .containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY) - .containsEntry(MESSAGE_ACK_ID_ATTR_KEY, ACK_ID) - .containsEntry(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, DELIVERY_ATTEMPT) - .containsEntry(MESSAGE_EXACTLY_ONCE_ATTR_KEY, EXACTLY_ONCE_ENABLED) - .containsEntry(MESSAGE_RESULT_ATTR_KEY, PROCESS_ACTION) - .containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID); - } - - @Test - public void testSubscribeConcurrencyControlSpanFailure() { - openTelemetryTesting.clearSpans(); - - PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder( - getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) - .build(); - - Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); - - tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); - tracer.startSubscribeConcurrencyControlSpan(messageWrapper); - - Exception e = new Exception("test-exception"); - tracer.setSubscribeConcurrencyControlSpanException(messageWrapper, e); - - List allSpans = openTelemetryTesting.getSpans(); - assertEquals(2, allSpans.size()); - SpanData concurrencyControlSpanData = allSpans.get(0); - SpanData subscriberSpanData = allSpans.get(1); - - SpanDataAssert concurrencyControlSpanDataAssert = - OpenTelemetryAssertions.assertThat(concurrencyControlSpanData); - StatusData expectedStatus = - StatusData.create( - StatusCode.ERROR, "Exception thrown during subscribe concurrency control."); - concurrencyControlSpanDataAssert - .hasName(SUBSCRIBE_CONCURRENCY_CONTROL_SPAN_NAME) - .hasParent(subscriberSpanData) - .hasStatus(expectedStatus) - .hasException(e) - .hasEnded(); - - SpanDataAssert subscriberSpanDataAssert = - OpenTelemetryAssertions.assertThat(subscriberSpanData); - subscriberSpanDataAssert - .hasName(SUBSCRIBER_SPAN_NAME) - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasEnded(); - } - - @Test - public void testSubscriberSpanFailure() { - openTelemetryTesting.clearSpans(); - - PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder( - getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) - .build(); - - Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); - - tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); - - Exception e = new Exception("test-exception"); - tracer.setSubscriberSpanException(messageWrapper, e, "Test exception"); - - List allSpans = openTelemetryTesting.getSpans(); - assertEquals(1, allSpans.size()); - SpanData subscriberSpanData = allSpans.get(0); - - StatusData expectedStatus = StatusData.create(StatusCode.ERROR, "Test exception"); - SpanDataAssert subscriberSpanDataAssert = - OpenTelemetryAssertions.assertThat(subscriberSpanData); - subscriberSpanDataAssert - .hasName(SUBSCRIBER_SPAN_NAME) - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasStatus(expectedStatus) - .hasException(e) - .hasEnded(); - } - - @Test - public void testSubscribeRpcSpanFailures() { - openTelemetryTesting.clearSpans(); - - PubsubMessageWrapper messageWrapper = - PubsubMessageWrapper.newBuilder( - getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT) - .build(); - List messageWrappers = Arrays.asList(messageWrapper); - - Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test"); - OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true); - - tracer.startSubscriberSpan(messageWrapper, EXACTLY_ONCE_ENABLED); - Span subscribeModackRpcSpan = - tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "modack", messageWrappers, ACK_DEADLINE, true); - Span subscribeAckRpcSpan = - tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "ack", messageWrappers, 0, false); - Span subscribeNackRpcSpan = - tracer.startSubscribeRpcSpan( - FULL_SUBSCRIPTION_NAME.toString(), "nack", messageWrappers, 0, false); - - Exception e = new Exception("test-exception"); - tracer.setSubscribeRpcSpanException(subscribeModackRpcSpan, true, ACK_DEADLINE, e); - tracer.setSubscribeRpcSpanException(subscribeAckRpcSpan, false, 0, e); - tracer.setSubscribeRpcSpanException(subscribeNackRpcSpan, true, 0, e); - tracer.endSubscriberSpan(messageWrapper); - - List allSpans = openTelemetryTesting.getSpans(); - assertEquals(4, allSpans.size()); - SpanData modackSpanData = allSpans.get(0); - SpanData ackSpanData = allSpans.get(1); - SpanData nackSpanData = allSpans.get(2); - SpanData subscriberSpanData = allSpans.get(3); - - StatusData expectedModackStatus = - StatusData.create(StatusCode.ERROR, "Exception thrown on modack RPC."); - SpanDataAssert modackSpanDataAssert = OpenTelemetryAssertions.assertThat(modackSpanData); - modackSpanDataAssert - .hasName(SUBSCRIBE_MODACK_RPC_SPAN_NAME) - .hasKind(SpanKind.CLIENT) - .hasNoParent() - .hasStatus(expectedModackStatus) - .hasException(e) - .hasEnded(); - - StatusData expectedAckStatus = - StatusData.create(StatusCode.ERROR, "Exception thrown on ack RPC."); - SpanDataAssert ackSpanDataAssert = OpenTelemetryAssertions.assertThat(ackSpanData); - ackSpanDataAssert - .hasName(SUBSCRIBE_ACK_RPC_SPAN_NAME) - .hasKind(SpanKind.CLIENT) - .hasNoParent() - .hasStatus(expectedAckStatus) - .hasException(e) - .hasEnded(); - - StatusData expectedNackStatus = - StatusData.create(StatusCode.ERROR, "Exception thrown on nack RPC."); - SpanDataAssert nackSpanDataAssert = OpenTelemetryAssertions.assertThat(nackSpanData); - nackSpanDataAssert - .hasName(SUBSCRIBE_NACK_RPC_SPAN_NAME) - .hasKind(SpanKind.CLIENT) - .hasNoParent() - .hasStatus(expectedNackStatus) - .hasException(e) - .hasEnded(); - } - - private PubsubMessage getPubsubMessage() { - return PubsubMessage.newBuilder() - .setData(ByteString.copyFromUtf8("test-data")) - .setOrderingKey(ORDERING_KEY) - .build(); - } -}