Skip to content

Commit

Permalink
[feat][broker] PIP-264: Add OpenTelemetry consumer metrics (#22693)
Browse files Browse the repository at this point in the history
  • Loading branch information
dragosvictor authored May 10, 2024
1 parent d77c5de commit e558cfe
Show file tree
Hide file tree
Showing 7 changed files with 408 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.SchemaStorageFactory;
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.OpenTelemetryConsumerStats;
import org.apache.pulsar.broker.stats.OpenTelemetryTopicStats;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
Expand Down Expand Up @@ -254,6 +255,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private MetricsGenerator metricsGenerator;
private final PulsarBrokerOpenTelemetry openTelemetry;
private OpenTelemetryTopicStats openTelemetryTopicStats;
private OpenTelemetryConsumerStats openTelemetryConsumerStats;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
Expand Down Expand Up @@ -630,8 +632,13 @@ public CompletableFuture<Void> closeAsync() {
brokerClientSharedTimer.stop();
monotonicSnapshotClock.close();

if (openTelemetryConsumerStats != null) {
openTelemetryConsumerStats.close();
openTelemetryConsumerStats = null;
}
if (openTelemetryTopicStats != null) {
openTelemetryTopicStats.close();
openTelemetryTopicStats = null;
}

asyncCloseFutures.add(EventLoopUtil.shutdownGracefully(ioEventLoopGroup));
Expand Down Expand Up @@ -775,6 +782,7 @@ public void start() throws PulsarServerException {
}

openTelemetryTopicStats = new OpenTelemetryTopicStats(this);
openTelemetryConsumerStats = new OpenTelemetryConsumerStats(this);

localMetadataSynchronizer = StringUtils.isNotBlank(config.getMetadataSyncEventTopic())
? new PulsarMetadataEventSynchronizer(this, config.getMetadataSyncEventTopic())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.util.concurrent.AtomicDouble;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.time.Instant;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
Expand Down Expand Up @@ -90,7 +91,9 @@ public class Consumer {
private final Rate msgOut;
private final Rate msgRedeliver;
private final LongAdder msgOutCounter;
private final LongAdder msgRedeliverCounter;
private final LongAdder bytesOutCounter;
private final LongAdder messageAckCounter;
private final Rate messageAckRate;

private volatile long lastConsumedTimestamp;
Expand Down Expand Up @@ -152,6 +155,9 @@ public class Consumer {
@Getter
private final SchemaType schemaType;

@Getter
private final Instant connectedSince = Instant.now();

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
boolean isDurable, TransportCnx cnx, String appId,
Expand Down Expand Up @@ -182,8 +188,10 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.msgOut = new Rate();
this.chunkedMessageRate = new Rate();
this.msgRedeliver = new Rate();
this.msgRedeliverCounter = new LongAdder();
this.bytesOutCounter = new LongAdder();
this.msgOutCounter = new LongAdder();
this.messageAckCounter = new LongAdder();
this.messageAckRate = new Rate();
this.appId = appId;

Expand All @@ -200,7 +208,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
stats = new ConsumerStatsImpl();
stats.setAddress(cnx.clientSourceAddressAndPort());
stats.consumerName = consumerName;
stats.setConnectedSince(DateFormatter.now());
stats.setConnectedSince(DateFormatter.format(connectedSince));
stats.setClientVersion(cnx.getClientVersion());
stats.metadata = this.metadata;

Expand Down Expand Up @@ -238,8 +246,10 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.consumerName = consumerName;
this.msgOut = null;
this.msgRedeliver = null;
this.msgRedeliverCounter = null;
this.msgOutCounter = null;
this.bytesOutCounter = null;
this.messageAckCounter = null;
this.messageAckRate = null;
this.pendingAcks = null;
this.stats = null;
Expand Down Expand Up @@ -502,6 +512,7 @@ public CompletableFuture<Void> messageAcked(CommandAck ack) {
return future
.thenApply(v -> {
this.messageAckRate.recordEvent(v);
this.messageAckCounter.add(v);
return null;
});
}
Expand Down Expand Up @@ -922,6 +933,14 @@ public long getBytesOutCounter() {
return bytesOutCounter.longValue();
}

public long getMessageAckCounter() {
return messageAckCounter.sum();
}

public long getMessageRedeliverCounter() {
return msgRedeliverCounter.sum();
}

public int getUnackedMessages() {
return unackedMessages;
}
Expand Down Expand Up @@ -1059,6 +1078,8 @@ public void redeliverUnacknowledgedMessages(long consumerEpoch) {
}

msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), totalRedeliveryMessages.intValue());
msgRedeliverCounter.add(totalRedeliveryMessages.intValue());

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
} else {
subscription.redeliverUnacknowledgedMessages(this, consumerEpoch);
Expand Down Expand Up @@ -1091,6 +1112,7 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {

subscription.redeliverUnacknowledgedMessages(this, pendingPositions);
msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages);
msgRedeliverCounter.add(totalRedeliveryMessages);

int numberOfBlockedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0);

Expand Down Expand Up @@ -1153,6 +1175,14 @@ public String getClientAddress() {
return clientAddress;
}

public String getClientAddressAndPort() {
return cnx.clientSourceAddressAndPort();
}

public String getClientVersion() {
return cnx.getClientVersion();
}

public MessageId getStartMessageId() {
return startMessageId;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import java.util.Collection;
import java.util.Optional;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;

public class OpenTelemetryConsumerStats implements AutoCloseable {

// Replaces pulsar_consumer_msg_rate_out
public static final String MESSAGE_OUT_COUNTER = "pulsar.broker.consumer.message.outgoing.count";
private final ObservableLongMeasurement messageOutCounter;

// Replaces pulsar_consumer_msg_throughput_out
public static final String BYTES_OUT_COUNTER = "pulsar.broker.consumer.message.outgoing.size";
private final ObservableLongMeasurement bytesOutCounter;

// Replaces pulsar_consumer_msg_ack_rate
public static final String MESSAGE_ACK_COUNTER = "pulsar.broker.consumer.message.ack.count";
private final ObservableLongMeasurement messageAckCounter;

// Replaces pulsar_consumer_msg_rate_redeliver
public static final String MESSAGE_REDELIVER_COUNTER = "pulsar.broker.consumer.message.redeliver.count";
private final ObservableLongMeasurement messageRedeliverCounter;

// Replaces pulsar_consumer_unacked_messages
public static final String MESSAGE_UNACKNOWLEDGED_COUNTER = "pulsar.broker.consumer.message.unack.count";
private final ObservableLongMeasurement messageUnacknowledgedCounter;

// Replaces pulsar_consumer_available_permits
public static final String MESSAGE_PERMITS_COUNTER = "pulsar.broker.consumer.permit.count";
private final ObservableLongMeasurement messagePermitsCounter;

private final BatchCallback batchCallback;

public OpenTelemetryConsumerStats(PulsarService pulsar) {
var meter = pulsar.getOpenTelemetry().getMeter();

messageOutCounter = meter
.counterBuilder(MESSAGE_OUT_COUNTER)
.setUnit("{message}")
.setDescription("The total number of messages dispatched to this consumer.")
.buildObserver();

bytesOutCounter = meter
.counterBuilder(BYTES_OUT_COUNTER)
.setUnit("By")
.setDescription("The total number of messages bytes dispatched to this consumer.")
.buildObserver();

messageAckCounter = meter
.counterBuilder(MESSAGE_ACK_COUNTER)
.setUnit("{ack}")
.setDescription("The total number of message acknowledgments received from this consumer.")
.buildObserver();

messageRedeliverCounter = meter
.counterBuilder(MESSAGE_REDELIVER_COUNTER)
.setUnit("{message}")
.setDescription("The total number of messages that have been redelivered to this consumer.")
.buildObserver();

messageUnacknowledgedCounter = meter
.upDownCounterBuilder(MESSAGE_UNACKNOWLEDGED_COUNTER)
.setUnit("{message}")
.setDescription("The total number of messages unacknowledged by this consumer.")
.buildObserver();

messagePermitsCounter = meter
.upDownCounterBuilder(MESSAGE_PERMITS_COUNTER)
.setUnit("{permit}")
.setDescription("The number of permits currently available for this consumer.")
.buildObserver();

batchCallback = meter.batchCallback(() -> pulsar.getBrokerService()
.getTopics()
.values()
.stream()
.map(topicFuture -> topicFuture.getNow(Optional.empty()))
.filter(Optional::isPresent)
.map(Optional::get)
.map(Topic::getSubscriptions)
.flatMap(s -> s.values().stream())
.map(Subscription::getConsumers)
.flatMap(Collection::stream)
.forEach(this::recordMetricsForConsumer),
messageOutCounter,
bytesOutCounter,
messageAckCounter,
messageRedeliverCounter,
messageUnacknowledgedCounter,
messagePermitsCounter);
}

@Override
public void close() {
batchCallback.close();
}

private void recordMetricsForConsumer(Consumer consumer) {
var subscription = consumer.getSubscription();
var topicName = TopicName.get(subscription.getTopic().getName());

var builder = Attributes.builder()
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_NAME, consumer.consumerName())
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_ID, consumer.consumerId())
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_CONNECTED_SINCE,
consumer.getConnectedSince().getEpochSecond())
.put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_NAME, subscription.getName())
.put(OpenTelemetryAttributes.PULSAR_SUBSCRIPTION_TYPE, consumer.subType().toString())
.put(OpenTelemetryAttributes.PULSAR_DOMAIN, topicName.getDomain().toString())
.put(OpenTelemetryAttributes.PULSAR_TENANT, topicName.getTenant())
.put(OpenTelemetryAttributes.PULSAR_NAMESPACE, topicName.getNamespace())
.put(OpenTelemetryAttributes.PULSAR_TOPIC, topicName.getPartitionedTopicName());
if (topicName.isPartitioned()) {
builder.put(OpenTelemetryAttributes.PULSAR_PARTITION_INDEX, topicName.getPartitionIndex());
}
var clientAddress = consumer.getClientAddressAndPort();
if (clientAddress != null) {
builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_ADDRESS, clientAddress);
}
var clientVersion = consumer.getClientVersion();
if (clientVersion != null) {
builder.put(OpenTelemetryAttributes.PULSAR_CLIENT_VERSION, clientVersion);
}
var metadataList = consumer.getMetadata()
.entrySet()
.stream()
.map(e -> String.format("%s:%s", e.getKey(), e.getValue()))
.toList();
builder.put(OpenTelemetryAttributes.PULSAR_CONSUMER_METADATA, metadataList);
var attributes = builder.build();

messageOutCounter.record(consumer.getMsgOutCounter(), attributes);
bytesOutCounter.record(consumer.getBytesOutCounter(), attributes);
messageAckCounter.record(consumer.getMessageAckCounter(), attributes);
messageRedeliverCounter.record(consumer.getMessageRedeliverCounter(), attributes);
messageUnacknowledgedCounter.record(consumer.getUnackedMessages(),
Attributes.builder()
.putAll(attributes)
.put(OpenTelemetryAttributes.PULSAR_CONSUMER_BLOCKED, consumer.isBlocked())
.build());
messagePermitsCounter.record(consumer.getAvailablePermits(), attributes);
}
}
Loading

0 comments on commit e558cfe

Please sign in to comment.